diff --git a/src/Plugin/migrate/destination/Table.php b/src/Plugin/migrate/destination/Table.php index f04d9f6..f72eb37 100755 --- a/src/Plugin/migrate/destination/Table.php +++ b/src/Plugin/migrate/destination/Table.php @@ -5,6 +5,8 @@ namespace Drupal\migrate_plus\Plugin\migrate\destination; use Drupal\Core\Database\Connection; use Drupal\Core\Database\Database; use Drupal\Core\Plugin\ContainerFactoryPluginInterface; +use Drupal\migrate\Event\ImportAwareInterface; +use Drupal\migrate\Event\MigrateImportEvent; use Drupal\migrate\MigrateException; use Drupal\migrate\MigrateSkipProcessException; use Drupal\migrate\Plugin\MigrationInterface; @@ -21,7 +23,7 @@ use Symfony\Component\DependencyInjection\ContainerInterface; * id = "table" * ) */ -class Table extends DestinationBase implements ContainerFactoryPluginInterface { +class Table extends DestinationBase implements ContainerFactoryPluginInterface, ImportAwareInterface { /** * The name of the destination table. @@ -51,6 +53,27 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface { */ protected $dbConnection; + /** + * Maximum number of rows to insert in one query. + * + * @var int + */ + protected $batchSize; + + /** + * The query object being built row-by-row. + * + * @var array + */ + protected $rowsToInsert = []; + + /** + * The highest ID seen or created so far on this table. + * + * @var int + */ + protected $lastId = 0; + /** * Constructs a new Table. * @@ -71,6 +94,7 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface { $this->tableName = $configuration['table_name']; $this->idFields = $configuration['id_fields']; $this->fields = isset($configuration['fields']) ? $configuration['fields'] : []; + $this->batchSize = isset($configuration['batch_size']) ? $configuration['batch_size'] : 1; $this->supportsRollback = TRUE; } @@ -110,23 +134,79 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface { * {@inheritdoc} */ public function import(Row $row, array $old_destination_id_values = []) { - $id = $row->getSourceIdValues(); - if (count($id) != count($this->idFields)) { - throw new MigrateSkipProcessException('All the id fields are required for a table migration.'); + // Skip batching (if configured) for updates. + $batch_inserts = ($this->batchSize > 1 && empty($old_destination_id_values)); + $ids = []; + foreach ($this->idFields as $field => $fieldInfo) { + if ($row->hasDestinationProperty($field)) { + $ids[$field] = $row->getDestinationProperty($field); + } + elseif (!$row->hasDestinationProperty($field) && empty($fieldInfo['use_auto_increment'])) { + throw new MigrateSkipProcessException('All the id fields are required for a table migration.'); + } + // When batching, we do the auto-incrementing ourselves. + elseif ($batch_inserts && $fieldInfo['use_auto_increment']) { + if (count($this->rowsToInsert) == 0) { + // Get the highest existing ID, so we will create IDs above it. + $this->lastId = $this->dbConnection->query("SELECT MAX($field) AS MaxId FROM {{$this->tableName}}") + ->fetchField(); + if (!$this->lastId) { + $this->lastId = 0; + } + } + $id = ++$this->lastId; + $ids[$field] = $id; + $row->setDestinationProperty($field, $id); + } } - $values = $row->getDestination(); + // When batching, make sure we have the same properties in the same order + // every time. + if ($batch_inserts) { + $destination_properties = array_keys($this->migration->getProcess()); + $destination_properties = array_merge($destination_properties, + array_keys($this->idFields)); + sort($destination_properties); + $destination_values = $row->getDestination(); + foreach ($destination_properties as $property_name) { + $values[$property_name] = $destination_values[$property_name] ?? NULL; + } + } + else { + $values = $row->getDestination(); + } if ($this->fields) { $values = array_intersect_key($values, $this->fields); } - $status = $this->dbConnection->merge($this->tableName) - ->key($id) - ->fields($values) - ->execute(); + if ($batch_inserts) { + $this->rowsToInsert[] = $values; + if (count($this->rowsToInsert) >= $this->batchSize) { + $this->flushInserts(); + } + $status = TRUE; + } + // Row contains empty id field with use_auto_increment enabled. + elseif (count($ids) < count($this->idFields)) { + $status = $id = $this->dbConnection->insert($this->tableName) + ->fields($values) + ->execute(); + foreach ($this->idFields as $field => $fieldInfo) { + if (isset($fieldInfo['use_auto_increment']) && $fieldInfo['use_auto_increment'] === TRUE && !$row->hasDestinationProperty($field)) { + $row->setDestinationProperty($field, $id); + $ids[$field] = $id; + } + } + } + else { + $status = $this->dbConnection->merge($this->tableName) + ->keys($ids) + ->fields($values) + ->execute(); + } - return $status ? $id : NULL; + return $status ? $ids : NULL; } /** @@ -140,4 +220,43 @@ class Table extends DestinationBase implements ContainerFactoryPluginInterface { $delete->execute(); } + /** + * Execute the insert query and reset everything. + * + * @return bool + */ + public function flushInserts() { + if (count($this->rowsToInsert) > 0) { + $batch_query = $this->dbConnection->insert($this->tableName) + ->fields(array_keys($this->rowsToInsert[0])); + foreach ($this->rowsToInsert as $row) { + $batch_query->values(array_values($row)); + } + $batch_query->execute(); + $this->rowsToInsert = []; + } + } + + /** + * {@inheritDoc} + */ + public function preImport(MigrateImportEvent $event) { + } + + /** + * {@inheritDoc} + */ + public function postImport(MigrateImportEvent $event) { + // At the conclusion of a given migration, make sure batched inserts go in. + $this->flushInserts(); + } + + /** + * Make absolutely sure batched inserts are processed (especially for stubs). + */ + public function __destruct() { + // At the conclusion of a given migration, make sure batched inserts go in. + $this->flushInserts(); + } + } diff --git a/tests/src/Kernel/MigrateTableBatchTest.php b/tests/src/Kernel/MigrateTableBatchTest.php new file mode 100755 index 0000000..4458b23 --- /dev/null +++ b/tests/src/Kernel/MigrateTableBatchTest.php @@ -0,0 +1,19 @@ +connection = Database::getConnection(); + + $this->connection->schema()->createTable(static::TABLE_NAME, [ + 'description' => 'Test table', + 'fields' => [ + 'id' => [ + 'type' => 'serial', + 'not null' => TRUE, + ], + 'data1' => [ + 'type' => 'varchar', + 'length' => '32', + 'not null' => TRUE, + ], + 'data2' => [ + 'type' => 'varchar', + 'length' => '32', + 'not null' => TRUE, + ], + ], + 'primary key' => ['id'], + ]); + } + + /** + * {@inheritdoc} + */ + protected function tearDown() { + $this->connection->schema()->dropTable(static::TABLE_NAME); + parent::tearDown(); + } + + /** + * Create a minimally valid migration with some source data. + * + * @return array + * The migration definition. + */ + protected function getTableDestinationMigration() { + $definition = [ + 'id' => 'migration_table_test', + 'migration_tags' => ['Testing'], + 'source' => [ + 'plugin' => 'embedded_data', + 'data_rows' => [ + [ + 'data1' => 'dummy1 value1', + 'data2' => 'dummy2 value1', + ], + [ + 'data1' => 'dummy1 value2', + 'data2' => 'dummy2 value2', + ], + [ + 'data1' => 'dummy1 value3', + 'data2' => 'dummy2 value3', + ], + ], + 'ids' => [ + 'data1' => ['type' => 'string'], + ], + ], + 'destination' => [ + 'plugin' => 'table', + 'table_name' => static::TABLE_NAME, + 'id_fields' => [ + 'id' => [ + 'type' => 'integer', + 'use_auto_increment' => TRUE, + ], + ], + 'batch_size' => $this->batchSize, + ], + 'process' => [ + 'data1' => 'data1', + 'data2' => 'data2', + ], + ]; + return $definition; + } + + /** + * Tests table destination. + */ + public function testTableDestination() { + $migration = \Drupal::service('plugin.manager.migration')->createStubMigration($this->getTableDestinationMigration()); + + $executable = new MigrateExecutable($migration, $this); + $executable->import(); + + $values = $this->connection->select(static::TABLE_NAME) + ->fields(static::TABLE_NAME) + ->execute() + ->fetchAllAssoc('data1'); + + $this->assertEquals(1, $values['dummy1 value1']->id); + $this->assertEquals(2, $values['dummy1 value2']->id); + $this->assertEquals(3, $values['dummy1 value3']->id); + $this->assertEquals('dummy2 value3', $values['dummy1 value3']->data2); + $this->assertEquals(3, count($values)); + } + +} diff --git a/tests/src/Kernel/MigrateTableTest.php b/tests/src/Kernel/MigrateTableTest.php index dc47177..ceb38fa 100755 --- a/tests/src/Kernel/MigrateTableTest.php +++ b/tests/src/Kernel/MigrateTableTest.php @@ -4,6 +4,8 @@ namespace Drupal\Tests\migrate_plus\Kernel; use Drupal\Core\Database\Database; use Drupal\migrate\MigrateExecutable; +use Drupal\migrate\Plugin\MigrateIdMapInterface; +use Drupal\migrate\Row; use Drupal\Tests\migrate\Kernel\MigrateTestBase; /** @@ -22,6 +24,13 @@ class MigrateTableTest extends MigrateTestBase { */ protected $connection; + /** + * The batch size to configure (a size of 1 disables batching). + * + * @var int + */ + protected $batchSize = 1; + public static $modules = ['migrate_plus']; /** @@ -100,6 +109,7 @@ class MigrateTableTest extends MigrateTestBase { 'plugin' => 'table', 'table_name' => static::TABLE_NAME, 'id_fields' => ['data' => ['type' => 'string']], + 'batch_size' => $this->batchSize, ], 'process' => [ 'data' => 'data', @@ -160,4 +170,26 @@ class MigrateTableTest extends MigrateTestBase { $this->assertEquals(0, count($values)); } + /** + * Tests table update. + */ + public function testTableUpdate() { + // Make sure migration overwrites the original data for the first row. + $original_values = [ + 'data' => 'dummy value', + 'data2' => 'original value 2', + 'data3' => 'original value 3', + ]; + $this->connection->insert(static::TABLE_NAME) + ->fields($original_values) + ->execute(); + + /** @var \Drupal\migrate\Plugin\MigrationInterface $migration */ + $migration = \Drupal::service('plugin.manager.migration') + ->createStubMigration($this->getTableDestinationMigration()); + $migration->getIdMap()->saveIdMapping(new Row($original_values, + ['data' => 'dummy value']), ['data' => 'dummy value'], MigrateIdMapInterface::STATUS_NEEDS_UPDATE); + $this->testTableDestination(); + } + }