diff --git a/core/modules/migrate/src/MigrateExecutable.php b/core/modules/migrate/src/MigrateExecutable.php index e8316e4..835ca7e 100644 --- a/core/modules/migrate/src/MigrateExecutable.php +++ b/core/modules/migrate/src/MigrateExecutable.php @@ -298,22 +298,12 @@ public function rollback() { $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK); $id_map = $this->migration->getIdMap(); - $destination = $this->migration->getDestinationPlugin(); // Loop through each row in the map, and try to roll it back. foreach ($id_map as $map_row) { $destination_key = $id_map->currentDestination(); if ($destination_key) { - $map_row = $id_map->getRowByDestination($destination_key); - if ($map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) { - $this->getEventDispatcher() - ->dispatch(MigrateEvents::PRE_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key)); - $destination->rollback($destination_key); - $this->getEventDispatcher() - ->dispatch(MigrateEvents::POST_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key)); - } - // We're now done with this row, so remove it from the map. - $id_map->deleteDestination($destination_key); + $this->rollbackCurrentRow(); } else { // If there is no destination key the import probably failed and we can @@ -345,6 +335,118 @@ public function rollback() { /** * {@inheritdoc} */ + public function rollbackMissingItems() { + // Only begin the rollback operation if the migration is currently idle. + if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) { + $this->message->display($this->t('Migration @id is busy with another operation: @status', ['@id' => $this->migration->id(), '@status' => $this->t($this->migration->getStatusLabel())]), 'error'); + return MigrationInterface::RESULT_FAILED; + } + + // Announce that rollback is about to happen. + $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROLLBACK, new MigrateRollbackEvent($this->migration)); + + // Optimistically assume things are going to work out; if not, $return will + // be updated to some other status. + $return = MigrationInterface::RESULT_COMPLETED; + + $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK); + $id_map = $this->migration->getIdMap(); + + // We can't use the source plugin as-is because we need ALL potential rows + // and certain plugin configurations will only return a subset. + $source_config = $this->migration->getSourceConfiguration(); + $source_config['all_rows'] = TRUE; + $source = \Drupal::service('plugin.manager.migrate.source')->createInstance($source_config['plugin'], $source_config, $this->migration); + + // Accumulate the source IDs, for later comparison. + try { + $source->rewind(); + } + catch (\Exception $e) { + $this->message->display( + $this->t('Migration failed with source plugin exception: @e', array('@e' => $e->getMessage())), 'error'); + $this->migration->setStatus(MigrationInterface::STATUS_IDLE); + return MigrationInterface::RESULT_FAILED; + } + + $source_items = []; + while ($source->valid()) { + $row = $source->current(); + $source_items[] = $row->getSourceIdValues(); + + try { + $source->next(); + } + catch (\Exception $e) { + $this->message->display( + $this->t('Migration failed with source plugin exception: @e', + array('@e' => $e->getMessage())), 'error'); + $this->migration->setStatus(MigrationInterface::STATUS_IDLE); + return MigrationInterface::RESULT_FAILED; + } + } + + // Rollback any rows that are not part of what we get from the source + // plugin. + foreach ($id_map as $map_row) { + $source_key = $id_map->currentSource(); + $destination_key = $id_map->currentDestination(); + + // If this one wasn't imported, or if we're still receiving it from the + // source plugin, then we don't need to do anything. + if (!$destination_key || !$source_key || in_array($source_key, $source_items)) { + continue; + } + + $this->rollbackCurrentRow(); + + // Check for memory exhaustion. + if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) { + break; + } + + // If anyone has requested we stop, return the requested result. + if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) { + $return = $this->migration->getInterruptionResult(); + $this->migration->clearInterruptionResult(); + break; + } + } + + // Notify modules that rollback attempt was complete. + $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration)); + $this->migration->setStatus(MigrationInterface::STATUS_IDLE); + + return $return; + } + + /** + * Roll back the current row. + */ + protected function rollbackCurrentRow() { + $id_map = $this->migration->getIdMap(); + $destination_key = $id_map->currentDestination(); + $destination = $this->migration->getDestinationPlugin(); + + if ($destination_key) { + $map_row = $id_map->getRowByDestination($destination_key); + + if ($map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) { + $this->getEventDispatcher() + ->dispatch(MigrateEvents::PRE_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key)); + $destination->rollback($destination_key); + $this->getEventDispatcher() + ->dispatch(MigrateEvents::POST_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key)); + } + + // We're now done with this row, so remove it from the map. + $id_map->deleteDestination($destination_key); + } + } + + /** + * {@inheritdoc} + */ public function processRow(Row $row, array $process = NULL, $value = NULL) { foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) { $multiple = FALSE; diff --git a/core/modules/migrate/src/MigrateExecutableInterface.php b/core/modules/migrate/src/MigrateExecutableInterface.php index 33d5914..e924904 100644 --- a/core/modules/migrate/src/MigrateExecutableInterface.php +++ b/core/modules/migrate/src/MigrateExecutableInterface.php @@ -17,6 +17,11 @@ public function import(); public function rollback(); /** + * Performs a rollback only of items that are no longer part of the source. + */ + public function rollbackMissingItems(); + + /** * Processes a row. * * @param \Drupal\migrate\Row $row diff --git a/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php b/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php index e355850..2492207 100644 --- a/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php +++ b/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php @@ -111,6 +111,13 @@ protected $trackChanges = FALSE; /** + * Flags whether to return all available source rows. + * + * @var bool + */ + protected $allRows = FALSE; + + /** * Flags whether source plugin will read the map row and add to data row. * * By default, next() will directly read the map row and add it to the data @@ -151,7 +158,7 @@ public function __construct(array $configuration, $plugin_id, $plugin_definition $this->migration = $migration; // Set up some defaults based on the source configuration. - foreach (['cacheCounts' => 'cache_counts', 'skipCount' => 'skip_count', 'trackChanges' => 'track_changes'] as $property => $config_key) { + foreach (['cacheCounts' => 'cache_counts', 'skipCount' => 'skip_count', 'trackChanges' => 'track_changes', 'allRows' => 'all_rows'] as $property => $config_key) { if (isset($configuration[$config_key])) { $this->$property = (bool) $configuration[$config_key]; } @@ -333,11 +340,12 @@ public function next() { } // Check whether the row needs processing. - // 1. This row has not been imported yet. - // 2. Explicitly set to update. - // 3. The row is newer than the current highwater mark. - // 4. If no such property exists then try by checking the hash of the row. - if (!$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row)) { + // 1. We're supposed to return all rows. + // 2. This row has not been imported yet. + // 3. Explicitly set to update. + // 4. The row is newer than the current highwater mark. + // 5. If no such property exists then try by checking the hash of the row. + if ($this->allRows || !$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row)) { $this->currentRow = $row->freezeSource(); } diff --git a/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php b/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php index 3c6d4c4..780bfd1 100644 --- a/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php +++ b/core/modules/migrate/src/Plugin/migrate/source/SqlBase.php @@ -219,77 +219,79 @@ protected function initializeIterator() { if ($this->batch == 0) { $this->prepareQuery(); - // Get the key values, for potential use in joining to the map table. - $keys = []; - - // The rules for determining what conditions to add to the query are as - // follows (applying first applicable rule): - // 1. If the map is joinable, join it. We will want to accept all rows - // which are either not in the map, or marked in the map as NEEDS_UPDATE. - // Note that if high water fields are in play, we want to accept all rows - // above the high water mark in addition to those selected by the map - // conditions, so we need to OR them together (but AND with any existing - // conditions in the query). So, ultimately the SQL condition will look - // like (original conditions) AND (map IS NULL OR map needs update - // OR above high water). - $conditions = $this->query->orConditionGroup(); - $condition_added = FALSE; - $added_fields = []; - if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) { - // Build the join to the map table. Because the source key could have - // multiple fields, we need to build things up. - $count = 1; - $map_join = ''; - $delimiter = ''; - foreach ($this->getIds() as $field_name => $field_schema) { - if (isset($field_schema['alias'])) { - $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name); + if(!$this->allRows) { + // Get the key values, for potential use in joining to the map table. + $keys = []; + + // The rules for determining what conditions to add to the query are as + // follows (applying first applicable rule): + // 1. If the map is joinable, join it. We will want to accept all rows + // which are either not in the map, or marked in the map as NEEDS_UPDATE. + // Note that if high water fields are in play, we want to accept all rows + // above the high water mark in addition to those selected by the map + // conditions, so we need to OR them together (but AND with any existing + // conditions in the query). So, ultimately the SQL condition will look + // like (original conditions) AND (map IS NULL OR map needs update + // OR above high water). + $conditions = $this->query->orConditionGroup(); + $condition_added = FALSE; + $added_fields = []; + if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) { + // Build the join to the map table. Because the source key could have + // multiple fields, we need to build things up. + $count = 1; + $map_join = ''; + $delimiter = ''; + foreach ($this->getIds() as $field_name => $field_schema) { + if (isset($field_schema['alias'])) { + $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name); + } + $map_join .= "$delimiter$field_name = map.sourceid" . $count++; + $delimiter = ' AND '; } - $map_join .= "$delimiter$field_name = map.sourceid" . $count++; - $delimiter = ' AND '; - } - $alias = $this->query->leftJoin($this->migration->getIdMap() - ->getQualifiedMapTableName(), 'map', $map_join); - $conditions->isNull($alias . '.sourceid1'); - $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE); - $condition_added = TRUE; - - // And as long as we have the map table, add its data to the row. - $n = count($this->getIds()); - for ($count = 1; $count <= $n; $count++) { - $map_key = 'sourceid' . $count; - $this->query->addField($alias, $map_key, "migrate_map_$map_key"); - $added_fields[] = "$alias.$map_key"; - } - if ($n = count($this->migration->getDestinationIds())) { + $alias = $this->query->leftJoin($this->migration->getIdMap() + ->getQualifiedMapTableName(), 'map', $map_join); + $conditions->isNull($alias . '.sourceid1'); + $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE); + $condition_added = TRUE; + + // And as long as we have the map table, add its data to the row. + $n = count($this->getIds()); for ($count = 1; $count <= $n; $count++) { - $map_key = 'destid' . $count++; + $map_key = 'sourceid' . $count; $this->query->addField($alias, $map_key, "migrate_map_$map_key"); $added_fields[] = "$alias.$map_key"; } + if ($n = count($this->migration->getDestinationIds())) { + for ($count = 1; $count <= $n; $count++) { + $map_key = 'destid' . $count++; + $this->query->addField($alias, $map_key, "migrate_map_$map_key"); + $added_fields[] = "$alias.$map_key"; + } + } + $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status'); + $added_fields[] = "$alias.source_row_status"; } - $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status'); - $added_fields[] = "$alias.source_row_status"; - } - // 2. If we are using high water marks, also include rows above the mark. - // But, include all rows if the high water mark is not set. - if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater())) { - $high_water_field = $this->getHighWaterField(); - $conditions->condition($high_water_field, $high_water, '>'); - $this->query->orderBy($high_water_field); - $condition_added = TRUE; - } - if ($condition_added) { - $this->query->condition($conditions); - } - // If the query has a group by, our added fields need it too, to keep the - // query valid. - // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html - $group_by = $this->query->getGroupBy(); - if ($group_by && $added_fields) { - foreach ($added_fields as $added_field) { - $this->query->groupBy($added_field); + // 2. If we are using high water marks, also include rows above the mark. + // But, include all rows if the high water mark is not set. + if ($this->getHighWaterProperty() && ($high_water = $this->getHighWater())) { + $high_water_field = $this->getHighWaterField(); + $conditions->condition($high_water_field, $high_water, '>'); + $this->query->orderBy($high_water_field); + $condition_added = TRUE; + } + if ($condition_added) { + $this->query->condition($conditions); + } + // If the query has a group by, our added fields need it too, to keep the + // query valid. + // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html + $group_by = $this->query->getGroupBy(); + if ($group_by && $added_fields) { + foreach ($added_fields as $added_field) { + $this->query->groupBy($added_field); + } } } } @@ -374,7 +376,7 @@ protected function mapJoinable() { if ($id_map_database_options['driver'] === 'pgsql' && $source_database_options['driver'] === 'pgsql' && $id_map_database_options['database'] != $source_database_options['database'] - ) { + ) { return FALSE; }