diff --git a/migrate_tools.routing.yml b/migrate_tools.routing.yml index b12735a..cecd682 100644 --- a/migrate_tools.routing.yml +++ b/migrate_tools.routing.yml @@ -74,6 +74,13 @@ entity.migration.process: _title: 'Process' requirements: _permission: 'administer migrations' +entity.migration.process.run: + path: '/admin/structure/migrate/manage/{migration_group}/migrations/{migration}/process/run' + defaults: + _controller: '\Drupal\migrate_tools\Controller\MigrationController::run' + _title: 'Run' + requirements: + _permission: 'administer migrations' entity.migration.destination: path: '/admin/structure/migrate/manage/{migration_group}/migrations/{migration}/destination' defaults: @@ -107,3 +114,11 @@ migrate_tools.messages: _title: 'Messages' requirements: _permission: 'administer migrations' + +migrate_tools.launch: + path: '/admin/structure/migrate/manage/{migration_group}/migrations/{migration}/launch' + defaults: + _form: '\Drupal\migrate_tools\Form\MigrationLaunchForm' + _title: 'Launch migration' + requirements: + _permission: 'administer migrations' diff --git a/src/BatchLogMigrateMessage.php b/src/BatchLogMigrateMessage.php new file mode 100644 index 0000000..458035b --- /dev/null +++ b/src/BatchLogMigrateMessage.php @@ -0,0 +1,21 @@ +migrationPluginManager->createInstance($migration); + + $migrateMessage = new MigrateMessage(); + $options = []; + + $executable = new MigrateBatchExecutable($migration, $migrateMessage, $options); + $executable->batchImport(); + + return batch_process(); + } + + /** * Display process information of a migration entity. * * @param string $migration_group @@ -205,6 +233,12 @@ class MigrationController extends ControllerBase implements ContainerInjectionIn '#empty' => $this->t('No process defined.'), ]; + $build['process']['run'] = [ + '#type' => 'link', + '#title' => $this->t('Run'), + '#url' => Url::fromRoute('entity.migration.process.run', ['migration_group' => $migration_group, 'migration' => $migration->id()]), + ]; + return $build; } diff --git a/src/Controller/MigrationListBuilder.php b/src/Controller/MigrationListBuilder.php index 035195c..ed27f3b 100644 --- a/src/Controller/MigrationListBuilder.php +++ b/src/Controller/MigrationListBuilder.php @@ -99,7 +99,7 @@ class MigrationListBuilder extends ConfigEntityListBuilder implements EntityHand * @return array * A render array structure of header strings. * - * @see Drupal\Core\Entity\EntityListController::render() + * @see \Drupal\Core\Entity\EntityListController::render() */ public function buildHeader() { $header['label'] = $this->t('Migration'); @@ -110,6 +110,7 @@ class MigrationListBuilder extends ConfigEntityListBuilder implements EntityHand $header['unprocessed'] = $this->t('Unprocessed'); $header['messages'] = $this->t('Messages'); $header['last_imported'] = $this->t('Last Imported'); + $header['operations'] = $this->t('Operations'); return $header; // + parent::buildHeader(); } @@ -125,56 +126,88 @@ class MigrationListBuilder extends ConfigEntityListBuilder implements EntityHand * @see \Drupal\Core\Entity\EntityListController::render() */ public function buildRow(EntityInterface $migration_entity) { - $migration = $this->migrationPluginManager->createInstance($migration_entity->id()); - $migration_group = $migration->get('migration_group'); - if (!$migration_group) { - $migration_group = 'default'; - } - $route_parameters = array( - 'migration_group' => $migration_group, - 'migration' => $migration->id(), - ); - $row['label'] = array( - 'data' => array( - '#type' => 'link', - '#title' => $migration->label(), - '#url' => Url::fromRoute("entity.migration.overview", $route_parameters), - ), - ); - $row['machine_name'] = $migration->id(); - $row['status'] = $migration->getStatusLabel(); - - // Derive the stats. - $source_plugin = $migration->getSourcePlugin(); - $row['total'] = $source_plugin->count(); - $map = $migration->getIdMap(); - $row['imported'] = $map->importedCount(); - // -1 indicates uncountable sources. - if ($row['total'] == -1) { - $row['total'] = $this->t('N/A'); - $row['unprocessed'] = $this->t('N/A'); + + try { + $migration = $this->migrationPluginManager->createInstance($migration_entity->id()); + $migration_group = $migration->get('migration_group'); + if (!$migration_group) { + $migration_group = 'default'; + } + $route_parameters = array( + 'migration_group' => $migration_group, + 'migration' => $migration->id(), + ); + $row['label'] = array( + 'data' => array( + '#type' => 'link', + '#title' => $migration->label(), + '#url' => Url::fromRoute("entity.migration.overview", $route_parameters), + ), + ); + $row['machine_name'] = $migration->id(); + $row['status'] = $migration->getStatusLabel(); } - else { - $row['unprocessed'] = $row['total'] - $map->processedCount(); + catch(\Exception $e) { + return NULL; } - $row['messages'] = array( - 'data' => array( - '#type' => 'link', - '#title' => $map->messageCount(), - '#url' => Url::fromRoute("migrate_tools.messages", $route_parameters), - ), - ); - $migrate_last_imported_store = \Drupal::keyValue('migrate_last_imported'); - $last_imported = $migrate_last_imported_store->get($migration->id(), FALSE); - if ($last_imported) { - /** @var DateFormatter $date_formatter */ - $date_formatter = \Drupal::service('date.formatter'); - $row['last_imported'] = $date_formatter->format($last_imported / 1000, - 'custom', 'Y-m-d H:i:s'); + + try { + // Derive the stats. + $source_plugin = $migration->getSourcePlugin(); + $row['total'] = $source_plugin->count(); + $map = $migration->getIdMap(); + $row['imported'] = $map->importedCount(); + // -1 indicates uncountable sources. + if ($row['total'] == -1) { + $row['total'] = $this->t('N/A'); + $row['unprocessed'] = $this->t('N/A'); + } + else { + $row['unprocessed'] = $row['total'] - $map->processedCount(); + } + $row['messages'] = array( + 'data' => array( + '#type' => 'link', + '#title' => $map->messageCount(), + '#url' => Url::fromRoute("migrate_tools.messages", $route_parameters), + ), + ); + $migrate_last_imported_store = \Drupal::keyValue('migrate_last_imported'); + $last_imported = $migrate_last_imported_store->get($migration->id(), FALSE); + if ($last_imported) { + /** @var DateFormatter $date_formatter */ + $date_formatter = \Drupal::service('date.formatter'); + $row['last_imported'] = $date_formatter->format($last_imported / 1000, + 'custom', 'Y-m-d H:i:s'); + } + else { + $row['last_imported'] = ''; + } + + $row['operations']['data'] = array( + '#type' => 'dropbutton', + '#links' => array( + 'simple_form' => array( + 'title' => $this->t('Launch'), + 'url' => Url::fromRoute('migrate_tools.launch', array( + 'migration_group' => $migration_group, + 'migration' => $migration->id() + )), + ), + ), + ); } - else { - $row['last_imported'] = ''; + catch (\Exception $e) { + // Derive the stats. + $row['status'] = $this->t('No data found'); + $row['total'] = $this->t('N/A'); + $row['imported'] = $this->t('N/A'); + $row['unprocessed'] = $this->t('N/A'); + $row['messages'] = $this->t('N/A'); + $row['last_imported'] = $this->t('N/A'); + $row['operations'] = $this->t('N/A'); } + return $row; // + parent::buildRow($migration_entity); } diff --git a/src/Form/MigrationLaunchForm.php b/src/Form/MigrationLaunchForm.php new file mode 100644 index 0000000..57972cf --- /dev/null +++ b/src/Form/MigrationLaunchForm.php @@ -0,0 +1,239 @@ +migrationPluginManager = $migration_plugin_manager; + } + + /** + * {@inheritdoc} + */ + public static function create(ContainerInterface $container) { + return new static( + $container->get('plugin.manager.migration') + ); + } + + /** + * {@inheritdoc} + */ + public function getFormId() { + return 'migration_launch_form'; + } + + /** + * {@inheritdoc} + */ + public function buildForm(array $form, FormStateInterface $form_state) { + + $form = []; + + $form['operations'] = $this->migrateMigrateOperations(); + + return $form; + } + + + /** + * Get Operations. + */ + private function migrateMigrateOperations() { + // Build the 'Update options' form. + $operations = [ + '#type' => 'fieldset', + '#title' => t('Operations'), + ]; + + $options = [ + '' => t('Please select'), + 'import_immediate' => t('Import immediately'), + 'rollback_immediate' => t('Rollback immediately'), + 'stop' => t('Stop'), + 'reset' => t('Reset'), + ]; + $operations['operation'] = [ + '#type' => 'select', + '#title' => t('Operation'), + '#title_display' => 'invisible', + '#options' => $options, + ]; + $operations['submit'] = [ + '#type' => 'submit', + '#value' => t('Execute'), + ]; + $operations['description'] = [ + '#prefix' => '

', + '#markup' => t( + 'Choose an operation to run on all selections above: +

' + ), + '#postfix' => '

', + ]; + + $operations['options'] = [ + '#type' => 'fieldset', + '#title' => t('Options'), + '#collapsible' => TRUE, + '#collapsed' => TRUE, + ]; + $operations['options']['update'] = [ + '#type' => 'checkbox', + '#title' => t('Update'), + '#description' => t('Check this box to update all previously-imported content + in addition to importing new content. Leave unchecked to only import + new content'), + ]; + $operations['options']['force'] = [ + '#type' => 'checkbox', + '#title' => t('Ignore dependencies'), + '#description' => t('Check this box to ignore dependencies when running imports + - all tasks will run whether or not their dependent tasks have + completed.'), + ]; + $operations['options']['limit'] = [ + '#type' => 'textfield', + '#title' => t('Limit to:'), + '#size' => 10, + '#description' => t('Set a limit of how many items to process for each migration task.'), + ]; + + return $operations; + } + + + /** + * {@inheritdoc} + */ + public function validateForm(array &$form, FormStateInterface $form_state) { + if (empty($form_state->getValue('operation'))) { + $form_state->setErrorByName('operation', $this->t('Please select an operation.')); + return; + } + } + + /** + * {@inheritdoc} + */ + public function submitForm(array &$form, FormStateInterface $form_state) { + + $operation = $form_state->getValue('operation'); + + if ($form_state->getValue('limit')) { + $limit = $form_state->getValue('limit'); + } + else { + $limit = 0; + } + + if ($form_state->getValue('update')) { + $update = $form_state->getValue('update'); + } + else { + $update = 0; + } + if ($form_state->getValue('force')) { + $force = $form_state->getValue('force'); + } + else { + $force = 0; + } + + $migration_name = \Drupal::routeMatch()->getParameter('migration'); + + if ($migration_name) { + + /** @var MigrationInterface $migration */ + $migration = $this->migrationPluginManager->createInstance($migration_name); + $migrateMessage = new MigrateMessage(); + + switch ($operation) { + case 'import_immediate': + + $options = [ + 'limit' => $limit, + 'update' => $update, + 'force' => $force, + ]; + + $executable = new MigrateBatchExecutable($migration, $migrateMessage, $options); + $executable->batchImport(); + + break; + + case 'rollback_immediate': + + $options = [ + 'limit' => $limit, + 'update' => $update, + 'force' => $force + ]; + + $executable = new MigrateBatchExecutable($migration, $migrateMessage, $options); + $executable->rollback(); + + break; + + case 'stop': + + $migration->interruptMigration(MigrationInterface::RESULT_STOPPED); + + break; + + case 'reset': + + $migration->setStatus(MigrationInterface::STATUS_IDLE); + + break; + + } + } + } + +} diff --git a/src/MigrateBatchExecutable.php b/src/MigrateBatchExecutable.php new file mode 100644 index 0000000..96ec47d --- /dev/null +++ b/src/MigrateBatchExecutable.php @@ -0,0 +1,289 @@ +updateExistingRows = $options['update']; + } + + if (isset($options['force'])) { + $this->checkDependencies = $options['force']; + } + + parent::__construct($migration, $message, $options); + $this->migrationPluginManager = \Drupal::getContainer()->get('plugin.manager.migration'); + } + + /** + * Sets the current batch content so listeners can update the messages. + * + * @param array $context + */ + public function setBatchContext(&$context) { + $this->batchContext = &$context; + } + + /** + * Gets a reference to the current batch context. + * + * @return array + */ + public function &getBatchContext() { + return $this->batchContext; + } + + /** + * Setup batch operations for running the migration. + */ + public function batchImport() { + // Create the batch operations for each migration that needs to be executed. + // This includes the migration for this executable, but also the dependent + // migrations. + $operations = $this->batchOperations([$this->migration], 'import', [ + 'limit' => $this->itemLimit, + 'update' => $this->updateExistingRows, + 'force' => $this->checkDependencies + ]); + + if (count($operations) > 0) { + $batch = [ + 'operations' => $operations, + 'title' => t('Migrating %migrate', ['%migrate' => $this->migration->label()]), + 'init_message' => t('Start migrating %migrate', ['%migrate' => $this->migration->label()]), + 'progress_message' => t('Migrating %migrate', ['%migrate' => $this->migration->label()]), + 'error_message' => t('An error occurred while migrating %migrate.', ['%migrate' => $this->migration->label()]), + 'finished' => '\Drupal\migrate_tools\MigrateBatchExecutable::batchFinishedImport', + ]; + + batch_set($batch); + } + } + + /** + * Helper to generate the batch operations for importing migrations. + * + * @param array $migrations + * @param array $operation + * @param array $options + * + * @return array + */ + protected function batchOperations($migrations, $operation, $options = []) { + + $operations = []; + + /** + * @var string $id + * @var Migration $migration + */ + foreach ($migrations as $id => $migration) { + + if (!empty($options['update'])) { + $migration->getIdMap()->prepareUpdate(); + } + + if (!empty($options['force'])) { + $migration->set('requirements', []); + } + else { + $dependencies = $migration->getMigrationDependencies(); + if (!empty($dependencies['required'])) { + $required_migrations = $this->migrationPluginManager->createInstances($dependencies['required']); + // For dependent migrations will need to be migrate all items. + $dependent_options = $options; + $dependent_options['limit'] = 0; + $operations += $this->batchOperations($required_migrations, $operation, [ + 'limit' => 0, + 'update' => $options['update'], + 'force' => $options['force'] + ]); + } + } + + $operations[] = [ + '\Drupal\migrate_tools\MigrateBatchExecutable::batchProcessImport', + [$migration->id(), $options] + ]; + } + + return $operations; + } + + /** + * Batch 'operation' callback + * + * @param string $migration_id + * @param array $options + * @param array $context + * + */ + static public function batchProcessImport($migration_id, $options, &$context) { + + if (empty($context['sandbox'])) { + $context['finished'] = 0; + $context['sandbox'] = []; + $context['sandbox']['total'] = 0; + $context['sandbox']['counter'] = 0; + $context['sandbox']['batch_limit'] = 0; + $context['sandbox']['operation'] = MigrateBatchExecutable::BATCH_IMPORT; + } + + // Prepare the migration executable. + $message = new BatchLogMigrateMessage($context); + /** @var MigrationInterface $migration */ + $migration = \Drupal::getContainer()->get('plugin.manager.migration')->createInstance($migration_id); + $executable = new MigrateBatchExecutable($migration, $message, $options); + + if (empty($context['sandbox']['total'])) { + $context['sandbox']['total'] = $executable->getSource()->count(); + $context['sandbox']['batch_limit'] = $executable->calculateBatchLimit($context); + $context['results'][$migration->id()] = [ + '@numitems' => 0, + '@created' => 0, + '@updated' => 0, + '@failures' => 0, + '@ignored' => 0, + '@name' => $migration->id() + ]; + } + + // Every iteration, we reset out batch counter. + $context['sandbox']['batch_counter'] = 0; + + // Make sure we know our batch context. + $executable->setBatchContext($context); + + // Do the import. + $result = $executable->import(); + + // Store the result, we will need to combine the results of all our iterations. + $context['results'][$migration->id()] = [ + '@numitems' => $context['results'][$migration->id()]['@numitems'] + $executable->getProcessedCount(), + '@created' => $context['results'][$migration->id()]['@created'] + $executable->getCreatedCount(), + '@updated' => $context['results'][$migration->id()]['@updated'] + $executable->getUpdatedCount(), + '@failures' => $context['results'][$migration->id()]['@failures'] + $executable->getFailedCount(), + '@ignored' => $context['results'][$migration->id()]['@ignored'] + $executable->getIgnoredCount(), + '@name' => $migration->id() + ]; + + // Do some housekeeping. + if ( + $result != MigrationInterface::RESULT_INCOMPLETE + ) { + $context['finished'] = 1; + } + else { + $context['sandbox']['counter'] = $context['results'][$migration->id()]['@numitems']; + if ($context['sandbox']['counter'] <= $context['sandbox']['total']) { + $context['finished'] = ((float) $context['sandbox']['counter'] / (float) $context['sandbox']['total']); + $context['message'] = t('Importing %migration (@percent%).', [ + '%migration' => $migration->label(), + '@percent' => (int) ($context['finished'] * 100) + ]); + } + } + + } + + /** + * Finished callback for import batches. + * + * @param $success + * @param $results + * @param $operations + * @param $elapsed + */ + static public function batchFinishedImport($success, $results, $operations, $elapsed) { + if ($success) { + foreach ($results as $migration_id => $result) { + $singular_message = "Processed 1 item (@created created, @updated updated, @failures failed, @ignored ignored) - done with '@name'"; + $plural_message = "Processed @numitems items (@created created, @updated updated, @failures failed, @ignored ignored) - done with '@name'"; + drupal_set_message(\Drupal::translation()->formatPlural($result['@numitems'], + $singular_message, + $plural_message, + $result)); + } + } + } + + /** + * @inheritdoc + */ + public function checkStatus() { + $status = parent::checkStatus(); + + if ($status == MigrationInterface::RESULT_COMPLETED) { + // Do some batch housekeeping. + $context = $this->getBatchContext(); + + if (!empty($context['sandbox']) && $context['sandbox']['operation'] == MigrateBatchExecutable::BATCH_IMPORT) { + $context['sandbox']['batch_counter']++; + if ($context['sandbox']['batch_counter'] >= $context['sandbox']['batch_limit']) { + $status = MigrationInterface::RESULT_INCOMPLETE; + } + } + } + + return $status; + } + + /** + * Calculates how much a single batch iteration will handle. + * + * @param $context + * + * @return float + */ + public function calculateBatchLimit($context) { + // TODO Maybe we need some other more sophisticated logic here? + return ceil($context['sandbox']['total'] / 100); + } + +} diff --git a/src/MigrateExecutable.php b/src/MigrateExecutable.php index e7437a7..7fb4aae 100644 --- a/src/MigrateExecutable.php +++ b/src/MigrateExecutable.php @@ -93,10 +93,12 @@ class MigrateExecutable extends MigrateExecutableBase { $this->feedback = $options['feedback']; } if (isset($options['idlist'])) { - $this->idlist = explode(',', $options['idlist']); - array_walk($this->idlist , function(&$value, $key) { - $value = explode(':', $value); - }); + if (is_string($options['idlist'])) { + $this->idlist = explode(',', $options['idlist']); + array_walk($this->idlist, function (&$value, $key) { + $value = explode(':', $value); + }); + } } $this->listeners[MigrateEvents::MAP_SAVE] = [$this, 'onMapSave'];