diff --git a/src/MigrateExecutable.php b/src/MigrateExecutable.php index 2930e42..9b3cb3d 100644 --- a/src/MigrateExecutable.php +++ b/src/MigrateExecutable.php @@ -94,6 +94,13 @@ class MigrateExecutable extends MigrateExecutableBase { protected $listeners = []; /** + * List of event listeners we have registered. + * + * @var \Drupal\migrate\Plugin\MigrateSourceInterface + */ + protected $source; + + /** * {@inheritdoc} */ public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = []) { @@ -321,6 +328,106 @@ class MigrateExecutable extends MigrateExecutableBase { } /** + * {@inheritdoc} + */ + public function rollbackMissingItems() { + // Only begin the rollback operation if the migration is currently idle. + if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) { + $status = $this->migration->getStatusLabel; + $this->message->display($this->t('Migration @id is busy with another operation: @status', + [ + '@id' => $this->migration->id(), + '@status' => $status, + ]), + 'error'); + return MigrationInterface::RESULT_FAILED; + } + + if (!$this->migration->getSourcePlugin() instanceof SyncableSourceInterface) { + $this->message->display($this->t('Migration @id does not support rolling back items missing from the source', ['@id' => $this->migration->id()]), 'error'); + return MigrationInterface::RESULT_FAILED; + } + + // Announce that rollback is about to happen. + $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROLLBACK, new MigrateRollbackEvent($this->migration)); + + // Optimistically assume things are going to work out; if not, $return will + // be updated to some other status. + $return = MigrationInterface::RESULT_COMPLETED; + + $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK); + $id_map = $this->migration->getIdMap(); + + // We can't use the source plugin as-is because we need ALL potential rows + // and certain plugin configurations will only return a subset. + $source_config = $this->migration->getSourceConfiguration(); + $source_config['all_rows'] = TRUE; + + /** @var \Drupal\migrate\Plugin\MigrateSourceInterface $source */ + $this->source = \Drupal::service('plugin.manager.migrate.source')->createInstance($source_config['plugin'], $source_config, $this->migration); + $source_ids = $this->source->sourceIds(); + // Rollback any rows that are not returned from the source plugin. + foreach ($id_map as $map_row) { + $source_key = $id_map->currentSource(); + $destination_key = $id_map->currentDestination(); + + // If this one wasn't imported, or if we're still receiving it from the + // source plugin, then we don't need to do anything. + if (!$destination_key || !$source_key || in_array($source_key, $source_ids)) { + continue; + } + + $event = $this->getEventDispatcher() + ->dispatch(MigratePlusEvents::MISSING_SOURCE_ITEM, new MigrateRowDeleteEvent($this->migration, $destination_key)); + if (!$event->isPropagationStopped()) { + $this->rollbackCurrentRow(); + } + + // Check for memory exhaustion. + if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) { + break; + } + + // If anyone has requested we stop, return the requested result. + if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { + $return = $this->migration->getInterruptionResult(); + $this->migration->clearInterruptionResult(); + break; + } + } + + // Notify modules that rollback attempt was complete. + $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration)); + $this->migration->setStatus(MigrationInterface::STATUS_IDLE); + + return $return; + } + + /** + * Roll back the current row. + */ + protected function rollbackCurrentRow() { + $id_map = $this->migration->getIdMap(); + $destination_key = $id_map->currentDestination(); + $destination = $this->migration->getDestinationPlugin(); + + if ($destination_key) { + $map_row = $id_map->getRowByDestination($destination_key); + + if ($map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) { + $this->getEventDispatcher() + ->dispatch(MigrateEvents::PRE_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key)); + $destination->rollback($destination_key); + $this->getEventDispatcher() + ->dispatch(MigrateEvents::POST_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key)); + } + + // We're now done with this row, so remove it from the map. + $id_map->deleteDestination($destination_key); + } + } + + /** * React to an item about to be imported. * * @param \Drupal\migrate\Event\MigratePreRowSaveEvent $event diff --git a/src/Plugin/migrate/source/EmbedDataSyncableSource.php b/src/Plugin/migrate/source/EmbedDataSyncableSource.php new file mode 100644 index 0000000..101c3ca --- /dev/null +++ b/src/Plugin/migrate/source/EmbedDataSyncableSource.php @@ -0,0 +1,31 @@ +readAllRowsConfiguration(); + } + +} diff --git a/src/Plugin/migrate/source/SyncableSourceTrait.php b/src/Plugin/migrate/source/SyncableSourceTrait.php new file mode 100644 index 0000000..22dc1c0 --- /dev/null +++ b/src/Plugin/migrate/source/SyncableSourceTrait.php @@ -0,0 +1,111 @@ +configuration['all_rows'])) { + $this->allRows = (bool) $this->configuration['all_rows']; + } + } + + /** + * Handles all_rows configuration during \Iterator::next(). + * + * Based on Drupal\migrate\Plugin\migrate\source\SourcePluginBase::next(). + * + * @todo check used methods, maybe there is an implicit assumption that this + * is a SourcePluginBase child. + */ + public function next() { + $this->currentSourceIds = NULL; + $this->currentRow = NULL; + + // In order to find the next row we want to process, we ask the source + // plugin for the next possible row. + while (!isset($this->currentRow) && $this->getIterator()->valid()) { + + $row_data = $this->getIterator()->current() + $this->configuration; + $this->fetchNextRow(); + $row = new Row($row_data, $this->getIds()); + + // Populate the source key for this row. + $this->currentSourceIds = $row->getSourceIdValues(); + + // Pick up the existing map row, if any, unless fetchNextRow() did it. + if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) { + $row->setIdMap($id_map); + } + + // Clear any previous messages for this row before potentially adding + // new ones. + if (!empty($this->currentSourceIds)) { + $this->idMap->delete($this->currentSourceIds, TRUE); + } + + // Preparing the row gives source plugins the chance to skip. + if ($this->prepareRow($row) === FALSE) { + continue; + } + + // Check whether the row needs processing. + // 1. We're supposed to return all rows. + // 2. This row has not been imported yet. + // 3. Explicitly set to update. + // 4. The row is newer than the current highwater mark. + // 5. If no such property exists then try by checking the hash of the row. + if ($this->allRows || !$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row)) { + $this->currentRow = $row->freezeSource(); + } + + if ($this->getHighWaterProperty()) { + $this->saveHighWater($row->getSourceProperty($this->highWaterProperty['name'])); + } + } + } + + /** + * {@inheritdoc} + */ + public function sourceIds() { + $ids = []; + foreach ($this->idList() as $source_ids) { + $ids[] = $source_ids; + } + return $ids; + } + + /** + * Generator for source ids. + */ + protected function idList() { + $ids = $this->getIds(); + $iterator = $this->getIterator(); + $iterator->rewind(); + while ($iterator->valid()) { + yield array_intersect_key($iterator->current(), $ids); + $iterator->next(); + } + } + +} diff --git a/src/SyncableSourceInterface.php b/src/SyncableSourceInterface.php new file mode 100644 index 0000000..611c6df --- /dev/null +++ b/src/SyncableSourceInterface.php @@ -0,0 +1,20 @@ +installEntitySchema('user'); + $this->installEntitySchema('taxonomy_vocabulary'); + $this->installEntitySchema('taxonomy_term'); + $this->installConfig(['taxonomy']); + } + + /** + * Tests rolling back configuration and content entities. + */ + public function testRollbackMissingData() { + // We use vocabularies to demonstrate importing and rolling back + // configuration entities. + $vocabulary_data_rows = [ + 1 => ['id' => '1', 'name' => 'categories', 'weight' => '2'], + 2 => ['id' => '2', 'name' => 'tags', 'weight' => '1'], + 3 => ['id' => '3', 'name' => 'trees', 'weight' => '-1'], + ]; + + /** @var \Drupal\migrate\Plugin\MigrationInterface $vocabulary_migration */ + $vocabulary_migration = $this->createMigration('import', $vocabulary_data_rows); + $executable = new MigrateExecutable($vocabulary_migration, $this); + $executable->import(); + + foreach ([1, 2, 3] as $vid) { + /** @var \Drupal\taxonomy\Entity\Vocabulary $vocabulary */ + $vocabulary = Vocabulary::load($vid); + $this->assertTrue($vocabulary); + } + + // Remove vocabulary 2 from the data source, and update the migration. + unset($vocabulary_data_rows[2]); + $vocabulary_migration = $this->createMigration('import', $vocabulary_data_rows); + $vocabulary_id_map = $vocabulary_migration->getIdMap(); + + // Rollback. + $rollback_executable = new MigrateExecutable($vocabulary_migration, $this); + $rollback_executable->rollbackMissingItems(); + + // Test that vocabulary 2 has been rolled back. + $vocabulary = Vocabulary::load(2); + $this->assertFalse($vocabulary); + $map_row = $vocabulary_id_map->getRowBySource(['id' => 2]); + $this->assertNull($map_row['destid1']); + + // Test that vocabulary 1 and 3 have not been rolled back. + foreach ([1, 3] as $vid) { + $vocabulary = Vocabulary::load($vid); + $this->assertTrue($vocabulary); + $map_row = $vocabulary_id_map->getRowBySource(['id' => $vid]); + $this->assertNotNull($map_row['destid1']); + } + } + + /** + * Helper to create a vocabulary migration with given data. + * + * @param string $id + * An migration id. + * @param array $data + * Data for the embedded data source plugin for a vocabulary migration. + * + * @return \Drupal\migrate\Plugin\MigrationInterface + * A vocabulary migration stub. + */ + protected function createMigration($id, array $data) { + $ids = ['id' => ['type' => 'integer']]; + $definition = [ + 'id' => $id, + 'migration_tags' => ['Import and rollback test'], + 'source' => [ + 'plugin' => 'syncable_embedded_data', + 'data_rows' => $data, + 'ids' => $ids, + ], + 'process' => [ + 'vid' => 'id', + 'name' => 'name', + 'weight' => 'weight', + ], + 'destination' => ['plugin' => 'entity:taxonomy_vocabulary'], + ]; + + /** @var \Drupal\migrate\Plugin\MigrationInterface $vocabulary_migration */ + $vocabulary_migration = \Drupal::service('plugin.manager.migration') + ->createStubMigration($definition); + return $vocabulary_migration; + } + +}