diff --git a/core/lib/Drupal/Core/Database/Driver/mysql/Schema.php b/core/lib/Drupal/Core/Database/Driver/mysql/Schema.php index ffe670f450..223bdd4b7d 100644 --- a/core/lib/Drupal/Core/Database/Driver/mysql/Schema.php +++ b/core/lib/Drupal/Core/Database/Driver/mysql/Schema.php @@ -460,7 +460,24 @@ public function dropField($table, $field) { return FALSE; } + // When dropping a field that is part of a composite primary key MySQL + // automatically adjusts the primary key, accordingly. MariaDB 10.2.8 and + // higher, however, requires explicitly dropping the primary key first and + // subsequently recreating it after the field has been dropped. We perform + // this recreation for compatibility. See + // https://mariadb.com/kb/en/library/alter-table for more information. + $primary_key = array_keys($this->connection->query('SHOW INDEX FROM {' . $table . '} WHERE key_name = ' . $this->connection->quote('PRIMARY'))->fetchAllAssoc('Column_name')); + $update_primary_key = (count($primary_key) > 1) && in_array($field, $primary_key, TRUE); + if ($update_primary_key) { + $this->dropPrimaryKey($table); + } + $this->connection->query('ALTER TABLE {' . $table . '} DROP `' . $field . '`'); + + if ($update_primary_key) { + $this->addPrimaryKey($table, array_diff($primary_key, [$field])); + } + return TRUE; } @@ -522,6 +539,17 @@ public function dropPrimaryKey($table) { return TRUE; } + /** + * {@inheritdoc} + */ + public function findPrimaryKeyColumns($table) { + if (!$this->tableExists($table)) { + return FALSE; + } + $result = $this->connection->query("SHOW KEYS FROM {" . $table . "} WHERE Key_name = 'PRIMARY'")->fetchAllAssoc('Column_name'); + return array_keys($result); + } + /** * {@inheritdoc} */ diff --git a/core/lib/Drupal/Core/Database/Driver/pgsql/Schema.php b/core/lib/Drupal/Core/Database/Driver/pgsql/Schema.php index 15bcb11ce5..780046c6f6 100644 --- a/core/lib/Drupal/Core/Database/Driver/pgsql/Schema.php +++ b/core/lib/Drupal/Core/Database/Driver/pgsql/Schema.php @@ -249,7 +249,7 @@ protected function createTableSql($name, $table) { } $sql_keys = []; - if (isset($table['primary key']) && is_array($table['primary key'])) { + if (!empty($table['primary key']) && is_array($table['primary key'])) { $sql_keys[] = 'CONSTRAINT ' . $this->ensureIdentifiersLength($name, '', 'pkey') . ' PRIMARY KEY (' . $this->createPrimaryKeySql($table['primary key']) . ')'; } if (isset($table['unique keys']) && is_array($table['unique keys'])) { @@ -606,7 +606,20 @@ public function dropField($table, $field) { return FALSE; } + $primary_key = $this->findPrimaryKeyColumns($table); + $this->connection->query('ALTER TABLE {' . $table . '} DROP COLUMN "' . $field . '"'); + + // When the field being dropped is part of a composite primary key, + // PostgreSQL drops that key automatically. Instead, MySQL removes the field + // from the key. For compatibility with MySQL we recreate the key in this + // case. See + // https://www.postgresql.org/docs/current/static/sql-altertable.html for + // more information. + if ((count($primary_key) > 1) && in_array($field, $primary_key, TRUE)) { + $this->addPrimaryKey($table, array_diff($primary_key, [$field])); + } + $this->resetTableInformation($table); return TRUE; } @@ -715,6 +728,29 @@ public function dropPrimaryKey($table) { return TRUE; } + /** + * {@inheritdoc} + */ + public function findPrimaryKeyColumns($table) { + if (!$this->tableExists($table)) { + return FALSE; + } + + // Fetch the 'indkey' column from 'pg_index' to figure out the order of the + // primary key. + // @todo Use 'array_position()' to be able to perform the ordering in SQL + // directly when 9.5 is the minimum PostgreSQL version. + $result = $this->connection->query("SELECT a.attname, i.indkey FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = '{" . $table . "}'::regclass AND i.indisprimary")->fetchAllKeyed(); + if (!$result) { + return []; + } + + $order = explode(' ', reset($result)); + $columns = array_combine($order, array_keys($result)); + ksort($columns); + return array_values($columns); + } + /** * {@inheritdoc} */ diff --git a/core/lib/Drupal/Core/Database/Driver/sqlite/Schema.php b/core/lib/Drupal/Core/Database/Driver/sqlite/Schema.php index db2e348ef3..b54ede0abe 100644 --- a/core/lib/Drupal/Core/Database/Driver/sqlite/Schema.php +++ b/core/lib/Drupal/Core/Database/Driver/sqlite/Schema.php @@ -502,13 +502,15 @@ protected function introspectSchema($table) { $schema['fields'][$row->name]['length'] = $length; } if ($row->pk) { - $schema['primary key'][] = $row->name; + $schema['primary key'][$row->pk - 1] = $row->name; } } else { throw new \Exception("Unable to parse the column type " . $row->type); } } + ksort($schema['primary key']); + $indexes = []; $result = $this->connection->query('PRAGMA ' . $info['schema'] . '.index_list(' . $info['table'] . ')'); foreach ($result as $row) { @@ -741,6 +743,17 @@ public function dropPrimaryKey($table) { return TRUE; } + /** + * {@inheritdoc} + */ + public function findPrimaryKeyColumns($table) { + if (!$this->tableExists($table)) { + return FALSE; + } + $schema = $this->introspectSchema($table); + return $schema['primary key']; + } + /** * {@inheritdoc} */ diff --git a/core/lib/Drupal/Core/Database/Schema.php b/core/lib/Drupal/Core/Database/Schema.php index aafe1756ba..8baa83c0d3 100644 --- a/core/lib/Drupal/Core/Database/Schema.php +++ b/core/lib/Drupal/Core/Database/Schema.php @@ -408,6 +408,18 @@ public function fieldExists($table, $column) { */ abstract public function dropPrimaryKey($table); + /** + * Finds the primary key columns of a table, from the database. + * + * @param string $table + * The name of the table. + * + * @return string[]|false + * A simple array with the names of the columns composing the table's + * primary key, or FALSE if the table does not exist. + */ + abstract public function findPrimaryKeyColumns($table); + /** * Add a unique key. * diff --git a/core/tests/Drupal/KernelTests/Core/Database/SchemaTest.php b/core/tests/Drupal/KernelTests/Core/Database/SchemaTest.php index 9cbd186420..01a7cd71a8 100644 --- a/core/tests/Drupal/KernelTests/Core/Database/SchemaTest.php +++ b/core/tests/Drupal/KernelTests/Core/Database/SchemaTest.php @@ -12,6 +12,8 @@ /** * Tests table creation and modification via the schema API. * + * @coversDefaultClass \Drupal\Core\Database\Schema + * * @group Database */ class SchemaTest extends KernelTestBase { @@ -149,7 +151,8 @@ public function testSchema() { db_field_set_default('test_table', 'test_field', 0); db_add_field('test_table', 'test_serial', ['type' => 'serial', 'not null' => TRUE], ['primary key' => ['test_serial']]); - $this->assertPrimaryKeyColumns('test_table', ['test_serial']); + // Test the primary key columns. + $this->assertSame(['test_serial'], Database::getConnection()->schema()->findPrimaryKeyColumns('test_table')); $this->assertTrue($this->tryInsert(), 'Insert with a serial succeeded.'); $max1 = db_query('SELECT MAX(test_serial) FROM {test_table}')->fetchField(); @@ -163,7 +166,8 @@ public function testSchema() { // Test adding a new column and form a composite primary key with it. db_add_field('test_table', 'test_composite_primary_key', ['type' => 'int', 'not null' => TRUE, 'default' => 0], ['primary key' => ['test_serial', 'test_composite_primary_key']]); - $this->assertPrimaryKeyColumns('test_table', ['test_serial', 'test_composite_primary_key']); + // Test the primary key columns. + $this->assertSame(['test_serial', 'test_composite_primary_key'], Database::getConnection()->schema()->findPrimaryKeyColumns('test_table')); // Test renaming of keys and constraints. db_drop_table('test_table'); @@ -490,9 +494,9 @@ public function tryUnsignedInsert($table_name, $column_name) { } /** - * Tests adding columns to an existing table. + * Tests adding columns to an existing table with default and initial value. */ - public function testSchemaAddField() { + public function testSchemaAddFieldDefaultInitial() { // Test varchar types. foreach ([1, 32, 128, 256, 512] as $length) { $base_field_spec = [ @@ -695,6 +699,116 @@ protected function assertFieldCharacteristics($table_name, $field_name, $field_s } } + /** + * Tests creating tables and dropping fields that are part of the primary key. + * + * @param array $primary_key + * An array of primary keys of the test table. + * + * @dataProvider providerTestSchemaCreateTablePrimaryKey + */ + public function testSchemaCreateTablePrimaryKey(array $primary_key) { + $connection = Database::getConnection(); + $schema = $connection->schema(); + + // Test making the field the primary key of the table upon creation. + $table_name = 'test_table'; + $table_spec = [ + 'fields' => [ + 'test_field' => ['type' => 'int'], + 'other_test_field' => ['type' => 'int'], + ], + 'primary key' => $primary_key, + ]; + $schema->createTable($table_name, $table_spec); + $this->assertTrue($schema->fieldExists($table_name, 'test_field')); + $this->assertEquals($primary_key, $schema->findPrimaryKeyColumns($table_name)); + + // Drop the field and make sure the primary key was dropped, as well. + $schema->dropField($table_name, 'test_field'); + $this->assertFalse($schema->fieldExists($table_name, 'test_field')); + $expected = array_values(array_diff($primary_key, ['test_field'])); + $this->assertEquals($expected, $schema->findPrimaryKeyColumns($table_name)); + } + + /** + * Provides test cases for SchemaTest::testSchemaCreateTablePrimaryKey(). + * + * @return array + * An array of test cases for SchemaTest::testSchemaCreateTablePrimaryKey(). + */ + public function providerTestSchemaCreateTablePrimaryKey() { + $tests = []; + + $tests['simple_primary_key'] = [ + 'primary_key' => ['test_field'], + ]; + $tests['composite_primary_key'] = [ + 'primary_key' => ['test_field', 'other_test_field'], + ]; + $tests['composite_primary_key_different_order'] = [ + 'primary_key' => ['other_test_field', 'test_field'], + ]; + + return $tests; + } + + /** + * Tests adding and dropping fields that are part of the primary key. + * + * @param array $initial_primary_key + * An array of initial primary keys of the test table. + * @param array $new_primary_key + * An array of primary keys to create with the added field. + * + * @dataProvider providerTestSchemaAddFieldPrimaryKey + */ + public function testSchemaAddFieldPrimaryKey($initial_primary_key, $new_primary_key) { + $connection = Database::getConnection(); + $schema = $connection->schema(); + + $field_spec = ['type' => 'int']; + + // Try adding a field as the primary key to an existing table. + $table_name = 'test_table'; + $table_spec = [ + 'fields' => [ + 'other_test_field' => $field_spec, + ], + 'primary key' => $initial_primary_key, + ]; + $schema->createTable($table_name, $table_spec); + $this->assertEquals($initial_primary_key, $schema->findPrimaryKeyColumns($table_name)); + $schema->addField($table_name, 'test_field', $field_spec, ['primary key' => $new_primary_key]); + $this->assertTrue($schema->fieldExists($table_name, 'test_field')); + $this->assertEquals($new_primary_key, $schema->findPrimaryKeyColumns($table_name)); + } + + /** + * Provides test cases for SchemaTest::testSchemaAddFieldPrimaryKey(). + * + * @return array + * An array of test cases for SchemaTest::testSchemaAddFieldPrimaryKey(). + */ + public function providerTestSchemaAddFieldPrimaryKey() { + $tests = []; + + $tests['simple_primary_key'] = [ + 'initial_primary_key' => [], + 'new_primary_key' => ['test_field'], + ]; + $tests['composite_primary_key'] = [ + 'initial_primary_key' => ['other_test_field'], + 'new_primary_key' => ['test_field', 'other_test_field'], + ]; + $tests['composite_primary_key_different_order'] = [ + 'initial_primary_key' => ['other_test_field'], + 'new_primary_key' => ['other_test_field', 'test_field'], + ]; + + return $tests; + } + /** * Tests changing columns between types. */ @@ -791,6 +905,124 @@ protected function assertFieldChange($old_spec, $new_spec, $test_data = NULL) { db_drop_table($table_name); } + /** + * @covers ::findPrimaryKeyColumns + */ + public function testFindPrimaryKeyColumns() { + /** @var \Drupal\Core\Database\Schema $schema */ + $schema = Database::getConnection()->schema(); + + // Test with single column primary key. + $schema->createTable('table_with_pk_0', [ + 'description' => 'Table with primary key.', + 'fields' => [ + 'id' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + 'test_field' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + ], + 'primary key' => ['id'], + ]); + $this->assertSame(['id'], $schema->findPrimaryKeyColumns('table_with_pk_0')); + + // Test with multiple column primary key. + $schema->createTable('table_with_pk_1', [ + 'description' => 'Table with primary key with multiple columns.', + 'fields' => [ + 'id0' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + 'id1' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + 'test_field' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + ], + 'primary key' => ['id0', 'id1'], + ]); + $this->assertSame(['id0', 'id1'], $schema->findPrimaryKeyColumns('table_with_pk_1')); + + // Test with multiple column primary key and not being the first column of + // the table definition. + $schema->createTable('table_with_pk_2', [ + 'description' => 'Table with primary key with multiple columns at the end and in reverted sequence.', + 'fields' => [ + 'test_field_1' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + 'test_field_2' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + 'id3' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + 'id4' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + ], + 'primary key' => ['id4', 'id3'], + ]); + $this->assertSame(['id4', 'id3'], $schema->findPrimaryKeyColumns('table_with_pk_2')); + + // Test with multiple column primary key in a different order. For the + // PostgreSQL and the SQLite drivers is sorting used to get the primary key + // columns in the right order. + $schema->createTable('table_with_pk_3', [ + 'description' => 'Table with primary key with multiple columns at the end and in reverted sequence.', + 'fields' => [ + 'test_field_1' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + 'test_field_2' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + 'id3' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + 'id4' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + ], + 'primary key' => ['id3', 'test_field_2', 'id4'], + ]); + $this->assertSame(['id3', 'test_field_2', 'id4'], $schema->findPrimaryKeyColumns('table_with_pk_3')); + + // Test with table without a primary key. + $schema->createTable('table_without_pk', [ + 'description' => 'Table without primary key.', + 'fields' => [ + 'id' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + 'test_field' => [ + 'type' => 'int', + 'not null' => TRUE, + ], + ], + ]); + $this->assertSame([], $schema->findPrimaryKeyColumns('table_without_pk')); + + // Test with non existing table. + $this->assertFalse($schema->findPrimaryKeyColumns('non_existing_table')); + } + /** * Tests the findTables() method. */ @@ -849,46 +1081,4 @@ public function testFindTables() { Database::setActiveConnection('default'); } - /** - * Tests the primary keys of a table. - * - * @param string $table_name - * The name of the table to check. - * @param array $primary_key - * The expected key column specifier for a table's primary key. - */ - protected function assertPrimaryKeyColumns($table_name, array $primary_key = []) { - $db_type = Database::getConnection()->databaseType(); - - switch ($db_type) { - case 'mysql': - $result = Database::getConnection()->query("SHOW KEYS FROM {" . $table_name . "} WHERE Key_name = 'PRIMARY'")->fetchAllAssoc('Column_name'); - $this->assertSame($primary_key, array_keys($result)); - - break; - case 'pgsql': - $result = Database::getConnection()->query("SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type - FROM pg_index i - JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) - WHERE i.indrelid = '{" . $table_name . "}'::regclass AND i.indisprimary") - ->fetchAllAssoc('attname'); - $this->assertSame($primary_key, array_keys($result)); - - break; - case 'sqlite': - // For SQLite we need access to the protected - // \Drupal\Core\Database\Driver\sqlite\Schema::introspectSchema() method - // because we have no other way of getting the table prefixes needed for - // running a straight PRAGMA query. - $schema_object = Database::getConnection()->schema(); - $reflection = new \ReflectionMethod($schema_object, 'introspectSchema'); - $reflection->setAccessible(TRUE); - - $table_info = $reflection->invoke($schema_object, $table_name); - $this->assertSame($primary_key, $table_info['primary key']); - - break; - } - } - }