diff --git a/core/lib/Drupal/Core/Database/Connection.php b/core/lib/Drupal/Core/Database/Connection.php
index 2aa6315..8806d5d 100644
--- a/core/lib/Drupal/Core/Database/Connection.php
+++ b/core/lib/Drupal/Core/Database/Connection.php
@@ -79,6 +79,13 @@
   protected $transactionSupport = TRUE;
 
   /**
+   * Supported transaction isolation levels.
+   *
+   * @var string[]
+   */
+  protected $supportedIsolationLevels;
+
+  /**
    * Whether this database connection supports transactional DDL.
    *
    * Set to FALSE by default because few databases support this feature.
@@ -146,6 +153,33 @@
   protected $unprefixedTablesMap = [];
 
   /**
+   * The ambient isolation level, either set by the user in settings.php
+   * or read from the database default's when starting a connection
+   *
+   * @var string
+   */
+  protected $ambientIsolationLevel;
+
+  /**
+   * Every connection has a uniqueId
+   * used for testing purposes.
+   *
+   * @var string
+   */
+  protected $id;
+
+  /**
+   * Even if the driver supports transactions,
+   * we might want a connection that not only
+   * does not support transaction, but will throw
+   * an exception when trying to consume them.
+   *
+   * @var string
+   */
+  protected $block_transactions = FALSE;
+
+
+  /**
    * Constructs a Connection object.
    *
    * @param \PDO $connection
@@ -160,6 +194,9 @@ public function __construct(\PDO $connection, array $connection_options) {
     // Initialize and prepare the connection prefix.
     $this->setPrefix(isset($connection_options['prefix']) ? $connection_options['prefix'] : '');
 
+    // Initialize and set the connection disallow transactions setting.
+    $this->setBlockTransactions(isset($connection_options['block_transactions']) ? $connection_options['block_transactions'] : '');
+
     // Set a Statement class, unless the driver opted out.
     if (!empty($this->statementClass)) {
       $connection->setAttribute(\PDO::ATTR_STATEMENT_CLASS, array($this->statementClass, array($this)));
@@ -167,6 +204,8 @@ public function __construct(\PDO $connection, array $connection_options) {
 
     $this->connection = $connection;
     $this->connectionOptions = $connection_options;
+
+    $this->id = uniqid('', TRUE);
   }
 
   /**
@@ -181,6 +220,13 @@ public function __construct(\PDO $connection, array $connection_options) {
   public static function open(array &$connection_options = array()) { }
 
   /**
+   * Get this connection unique identifier.
+   */
+  public function getId() {
+    return $this->id;
+  }
+
+  /**
    * Destroys this Connection object.
    *
    * PHP does not destruct an object if it is still referenced in other
@@ -276,6 +322,21 @@ public function getConnectionOptions() {
   }
 
   /**
+   * Set if transactions are blocked for this connection.
+   *
+   * When transactions are blocked, any attempt to start a transaction
+   * on this connection will throw an exception.
+   *
+   * @param bool $block
+   */
+  public function setBlockTransactions($block) {
+    if ($this->inTransaction()) {
+      throw new DatabaseExceptionWrapper("Cannot change the block transaction settings while a transaction is in progress.");
+    }
+    $this->block_transactions = $block;
+  }
+
+  /**
    * Set the list of prefixes used by this database connection.
    *
    * @param array|string $prefix
@@ -1048,9 +1109,12 @@ public function transactionDepth() {
    *
    * @see \Drupal\Core\Database\Transaction
    */
-  public function startTransaction($name = '') {
+  public function startTransaction($name = '', TransactionSettings $settings = NULL) {
     $class = $this->getDriverClass('Transaction');
-    return new $class($this, $name);
+    if (empty($settings)) {
+      $settings = TransactionSettings::GetDefaults();
+    }
+    return new $class($this, $name, $settings);
   }
 
   /**
@@ -1085,14 +1149,18 @@ public function rollback($savepoint_name = 'drupal_transaction') {
     // we need to throw an exception.
     $rolled_back_other_active_savepoints = FALSE;
     while ($savepoint = array_pop($this->transactionLayers)) {
-      if ($savepoint == $savepoint_name) {
+      if ($savepoint['name'] == $savepoint_name) {
         // If it is the last the transaction in the stack, then it is not a
         // savepoint, it is the transaction itself so we will need to roll back
         // the transaction rather than a savepoint.
         if (empty($this->transactionLayers)) {
           break;
         }
-        $this->query('ROLLBACK TO SAVEPOINT ' . $savepoint);
+        // When using REQUIRES this transaction might have never been started
+        // because it already had an ambient transaction.
+        if ($savepoint['started'] == TRUE) {
+          $this->query('ROLLBACK TO SAVEPOINT ' . $savepoint['name']);
+        }
         $this->popCommittableTransactions();
         if ($rolled_back_other_active_savepoints) {
           throw new TransactionOutOfOrderException();
@@ -1103,7 +1171,14 @@ public function rollback($savepoint_name = 'drupal_transaction') {
         $rolled_back_other_active_savepoints = TRUE;
       }
     }
-    $this->connection->rollBack();
+    /** @var TransactionSettings */
+    $settings = $savepoint['settings'];
+    // If the first transaction in the stack is of type supress, then this connection
+    // was created for the sole purpose of not having an active transaction.
+    if ($settings->getScopeOption() != TransactionSettings::TRANSACTION_SCOPEOPTION_SUPRESS) {
+      $this->connection->rollBack();
+    }
+    $this->RestoreAmbientIsolationLevel($settings);
     if ($rolled_back_other_active_savepoints) {
       throw new TransactionOutOfOrderException();
     }
@@ -1116,27 +1191,78 @@ public function rollback($savepoint_name = 'drupal_transaction') {
    *
    * @param string $name
    *   The name of the transaction.
+   * @param TransactionSettings $settings
+   *   The transaction settings.
+   * @param string $previous_connection_key
+   *   If this push comes from a transaction creation, the
+   *   previously active active database key.
    *
    * @throws \Drupal\Core\Database\TransactionNameNonUniqueException
    *
    * @see \Drupal\Core\Database\Transaction
    */
-  public function pushTransaction($name) {
+  public function pushTransaction($name, TransactionSettings $settings, $previous_connection_key = NULL) {
     if (!$this->supportsTransactions()) {
       return;
     }
     if (isset($this->transactionLayers[$name])) {
       throw new TransactionNameNonUniqueException($name . " is already in use.");
     }
-    // If we're already in a transaction then we want to create a savepoint
-    // rather than try to create another transaction.
+    if ($this->block_transactions) {
+      throw new TransactionBlockedException("Transactions have been blocked for this connection.");
+    }
+    if ($this->inTransaction() && !empty($previous_connection_key)) {
+      throw new DatabaseExceptionWrapper("To push a transaction with a previous connection key, it must be the first one in the transaction stack.");
+    }
+    // Depending on the scopeoption this transaction might be started or not.
+    $started = FALSE;
+    // Depending on the scope option, we might need to keep track
+    // of what the active connection was before.
     if ($this->inTransaction()) {
-      $this->query('SAVEPOINT ' . $name);
+      switch ($settings->getScopeOption()) {
+        case TransactionSettings::TRANSACTION_SCOPEOPTION_LEGACY:
+          // If we're already in a transaction then we want to create a savepoint
+          // rather than try to create another transaction.
+          // Of course, ONLY MAKES SENSE if the new isolation option is exactly the
+          // same as that of the parent transaction.
+          // TODO: Look into the transaction layer scope and see what isolation level we have now.
+          $this->query('SAVEPOINT ' . $name);
+          $started = TRUE;
+          break;
+        case TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRED:
+          // We already have a transaction. Do nothing.
+          break;
+        case TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRESNEW:
+          throw new TransactionCannotBePushedException();
+        case TransactionSettings::TRANSACTION_SCOPEOPTION_SUPRESS:
+          throw new TransactionCannotBePushedException();
+        default:
+          throw new DatabaseExceptionWrapper("Scope option not supported");
+      }
     }
     else {
+      // Is supress requested and no active transaction, do nothing.
+      if ($settings->getScopeOption() == TransactionSettings::TRANSACTION_SCOPEOPTION_SUPRESS) {
+        return;
+      }
+      // If we are not in a transaction no need
+      if ($settings->getIsolationLevel() != TransactionSettings::TRANSACTION_ISOLATION_IGNORE) {
+        // Make sure that we have a valid ambient/default isolation level populated that we
+        // can later restore.
+        $this->getAmbientTransactionIsolationLevel();
+        // TODO: Instead of reading this from the context, it *might* be better
+        // to have some defaults in settings.php that are forcefully set when openning
+        // the connection. Or maybe getAmbientTransactionIsolationLevel should take care
+        // of that when opening the connection and store the value once.
+        if ($this->getAmbientTransactionIsolationLevel() != $settings->getIsolationLevel()) {
+          $this->setTransactionIsolationLevel($settings->getIsolationLevel());
+        }
+      }
+      // Isolation mode IGNORE means.... to NOT start a transaction...
       $this->connection->beginTransaction();
     }
-    $this->transactionLayers[$name] = $name;
+    // Store name and settings in the stack.
+    $this->transactionLayers[$name] = array('settings' => $settings, 'active' => TRUE, 'name' => $name, 'started' => $started, 'previous_connection_key' => $previous_connection_key);
   }
 
   /**
@@ -1167,7 +1293,7 @@ public function popTransaction($name) {
     }
 
     // Mark this layer as committable.
-    $this->transactionLayers[$name] = FALSE;
+    $this->transactionLayers[$name]['active'] = FALSE;
     $this->popCommittableTransactions();
   }
 
@@ -1176,26 +1302,121 @@ public function popTransaction($name) {
    */
   protected function popCommittableTransactions() {
     // Commit all the committable layers.
-    foreach (array_reverse($this->transactionLayers) as $name => $active) {
+    foreach (array_reverse($this->transactionLayers) as $name => $state) {
+      /** @var TransactionSettings $settings */
+      $settings = $state['settings'];
       // Stop once we found an active transaction.
-      if ($active) {
+      if ($state['active']) {
         break;
       }
-
       // If there are no more layers left then we should commit.
       unset($this->transactionLayers[$name]);
+
+      // Only restore any previous connection if the connection
+      // was actualy replaced.
+      if (!empty($state['previous_connection_key'])) {
+        if (!empty($this->transactionLayers)) {
+          // Only the first transaction in the stack should have
+          // a reference to any previous connection!
+          throw new DatabaseExceptionWrapper("Something is wrong.");
+        }
+      }
+
       if (empty($this->transactionLayers)) {
-        if (!$this->connection->commit()) {
-          throw new TransactionCommitFailedException();
+        try {
+          // PDO::commit() can either return FALSE or throw an exception itself
+          if (!$this->connection->commit()) {
+            throw new TransactionCommitFailedException();
+          }
+        }
+        finally {
+          $this->RestoreAmbientIsolationLevel($settings);
         }
       }
-      else {
+      // Only the legacy mode stacks save points.
+      elseif ($settings->getScopeOption() == TransactionSettings::TRANSACTION_SCOPEOPTION_LEGACY) {
         $this->query('RELEASE SAVEPOINT ' . $name);
       }
     }
   }
 
   /**
+   * Restore the ambient transaction isolation level.
+   *
+   * @param TransactionSettings $settings
+   */
+  protected function RestoreAmbientIsolationLevel(TransactionSettings $settings) {
+    // Restore the ambient transaction isolation level for this connection
+    // if it is different than the one from the transaction.
+    $ambient = $this->getAmbientTransactionIsolationLevel();
+    if (!in_array($ambient, [TransactionSettings::TRANSACTION_ISOLATION_IGNORE, $settings->getIsolationLevel()])) {
+      $this->setTransactionIsolationLevel($ambient);
+    }
+  }
+
+  /**
+   * Get the current active isolation level.
+   *
+   * Database drivers that support transaction isolation levels should extend
+   * this class to return the current transaction isolation level.
+   *
+   * @return string
+   *   Isolation level constant.
+   */
+  public function getTransactionIsolationLevel() {
+    return TransactionSettings::TRANSACTION_ISOLATION_IGNORE;
+  }
+
+  /**
+   * Get the ambient/default transaction isolation level for current session.
+   *
+   * Database drivers that support transaction isolation levels should extend
+   * this class to return the ambient/default isolation level.
+   *
+   * @return string
+   *   Isolation level constant.
+   */
+  public function getAmbientTransactionIsolationLevel() {
+    if (!isset($this->ambientIsolationLevel)) {
+      $this->ambientIsolationLevel = $this->getTransactionIsolationLevel();
+    }
+    return $this->ambientIsolationLevel;
+  }
+
+  /**
+   * Set the default transaction isolation level for this session.
+   * 
+   * This method can be called outside a transaction (to affect the
+   * ambient transaction mode) or before and right after starting a
+   * transaction to set the isolation level for that transaction
+   * and to restore the ambient isolation level.
+   *
+   * @param string $level
+   * @throws \Exception
+   */
+  protected function setTransactionIsolationLevel($level) {
+    if ($level == TransactionSettings::TRANSACTION_ISOLATION_IGNORE) {
+      return;
+    }
+    throw new \Exception("Not implemented.");
+  }
+
+  /**
+   * Transaction lock timeout for the current session in seconds.
+   *
+   * Drivers must override this with their specific implementations.
+   *
+   * @param int $timeout
+   *   The timeout in seconds
+   */
+  public function setTransactionLockTimeout($timeout) {
+    if (!$this->supportsTransactions()) {
+      return;
+    }
+    throw new DatabaseExceptionWrapper("Not implemented.");
+  }
+
+  /**
    * Runs a limited-range query on this database object.
    *
    * Use this as a substitute for ->query() when a subset of the query is to be
@@ -1296,6 +1517,22 @@ public function supportsTransactions() {
   }
 
   /**
+   * Determines the list of supported transaction isolation levels.
+   *
+   * Empty does not mean that it does not support transactions, if
+   * supportsTransactions() return TRUE the it means that the isolation
+   * level is not configurable. Such as in Sqlite where it is the database
+   * administrator that must configure the global isolation level and
+   * this cannot be changed.
+   *
+   * @return string[]
+   *   List of supported isolation levels
+   */
+  public function supportedTransactionIsolationLevels() {
+    return $this->supportedIsolationLevels;
+  }
+
+  /**
    * Determines if this driver supports transactional DDL.
    *
    * DDL queries are those that change the schema, such as ALTER queries.
diff --git a/core/lib/Drupal/Core/Database/Database.php b/core/lib/Drupal/Core/Database/Database.php
index d48c6f2..126ebe4 100644
--- a/core/lib/Drupal/Core/Database/Database.php
+++ b/core/lib/Drupal/Core/Database/Database.php
@@ -104,7 +104,8 @@
       // Every target already active for this connection key needs to have the
       // logging object associated with it.
       if (!empty(self::$connections[$key])) {
-        foreach (self::$connections[$key] as $connection) {
+        foreach (array_keys(self::$connections[$key]) as $target) {
+          foreach (self::$connections[$key][$target] as $alias => $connection)
           $connection->setLogger(self::$logs[$key]);
         }
       }
@@ -148,11 +149,13 @@
    *   The database target name.
    * @param string $key
    *   The database connection key. Defaults to NULL which means the active key.
+   * @param string $instance
+   *   Allow to have multiple instance of the same connection.
    *
    * @return \Drupal\Core\Database\Connection
    *   The corresponding connection object.
    */
-  final public static function getConnection($target = 'default', $key = NULL) {
+  final public static function getConnection($target = 'default', $key = NULL, $instance = 'default') {
     if (!isset($key)) {
       // By default, we want the active connection, set in setActiveConnection.
       $key = self::$activeKey;
@@ -166,11 +169,11 @@
       $target = 'default';
     }
 
-    if (!isset(self::$connections[$key][$target])) {
+    if (!isset(self::$connections[$key][$target][$instance])) {
       // If necessary, a new connection is opened.
-      self::$connections[$key][$target] = self::openConnection($key, $target);
+      self::$connections[$key][$target][$instance] = self::openConnection($key, $target);
     }
-    return self::$connections[$key][$target];
+    return self::$connections[$key][$target][$instance];
   }
 
   /**
@@ -202,6 +205,19 @@
   }
 
   /**
+   * Return the active connection key. False if no connection
+   * is active.
+   *
+   * @return string|bool
+   */
+  final public static function getActiveConnectionKey() {
+    if (!static::isActiveConnection()) {
+      return FALSE;
+    }
+    return self::$activeKey;
+  }
+
+  /**
    * Process the configuration file for database information.
    *
    * @param array $info
@@ -400,8 +416,10 @@
    *   connections will be closed.
    * @param string $key
    *   The database connection key. Defaults to NULL which means the active key.
+   * @param string $instance
+   *   The connection instance. Default to NULL meaning all instances.
    */
-  public static function closeConnection($target = NULL, $key = NULL) {
+  public static function closeConnection($target = NULL, $key = NULL, $instance = NULL) {
     // Gets the active connection by default.
     if (!isset($key)) {
       $key = self::$activeKey;
@@ -410,18 +428,30 @@ public static function closeConnection($target = NULL, $key = NULL) {
     // static variable. In all cases, closeConnection() might be called for a
     // connection that was not opened yet, in which case the key is not defined
     // yet and we just ensure that the connection key is undefined.
-    if (isset($target)) {
+    if (isset($target) && isset($instance)) {
+      if (isset(self::$connections[$key][$target][$instance])) {
+        self::$connections[$key][$target][$instance]->destroy();
+        self::$connections[$key][$target][$instance] = NULL;
+      }
+      unset(self::$connections[$key][$target][$instance]);
+    }
+    elseif (isset($target)) {
       if (isset(self::$connections[$key][$target])) {
-        self::$connections[$key][$target]->destroy();
-        self::$connections[$key][$target] = NULL;
+        foreach (array_keys(self::$connections[$key][$target]) as $alias) {
+          self::$connections[$key][$target][$alias]->destroy();
+          self::$connections[$key][$target][$alias] = NULL;
+          unset(self::$connections[$key][$target][$alias]);
+        }
       }
-      unset(self::$connections[$key][$target]);
     }
     else {
       if (isset(self::$connections[$key])) {
-        foreach (self::$connections[$key] as $target => $connection) {
-          self::$connections[$key][$target]->destroy();
-          self::$connections[$key][$target] = NULL;
+        foreach (array_keys(self::$connections[$key]) as $target) {
+          foreach (self::$connections[$key][$target] as $alias => $connection) {
+            self::$connections[$key][$target][$alias]->destroy();
+            self::$connections[$key][$target][$alias] = NULL;
+            unset(self::$connections[$key][$target][$alias]);
+          }
         }
       }
       unset(self::$connections[$key]);
@@ -489,6 +519,15 @@ public static function convertDbUrlToConnectionInfo($url, $root) {
     if (isset($info['port'])) {
       $database['port'] = $info['port'];
     }
+
+    if (isset($info['query'])) {
+      $parts = array();
+      parse_str($info['query'], $parts);
+      if (isset($parts['namespace'])) {
+        $database['namespace'] = $parts['namespace'];
+      }
+    }
+
     return $database;
   }
 
@@ -525,6 +564,9 @@ public static function getConnectionInfoAsUrl($key = 'default') {
     if ($db_info['default']['prefix']['default']) {
       $db_url .= '#' . $db_info['default']['prefix']['default'];
     }
+    if (isset($db_info['default']['namespace'])) {
+      $db_url .= '?namespace=' . $db_info['default']['namespace'];
+    }
     return $db_url;
   }
 
diff --git a/core/lib/Drupal/Core/Database/Driver/mysql/Connection.php b/core/lib/Drupal/Core/Database/Driver/mysql/Connection.php
index dd8c299..1aff9f8 100644
--- a/core/lib/Drupal/Core/Database/Driver/mysql/Connection.php
+++ b/core/lib/Drupal/Core/Database/Driver/mysql/Connection.php
@@ -8,12 +8,12 @@
 namespace Drupal\Core\Database\Driver\mysql;
 
 use Drupal\Core\Database\DatabaseExceptionWrapper;
-
 use Drupal\Core\Database\Database;
 use Drupal\Core\Database\DatabaseNotFoundException;
 use Drupal\Core\Database\TransactionCommitFailedException;
 use Drupal\Core\Database\DatabaseException;
 use Drupal\Core\Database\Connection as DatabaseConnection;
+use Drupal\Core\Database\TransactionSettings;
 use Drupal\Component\Utility\Unicode;
 
 /**
@@ -72,6 +72,8 @@ public function __construct(\PDO $connection, array $connection_options = array(
     // This driver defaults to transaction support, except if explicitly passed FALSE.
     $this->transactionSupport = !isset($connection_options['transactions']) || ($connection_options['transactions'] !== FALSE);
 
+    $this->supportedIsolationLevels = TransactionSettings::getAllIsolationLevels();
+
     // MySQL never supports transactional DDL.
     $this->transactionalDDLSupport = FALSE;
 
@@ -84,7 +86,8 @@ public function __construct(\PDO $connection, array $connection_options = array(
   public function query($query, array $args = array(), $options = array()) {
     try {
       return parent::query($query, $args, $options);
-    } catch (DatabaseException $e) {
+    }
+    catch (DatabaseException $e) {
       if ($e->getPrevious()->errorInfo[1] == 1153) {
         // If a max_allowed_packet error occurs the message length is truncated.
         // This should prevent the error from recurring if the exception is
@@ -291,20 +294,41 @@ public function nextIdDelete() {
    */
   protected function popCommittableTransactions() {
     // Commit all the committable layers.
-    foreach (array_reverse($this->transactionLayers) as $name => $active) {
+    foreach (array_reverse($this->transactionLayers) as $name => $state) {
+      /** @var TransactionSettings $settings */
+      $settings = $state['settings'];
       // Stop once we found an active transaction.
-      if ($active) {
+      if ($state['active']) {
         break;
       }
-
       // If there are no more layers left then we should commit.
       unset($this->transactionLayers[$name]);
+
+      // Only restore any previous connection if the connection
+      // was actualy replaced.
+      if (!empty($state['previous_connection_key'])) {
+        if (!empty($this->transactionLayers)) {
+          // Only the first transaction in the stack should have
+          // a reference to any previous connection, this is
+          // just a security measure to make sure the implementation
+          // is not messed up.
+          throw new DatabaseExceptionWrapper("Only the first transactin in the stackshould have the previous_connection_key property.");
+        }
+      }
+
       if (empty($this->transactionLayers)) {
-        if (!$this->connection->commit()) {
-          throw new TransactionCommitFailedException();
+        try {
+          // PDO::commit() can either return FALSE or throw an exception itself
+          if (!$this->connection->commit()) {
+            throw new TransactionCommitFailedException();
+          }
+        }
+        finally {
+          $this->RestoreAmbientIsolationLevel($settings);
         }
       }
-      else {
+      // Only the legacy mode stacks save points.
+      elseif ($settings->getScopeOption() == TransactionSettings::TRANSACTION_SCOPEOPTION_LEGACY) {
         // Attempt to release this savepoint in the standard way.
         try {
           $this->query('RELEASE SAVEPOINT ' . $name);
@@ -324,6 +348,8 @@ protected function popCommittableTransactions() {
             // We also have to explain to PDO that the transaction stack has
             // been cleaned-up.
             $this->connection->commit();
+            // Restore the ambient transaction isolation level.
+            $this->RestoreAmbientIsolationLevel($settings);
           }
           else {
             throw $e;
@@ -333,6 +359,66 @@ protected function popCommittableTransactions() {
     }
   }
 
+  /**
+   * {@inheritdoc}
+   */
+  public function getTransactionIsolationLevel() {
+    // SELECT @@tx_isolation is the session level setting
+    // and will not report what we expect it to be when inside a transaction.
+    if ($this->inTransaction()) {
+      throw new DatabaseExceptionWrapper("MySQL does not support retrieving the transaction isolation level inside a transaction.");
+    }
+    $ret = NULL;
+    $level_string = strtoupper(trim($this->connection->query("SELECT @@tx_isolation")->fetchField()));
+    switch ($level_string) {
+      case 'SERIALIZABLE':
+        $ret = TransactionSettings::TRANSACTION_ISOLATION_SERIALIZABLE;
+        break;
+      case 'REPEATABLE-READ':
+        $ret = TransactionSettings::TRANSACTION_ISOLATION_REPEATABLEREAD;
+        break;
+      case 'READ-COMMITTED':
+        $ret = TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED;
+        break;
+      case 'READ-UNCOMMITTED':
+        $ret = TransactionSettings::TRANSACTION_ISOLATION_READUNCOMMITTED;
+        break;
+      default:
+        throw new \Exception("Isolation level not supported: $level_string");
+    }
+    return $ret;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  protected function setTransactionIsolationLevel($level) {
+    $level_string = '';
+    switch ($level) {
+      case TransactionSettings::TRANSACTION_ISOLATION_SERIALIZABLE:
+        $level_string = 'SERIALIZABLE';
+        break;
+      case TransactionSettings::TRANSACTION_ISOLATION_REPEATABLEREAD:
+        $level_string = 'REPEATABLE READ';
+        break;
+      case TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED:
+        $level_string = 'READ COMMITTED';
+        break;
+      case TransactionSettings::TRANSACTION_ISOLATION_READUNCOMMITTED:
+        $level_string = 'READ UNCOMMITTED';
+        break;
+      default:
+        throw new \InvalidArgumentException("Invalid transaction isolation level provided: $level");
+    }
+    $this->connection->query('SET TRANSACTION ISOLATION LEVEL ' . $level_string);
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function setTransactionLockTimeout($timeout) {
+    $this->connection->query("SET SESSION innodb_lock_wait_timeout = $timeout");
+  }
 }
 
 
diff --git a/core/lib/Drupal/Core/Database/Driver/pgsql/Connection.php b/core/lib/Drupal/Core/Database/Driver/pgsql/Connection.php
index 81aeabc..565be60 100644
--- a/core/lib/Drupal/Core/Database/Driver/pgsql/Connection.php
+++ b/core/lib/Drupal/Core/Database/Driver/pgsql/Connection.php
@@ -7,9 +7,11 @@
 
 namespace Drupal\Core\Database\Driver\pgsql;
 
+use Drupal\Core\Database\DatabaseExceptionWrapper;
 use Drupal\Core\Database\Database;
 use Drupal\Core\Database\Connection as DatabaseConnection;
 use Drupal\Core\Database\DatabaseNotFoundException;
+use Drupal\Core\Database\TransactionSettings;
 
 /**
  * @addtogroup database
@@ -65,6 +67,21 @@ public function __construct(\PDO $connection, array $connection_options) {
     // but we'll only enable it if standard transactions are.
     $this->transactionalDDLSupport = $this->transactionSupport;
 
+    // In PostgreSQL READ UNCOMMITTED is treated as READ COMMITTED.
+    // so there is no real READ UNCOMMITED.
+    $this->supportedIsolationLevels = [
+        TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED,
+        TransactionSettings::TRANSACTION_ISOLATION_REPEATABLEREAD,
+      ];
+
+    // Prior to PostgreSQL version 9.1, a request for the Serializable
+    // transaction isolation level provided exactly the same behavior
+    // described here (REPEATABLEREAD). To retain the legacy
+    // Serializable behavior, Repeatable Read should now be requested.
+    if (version_compare($this->version(), '9.1') >= 0) {
+      $this->supportedIsolationLevels[] = TransactionSettings::TRANSACTION_ISOLATION_SERIALIZABLE;
+    }
+
     $this->connectionOptions = $connection_options;
 
     // Force PostgreSQL to use the UTF-8 character set by default.
@@ -272,12 +289,12 @@ public function createDatabase($database) {
 
   public function mapConditionOperator($operator) {
     static $specials = array(
-      // In PostgreSQL, 'LIKE' is case-sensitive. For case-insensitive LIKE
-      // statements, we need to use ILIKE instead.
-      'LIKE' => array('operator' => 'ILIKE'),
-      'LIKE BINARY' => array('operator' => 'LIKE'),
-      'NOT LIKE' => array('operator' => 'NOT ILIKE'),
-      'REGEXP' => array('operator' => '~*'),
+    // In PostgreSQL, 'LIKE' is case-sensitive. For case-insensitive LIKE
+    // statements, we need to use ILIKE instead.
+    'LIKE' => array('operator' => 'ILIKE'),
+    'LIKE BINARY' => array('operator' => 'LIKE'),
+    'NOT LIKE' => array('operator' => 'NOT ILIKE'),
+    'REGEXP' => array('operator' => '~*'),
     );
     return isset($specials[$operator]) ? $specials[$operator] : NULL;
   }
@@ -352,7 +369,7 @@ public function getFullQualifiedTableName($table) {
    */
   public function addSavepoint($savepoint_name = 'mimic_implicit_commit') {
     if ($this->inTransaction()) {
-      $this->pushTransaction($savepoint_name);
+      $this->pushTransaction($savepoint_name, TransactionSettings::GetDefaults());
     }
   }
 
@@ -399,6 +416,72 @@ public function upsert($table, array $options = array()) {
     return new $class($this, $table, $options);
   }
 
+  /**
+   * {@inheritdoc}
+   */
+  public function getTransactionIsolationLevel() {
+    if ($this->inTransaction()) {
+      throw new DatabaseExceptionWrapper("Cannot retrieve the transaction isolation level inside a transaction.");
+    }
+    $ret = NULL;
+    $level_string = strtoupper(trim($this->connection->query("SELECT current_setting('transaction_isolation')")->fetchField()));
+    switch ($level_string) {
+      case 'SERIALIZABLE':
+        $ret = TransactionSettings::TRANSACTION_ISOLATION_SERIALIZABLE;
+        break;
+      case 'REPEATABLE READ':
+        $ret = TransactionSettings::TRANSACTION_ISOLATION_REPEATABLEREAD;
+        break;
+      case 'READ COMMITTED':
+        $ret = TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED;
+        break;
+      case 'READ UNCOMMITTED':
+        $ret = TransactionSettings::TRANSACTION_ISOLATION_READUNCOMMITTED;
+        break;
+      default:
+        throw new \Exception("Isolation level not supported: $level_string");
+    }
+    return $ret;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  protected function setTransactionIsolationLevel($level) {
+    $level_string = '';
+    switch ($level) {
+      case TransactionSettings::TRANSACTION_ISOLATION_SERIALIZABLE:
+        $level_string = 'SERIALIZABLE';
+        break;
+      case TransactionSettings::TRANSACTION_ISOLATION_REPEATABLEREAD:
+        $level_string = 'REPEATABLE READ';
+        break;
+      case TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED:
+        $level_string = 'READ COMMITTED';
+        break;
+      case TransactionSettings::TRANSACTION_ISOLATION_READUNCOMMITTED:
+        $level_string = 'READ UNCOMMITTED';
+        break;
+      default:
+        throw new \InvalidArgumentException("Invalid transaction isolation level provided: $level");
+    }
+    if ($this->inTransaction()) {
+      // From the docs this approach only works once a transaction has started, and
+      // if called outside a transaction only affects the first statement.
+      $this->connection->query('SET TRANSACTION ISOLATION LEVEL ' . $level_string);
+    }
+    else {
+      $this->connection->query('SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL ' . $level_string);
+    }
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function setTransactionLockTimeout($timeout) {
+    $this->connection->query("SET lock_timeout = $timeout");
+  }
+
 }
 
 /**
diff --git a/core/lib/Drupal/Core/Database/Driver/sqlite/Connection.php b/core/lib/Drupal/Core/Database/Driver/sqlite/Connection.php
index b94ae9f..4f26b9e 100644
--- a/core/lib/Drupal/Core/Database/Driver/sqlite/Connection.php
+++ b/core/lib/Drupal/Core/Database/Driver/sqlite/Connection.php
@@ -10,6 +10,8 @@
 use Drupal\Core\Database\Database;
 use Drupal\Core\Database\DatabaseNotFoundException;
 use Drupal\Core\Database\Connection as DatabaseConnection;
+use Drupal\Core\Database\DatabaseExceptionWrapper;
+use Drupal\Core\Database\TransactionSettings;
 
 /**
  * SQLite implementation of \Drupal\Core\Database\Connection.
@@ -61,6 +63,9 @@ public function __construct(\PDO $connection, array $connection_options) {
     // This driver defaults to transaction support, except if explicitly passed FALSE.
     $this->transactionSupport = $this->transactionalDDLSupport = !isset($connection_options['transactions']) || $connection_options['transactions'] !== FALSE;
 
+    // SQLite does not support isolation levels.
+    $this->supportedIsolationLevels = array();
+
     $this->connectionOptions = $connection_options;
 
     // Attach one database for each registered prefix.
@@ -104,6 +109,7 @@ public static function open(array &$connection_options = array()) {
       // Convert numeric values to strings when fetching.
       \PDO::ATTR_STRINGIFY_FETCHES => TRUE,
     );
+
     $pdo = new \PDO('sqlite:' . $connection_options['database'], '', '', $connection_options['pdo']);
 
     // Create functions needed by SQLite.
@@ -367,10 +373,10 @@ public function createDatabase($database) {
   public function mapConditionOperator($operator) {
     // We don't want to override any of the defaults.
     static $specials = array(
-      'LIKE' => array('postfix' => " ESCAPE '\\'"),
-      'NOT LIKE' => array('postfix' => " ESCAPE '\\'"),
-      'LIKE BINARY' => array('postfix' => " ESCAPE '\\'", 'operator' => 'GLOB'),
-      'NOT LIKE BINARY' => array('postfix' => " ESCAPE '\\'", 'operator' => 'NOT GLOB'),
+    'LIKE' => array('postfix' => " ESCAPE '\\'"),
+    'NOT LIKE' => array('postfix' => " ESCAPE '\\'"),
+    'LIKE BINARY' => array('postfix' => " ESCAPE '\\'", 'operator' => 'GLOB'),
+    'NOT LIKE BINARY' => array('postfix' => " ESCAPE '\\'", 'operator' => 'NOT GLOB'),
     );
     return isset($specials[$operator]) ? $specials[$operator] : NULL;
   }
@@ -415,4 +421,53 @@ public function getFullQualifiedTableName($table) {
     return $prefix . $table;
   }
 
+  /**
+   * {@inheritdoc}
+   */
+  public function getTransactionIsolationLevel() {
+    // Just to make sure no one relies on getTransactionIsolationLevel()
+    // thinking they will get the isolation level of the active transaction.
+    if ($this->inTransaction()) {
+      throw new DatabaseExceptionWrapper("Sqlite does not support retrieving the transaction isolation level inside a transaction.");
+    }
+    // Transactions in SQLite are serializable.
+    $level = TransactionSettings::TRANSACTION_ISOLATION_SERIALIZABLE;
+    // Except in this case...
+    $read_uncommited = $this->query("PRAGMA read_uncommitted")->fetchField();
+    if ($read_uncommited) {
+      $level = TransactionSettings::TRANSACTION_ISOLATION_READUNCOMMITTED;
+    }
+    return $level;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  protected function setTransactionIsolationLevel($level) {
+    /* Except in the case of shared cache database connections with PRAGMA read_uncommitted
+     * turned on, all transactions in SQLite show "serializable" isolation. SQLite implements
+     * serializable transactions by actually serializing the writes. There can only
+     * be a single writer at a time to an SQLite database. There can be multiple database
+     * connections open at the same time, and all of those database connections can write to
+     * the database file, but they have to take turns. SQLite uses locks to serialization of
+     * the writes automatically; this is not something that the applications using SQLite
+     * need to worry with.
+     */
+
+    // Basically, Sqlite supports transactions, but not isolation levels. Make sure we validate
+    // that one of Drupal's isolation levels is passed.
+    if (!in_array($level, TransactionSettings::getAllIsolationLevels())) {
+      throw new \InvalidArgumentException("Invalid transaction isolation level provided: $level");
+    }
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function setTransactionLockTimeout($timeout) {
+    // SQLITE takes timeout in milliseconds
+    $timeout = $timeout * 1000;
+    $this->connection->query("PRAGMA busy_timeout = $timeout");
+  }
+
 }
diff --git a/core/lib/Drupal/Core/Database/Transaction.php b/core/lib/Drupal/Core/Database/Transaction.php
index eb4e182..6166760 100644
--- a/core/lib/Drupal/Core/Database/Transaction.php
+++ b/core/lib/Drupal/Core/Database/Transaction.php
@@ -43,6 +43,37 @@ class Transaction {
   protected $rolledBack = FALSE;
 
   /**
+   * A boolean value to indicate whether this transaction has been commited.
+   *
+   * @var bool
+   */
+  protected $commited = FALSE;
+
+  /**
+   * Transaction settings.
+   *
+   * @var TransactionSettings
+   */
+  protected $settings;
+
+  /**
+   * A connection might have been started for the only
+   * purpose of serving this transaction, so we need
+   * to restore it once we are over.
+   *
+   * @var string
+   */
+  protected $previous_connection_key;
+
+  /**
+   * If a connection was created to serve this transaction
+   * the key that was used
+   *
+   * @var string
+   */
+  protected $connection_key;
+
+  /**
    * The name of the transaction.
    *
    * This is used to label the transaction savepoint. It will be overridden to
@@ -50,8 +81,8 @@ class Transaction {
    */
   protected $name;
 
-  public function __construct(Connection $connection, $name = NULL) {
-    $this->connection = $connection;
+  public function __construct(Connection $connection, $name = NULL, TransactionSettings $settings) {
+    $this->settings = $settings;
     // If there is no transaction depth, then no transaction has started. Name
     // the transaction 'drupal_transaction'.
     if (!$depth = $connection->transactionDepth()) {
@@ -60,19 +91,59 @@ public function __construct(Connection $connection, $name = NULL) {
     // Within transactions, savepoints are used. Each savepoint requires a
     // name. So if no name is present we need to create one.
     elseif (!$name) {
-      $this->name = 'savepoint_' . $depth;
+      if ($settings->getScopeOption() == TransactionSettings::TRANSACTION_SCOPEOPTION_SUPRESS) {
+        $this->name = 'supress_' . $depth;
+      }
+      else {
+        $this->name = 'savepoint_' . $depth;
+      }
     }
     else {
       $this->name = $name;
     }
-    $this->connection->pushTransaction($this->name);
+
+    try {
+      $connection->pushTransaction($this->name, $settings);
+      $this->connection = $connection;
+    }
+    // This simply means that the current connection cannot accept this transaction
+    // and that it needs to be run against a new connection.
+    catch (TransactionCannotBePushedException $e) {
+      // Create a new active connection from
+      // the current active settings.
+      $this->connection_key = uniqid($this->name, TRUE);
+      $this->previous_connection_key = self::newActiveConnection($this->connection_key);
+      $this->connection = Database::getConnection();
+      // This transaction should be pushed to the new active connection, not
+      // the requested one.
+      $this->connection->pushTransaction($this->name, $settings, $this->previous_connection_key);
+    }
   }
 
   public function __destruct() {
-    // If we rolled back then the transaction would have already been popped.
-    if (!$this->rolledBack) {
-      $this->connection->popTransaction($this->name);
+    switch($this->settings->getScopeOption()) {
+      case TransactionSettings::TRANSACTION_SCOPEOPTION_SUPRESS:
+        break;
+      default:
+        if (!$this->settings->getImplicitRollbacks()) {
+          // If we rolled back then the transaction would have already been popped.
+          if (!$this->rolledBack) {
+            $this->connection->popTransaction($this->name);
+          }
+        }
+        else {
+          // If we did not commit and did not rollback explicitly, rollback.
+          // Rollbacks are not usually called explicitly by the user
+          // but that could happen.
+          if (!$this->commited && !$this->rolledBack) {
+            $this->rolledBack = TRUE;
+            $this->connection->rollback($this->name);
+          }
+          break;
+        }
     }
+    // Restore the active connection if any.
+    $this->restoreActiveConnection();
   }
 
   /**
@@ -83,6 +154,32 @@ public function name() {
   }
 
   /**
+   * Commits the transaction. Only available for transactions with
+   * implicit rollbacks to respect Drupal's old implicit commit behaviour.
+   *
+   * @throws TransactionExplicitCommitNotAllowedException
+   */
+  public function commit() {
+    if ($this->settings->getScopeOption() == TransactionSettings::TRANSACTION_SCOPEOPTION_SUPRESS) {
+      throw new DatabaseExceptionWrapper("Cannot commit a transaction with a supress scope option. Unset the object to release the scope.");
+    }
+    // Insane transaction behaviour does not allow explicit commits.
+    if (!$this->settings->getImplicitRollbacks()) {
+      throw new TransactionExplicitCommitNotAllowedException();
+    }
+    // Cannot commit a rolledback transaction...
+    if ($this->rolledBack) {
+      throw new TransactionCannotCommitAfterRollbackException();
+    }
+    // Mark as commited, and commit.
+    $this->commited = TRUE;
+    // Pop it from the stack.
+    $this->connection->popTransaction($this->name);
+    // Restore the active connection if any.
+    $this->restoreActiveConnection();
+  }
+
+  /**
    * Rolls back the current transaction.
    *
    * This is just a wrapper method to rollback whatever transaction stack we are
@@ -95,7 +192,48 @@ public function name() {
    * @see watchdog_exception()
    */
   public function rollback() {
+    if ($this->settings->getScopeOption() == TransactionSettings::TRANSACTION_SCOPEOPTION_SUPRESS) {
+      throw new DatabaseExceptionWrapper("Cannot rollback a transaction with a supress scope option. Unset the object to release the scope.");
+    }
     $this->rolledBack = TRUE;
     $this->connection->rollback($this->name);
+    // Restore the active connection if any.
+    $this->restoreActiveConnection();
+  }
+
+  /**
+   * Duplicates the current active connection with the new name
+   * and makes it the active one.
+   *
+   * @param string $key
+   *   The key for the new connection.
+   *
+   * @return string
+   *   The key of the previous active connection.
+   */
+  protected function newActiveConnection($key) {
+    // We need to clone the connection info and use that
+    // to spin up a new default/active connection until the transaction
+    // is over.
+    $previous_connection_key = Database::getActiveConnectionKey();
+    // The new connection key is the same name of the transaction,
+    // that is guaranteed to be unique.
+    $info = Database::getConnectionInfo($previous_connection_key);
+    $target = 'default';
+    Database::addConnectionInfo($key, $target, $info[$target]);
+    Database::setActiveConnection($key);
+    return $previous_connection_key;
+  }
+
+  /**
+   * Restores the previous active connection if
+   * one was created for this transaction.
+   */
+  protected function restoreActiveConnection() {
+    if (!empty($this->previous_connection_key)) {
+      Database::closeConnection(NULL, $this->connection_key);
+      Database::removeConnection($this->connection_key);
+      Database::setActiveConnection($this->previous_connection_key);
+    }
   }
 }
diff --git a/core/lib/Drupal/Core/Database/TransactionBlockedException.php b/core/lib/Drupal/Core/Database/TransactionBlockedException.php
new file mode 100644
index 0000000..fafd906
--- /dev/null
+++ b/core/lib/Drupal/Core/Database/TransactionBlockedException.php
@@ -0,0 +1,14 @@
+<?php
+
+/**
+ * @file
+ * Contains \Drupal\Core\Database\TransactionBlocked.
+ */
+
+namespace Drupal\Core\Database;
+
+/**
+ * Exception thrown when trying to start a transaction in a connection
+ * that has transactions blocked.
+ */
+class TransactionBlockedException extends TransactionException implements DatabaseException { }
diff --git a/core/lib/Drupal/Core/Database/TransactionCannotBePushedException.php b/core/lib/Drupal/Core/Database/TransactionCannotBePushedException.php
new file mode 100644
index 0000000..0b8b6bd
--- /dev/null
+++ b/core/lib/Drupal/Core/Database/TransactionCannotBePushedException.php
@@ -0,0 +1,14 @@
+<?php
+
+/**
+ * @file
+ * Contains \Drupal\Core\Database\TransactionCannotBePushedException.
+ */
+
+namespace Drupal\Core\Database;
+
+/**
+ * Exception thrown by a connect when the requested transaction cannot be pushed
+ * and requires a new connection.
+ */
+class TransactionCannotBePushedException extends TransactionException implements DatabaseException { }
diff --git a/core/lib/Drupal/Core/Database/TransactionCannotCommitAfterRollbackException.php b/core/lib/Drupal/Core/Database/TransactionCannotCommitAfterRollbackException.php
new file mode 100644
index 0000000..349f84d
--- /dev/null
+++ b/core/lib/Drupal/Core/Database/TransactionCannotCommitAfterRollbackException.php
@@ -0,0 +1,17 @@
+<?php
+
+/**
+ * @file
+ * Contains \Drupal\Core\Database\TransactionCannotCommitAfterRollbackException.
+ */
+
+namespace Drupal\Core\Database;
+
+
+/**
+ * Exception to deny attempts to explicitly manage transactions.
+ *
+ * This exception will be thrown when the PDO connection commit() is called.
+ * Code should never call this method directly.
+ */
+class TransactionCannotCommitAfterRollbackException extends TransactionException implements DatabaseException { }
\ No newline at end of file
diff --git a/core/lib/Drupal/Core/Database/TransactionSettings.php b/core/lib/Drupal/Core/Database/TransactionSettings.php
new file mode 100644
index 0000000..1abf58d
--- /dev/null
+++ b/core/lib/Drupal/Core/Database/TransactionSettings.php
@@ -0,0 +1,219 @@
+<?php
+
+namespace Drupal\Core\Database;
+
+use Drupal\Core\Database\Database;
+
+/**
+ * Describes how a transaction should behave.
+ */
+class TransactionSettings {
+
+  #region Transaction Isolation Levels
+
+  /**
+   * Summary of TRANSACTION_ISOLATION_SERIALIZABLE
+   */
+  const TRANSACTION_ISOLATION_SERIALIZABLE = 'TRANSACTION_ISOLATION_SERIALIZABLE';
+
+  /**
+   * Summary of TRANSACTION_ISOLATION_REPEATABLEREAD
+   */
+  const TRANSACTION_ISOLATION_REPEATABLEREAD = 'TRANSACTION_ISOLATION_REPEATABLEREAD';
+
+  /**
+   * Summary of TRANSACTION_ISOLATION_READCOMMITTED
+   */
+  const TRANSACTION_ISOLATION_READCOMMITTED = 'TRANSACTION_ISOLATION_READCOMMITTED';
+
+  /**
+   * Summary of TRANSACTION_ISOLATION_READUNCOMMITTED
+   */
+  const TRANSACTION_ISOLATION_READUNCOMMITTED = 'TRANSACTION_ISOLATION_READUNCOMMITTED';
+
+  /**
+   * A different isolation level than the one specified is being used,
+   * but the level cannot be determined. An exception is thrown if this value is set.
+   */
+  const TRANSACTION_ISOLATION_NONE = 'TRANSACTION_ISOLATION_NONE';
+
+  /**
+   * Ignore transaction isolation levels, for drivers that do not support
+   * them.
+   */
+  const TRANSACTION_ISOLATION_IGNORE = 'TRANSACTION_ISOLATION_IGNORE';
+
+  #endregion
+
+  #region Transaction nesting behaviour
+
+  /**
+   * Drupal legacy support. Starts a new transaction if no ambient transaction exists
+   * or uses savepoints if a new transaction exists.
+   */
+  const TRANSACTION_SCOPEOPTION_LEGACY = 'TRANSACTION_SCOPEOPTION_LEGACY';
+
+  /**
+   * Start only a new transaction if there is no active transaction.
+   */
+  const TRANSACTION_SCOPEOPTION_REQUIRED = 'TRANSACTION_SCOPEOPTION_REQUIRED';
+
+  /**
+   * Runs in the ambien transaction mode with no active transaction.
+   */
+  const TRANSACTION_SCOPEOPTION_SUPRESS = 'TRANSACTION_SCOPEOPTION_SUPRESS';
+
+  /**
+   * Forces the start of a new transaction. If an active transaction exists, a new
+   * connection is created and set as active for the duration of this scope.
+   */
+  const TRANSACTION_SCOPEOPTION_REQUIRESNEW = 'TRANSACTION_SCOPEOPTION_REQUIRESNEW';
+
+  #endregion
+
+  /**
+   * Requested isolation level.
+   *
+   * @var string
+   */
+  private $isolationLevel;
+
+  /**
+   * Requested scope option.
+   *
+   * @var string
+   */
+  private $scopeOption;
+
+  /**
+   * Implicit rollbacks.
+   *
+   * @var bool
+   */
+  private $implicitRollbacks;
+
+  /**
+   * Get a TransactionSettings instance. Defaults to Drupal's
+   * historical default behaviour. Those were:
+   *
+   * @param Bool $sane
+   *   Wether to use implicit commits or implicit rollbacks. Defaults to implicit commits.
+   *
+   * @param String $ScopeOption
+   *   How nested transactions whould behave. Defaults to TRANSACTION_SCOPEOPTION_LEGACY
+   *   meaning that savepoints are managed to deal with nested transactions.
+   *
+   * @param String $IsolationLevel
+   *   Transaction isolation level. Defaults to IGNORE meaning
+   *   that the ambient transaction mode will be used for transactions.
+   */
+  public function __construct($sane = FALSE, $scopeoption = self::TRANSACTION_SCOPEOPTION_LEGACY, $isolationlevel = self::TRANSACTION_ISOLATION_IGNORE) {
+
+    // We validate scope options - because they are a specific Drupal implementacion - but not isolation levels because drivers could provide their own extra
+    // isolation levels.
+    if (!in_array($scopeoption, self::getAllScopeOptions())) {
+      throw new \InvalidArgumentException("Unsupported scope option: $scopeoption");
+    }
+
+    // The SUPRESS scope option can only be used with the IGNORE transaction isolation mode
+    if ($scopeoption == self::TRANSACTION_SCOPEOPTION_SUPRESS && $isolationlevel != self::TRANSACTION_ISOLATION_IGNORE) {
+      throw new \InvalidArgumentException("Supress scope option can only be used with the IGNORE transaction isolation mode.");
+    }
+
+    // The LEGACY scope option can only be used with the IGNORE transaction isolation level, otherwise we will need to verify
+    // if the requested nested isolation levels are compatible because the LEGACY scope option uses savepoints to nest transactions
+    // and you CANNOT change the isolation level in between savepoints. Technicaly we could allow to use something different
+    // to ignore as long as we ensure that in nested transactions the isolation level is compatible.
+    if ($scopeoption == self::TRANSACTION_SCOPEOPTION_LEGACY && $isolationlevel != self::TRANSACTION_ISOLATION_IGNORE) {
+      throw new DatabaseExceptionWrapper("Legacy scope option can only be used with the IGNORE transaction isolation mode.");
+    }
+
+    $this->implicitRollbacks = $sane;
+    $this->isolationLevel = $isolationlevel;
+    $this->scopeOption = $scopeoption;
+  }
+
+  /**
+   * Get the isolation level for this transaction.
+   *
+   * @return string
+   */
+  public function getIsolationLevel() {
+    return $this->isolationLevel;
+  }
+
+  /**
+   * Get the current ScopeOption.
+   *
+   * @return string
+   */
+  public function getScopeOption() {
+    return $this->scopeOption;
+  }
+
+  /**
+   * Return commit behaviour for this transaction.
+   *
+   * FALSE: this is Drupal's default, meaning that by default if you do
+   * not rollback a exception, it will be commited.
+   *
+   * TRUE: you MUST call commit() on a transaction explictly, otherwise
+   * it will be automatically rolled back.
+   *
+   * @return bool
+   */
+  public function getImplicitRollbacks() {
+    return $this->implicitRollbacks;
+  }
+
+  /**
+   * Returns a default setting system-wide to make it compatible
+   * with Drupal's defaults. Cannot use snapshot isolation because
+   * it is not compatible with DDL operations and Drupal historically
+   * has had no knowledge of transaction isolation.
+   *
+   * @return TransactionSettings
+   */
+  public static function getDefaults() {
+    return new TransactionSettings();
+  }
+
+  /**
+   * Returns a TransactionSettings settings instance
+   * with implicit rolllbacks, required transaction
+   * scope option and read commited isolation level.
+   *
+   * @return TransactionSettings
+   */
+  public static function getBetterDefaults() {
+    return new TransactionSettings(TRUE, self::TRANSACTION_SCOPEOPTION_REQUIRED, self::TRANSACTION_ISOLATION_READCOMMITTED);
+  }
+
+  /**
+   * Return a list of all available transaction isolation levels.
+   *
+   * @return string[]
+   */
+  public static function getAllIsolationLevels() {
+    return [
+      self::TRANSACTION_ISOLATION_READCOMMITTED,
+      self::TRANSACTION_ISOLATION_READUNCOMMITTED,
+      self::TRANSACTION_ISOLATION_REPEATABLEREAD,
+      self::TRANSACTION_ISOLATION_SERIALIZABLE
+      ];
+  }
+
+  /**
+   * Return a list of all available scope options.
+   *
+   * @return string[]
+   */
+  public static function getAllScopeOptions() {
+    return [
+      self::TRANSACTION_SCOPEOPTION_LEGACY,
+      self::TRANSACTION_SCOPEOPTION_REQUIRED,
+      self::TRANSACTION_SCOPEOPTION_REQUIRESNEW,
+      self::TRANSACTION_SCOPEOPTION_SUPRESS
+      ];
+  }
+}
diff --git a/core/tests/Drupal/KernelTests/Core/Database/TransactionTest.php b/core/tests/Drupal/KernelTests/Core/Database/TransactionTest.php
new file mode 100644
index 0000000..a0df11d
--- /dev/null
+++ b/core/tests/Drupal/KernelTests/Core/Database/TransactionTest.php
@@ -0,0 +1,622 @@
+<?php
+
+/**
+ * @file
+ * Contains \Drupal\KernelTests\Core\Database\TransactionTest.
+ */
+
+namespace Drupal\KernelTests\Core\Database;
+
+use Drupal\Core\Database\Database;
+use Drupal\Core\Database\Connection;
+
+use Drupal\Core\Database\TransactionSettings;
+use Drupal\Core\Database\TransactionExplicitCommitNotAllowedException;
+use Drupal\Core\Database\TransactionCannotCommitAfterRollbackException;
+use Drupal\Core\Database\TransactionCommitFailedException;
+use Drupal\Core\Database\TransactionBlockedException;
+use Drupal\Core\Database\TransactionCannotBePushedException;
+
+use Drupal\KernelTests\KernelTestBase;
+
+/**
+ * Test transactions.
+ *
+ * @group Database
+ */
+class TransactionTest extends KernelTestBase {
+
+  /**
+   * @var Connection
+   *   The database connection.
+   */
+  protected $connectionA;
+
+  /**
+   * @var Connection
+   *   The database connection.
+   */
+  protected $connectionB;
+
+  /**
+   * {@inheritdoc}
+   */
+  protected function setUp() {
+    parent::setUp();
+
+    // To properly test transactions we need
+    // two different connections...
+    $this->connectionA = Database::getConnection();
+    $this->connectionB = Database::getConnection('default', NULL, 'copy');
+
+    // All these tests require the driver to support transactions.
+    if (!Database::getConnection()->supportsTransactions()) {
+      $this->markTestSkipped('Transactions are not supported on this database driver.');
+    }
+
+    // Insert some test data.
+    $table_spec = array(
+       'fields' => array(
+         'id'  => array(
+           'type' => 'serial',
+           'not null' => TRUE,
+         ),
+         'task' => array(
+           'type' => 'varchar',
+           'length' => 255,
+           'not null' => TRUE,
+         ),
+       ),
+       'primary key' => array('id'),
+     );
+
+    db_create_table('test_task', $table_spec);
+    db_insert('test_task')->fields(array('task' => 'eat'))->execute();
+    db_insert('test_task')->fields(array('task' => 'sleep'))->execute();
+    db_insert('test_task')->fields(array('task' => 'sleep'))->execute();
+
+  }
+
+  /**
+   * Delete a task.
+   *
+   * @param string $name
+   * @param Connection $connection
+   */
+  protected function TaskDelete($name, Connection $connection = NULL) {
+    if (empty($connection)) {
+      $connection = $this->connectionA;
+    }
+    if (!$this->TaskExists($name, $connection)) {
+      throw new \Exception("Cannot delete a task that does not exist.");
+    }
+    $query = $connection->delete('test_task');
+    $query->condition('task', $name);
+    $query->execute();
+  }
+
+  /**
+   * Add a task.
+   *
+   * @param string $name
+   * @param Connection $connection
+   */
+  protected function TaskAdd($name, Connection $connection = NULL) {
+    if (empty($connection)) {
+      $connection = $this->connectionA;
+    }
+    $query = $connection->insert('test_task');
+    $query->fields(array('task' => $name));
+    $query->execute();
+  }
+
+  /**
+   * Check if a task exists.
+   *
+   * @param string $name
+   *   The name of the task.
+   * @param Connection $connection
+   *   The connection to use, if nothing specific uses ConnectionA.
+   */
+  protected function TaskExists($name, Connection $connection = NULL) {
+    if (empty($connection)) {
+      $connection = $this->connectionA;
+    }
+    $query = $connection->select('test_task');
+    $query->addField('test_task', 'id');
+    $query->condition('task', $name);
+    $result = $query->execute();
+
+    return count($result->fetchAll()) > 0;
+  }
+
+  /**
+   *  Test implicit commits.
+   */
+  public function testImplicitCommit() {
+
+    $name = 'my test task';
+
+    // Test that after a forced commit data is stored.
+    unset($transaction);
+    $transaction = $this->connectionA->startTransaction();
+    $this->TaskAdd($name);
+    unset($transaction);
+    $this->assertTrue($this->TaskExists($name));
+    $this->TaskDelete($name);
+    $this->assertFalse($this->TaskExists($name));
+
+    // Test that we can rollback and data vanishes.
+    unset($transaction);
+    $transaction = $this->connectionA->startTransaction();
+    $this->TaskAdd($name);
+    $transaction->rollback();
+    $this->assertFalse($this->TaskExists($name));
+
+    // Trying to call commit throws an exception (actually commit
+    // was not even implemented before)
+    unset($transaction);
+    $transaction = $this->connectionA->startTransaction();
+    $this->TaskAdd($name);
+    try {
+      $transaction->commit();
+      $this->fail("Calling commit with implicit commits should throw an exception.");
+    }
+    catch (TransactionExplicitCommitNotAllowedException $e) {
+      $transaction->rollback();
+    }
+    $this->assertFalse($this->TaskExists($name));
+  }
+
+  /**
+   *  Test implicit rollback.
+   */
+  public function testImplicitRollback() {
+
+    $name = 'my test task';
+
+    $settings = new TransactionSettings(TRUE);
+
+    // Test that after a forced commit data is stored.
+    unset($transaction);
+    $transaction = $this->connectionA->startTransaction('', $settings);
+    $this->TaskAdd($name);
+    $transaction->commit();
+    $this->assertTrue($this->TaskExists($name));
+    $this->TaskDelete($name);
+    $this->assertFalse($this->TaskExists($name));
+
+    // Test that if commit is not called data vanishes.
+    unset($transaction);
+    $transaction = $this->connectionA->startTransaction('', $settings);
+    $this->TaskAdd($name);
+    unset($transaction);
+    $this->assertFalse($this->TaskExists($name));
+
+    // Test that we can rollback and data vanishes.
+    unset($transaction);
+    $transaction = $this->connectionA->startTransaction('', $settings);
+    $this->TaskAdd($name);
+    $transaction->rollback();
+    $this->assertFalse($this->TaskExists($name));
+
+    // Test that calling commit after rollback throws an exception
+    unset($transaction);
+    $transaction = $this->connectionA->startTransaction('', $settings);
+    $this->TaskAdd($name);
+    $transaction->rollback();
+    $this->assertFalse($this->TaskExists($name));
+    try {
+      $transaction->commit();
+      $this->fail("Cannot commit after rollback");
+    }
+    catch (TransactionCannotCommitAfterRollbackException $e) {}
+
+    // Test that calling rollback after commit throws an exception
+    unset($transaction);
+    $transaction = $this->connectionA->startTransaction('', $settings);
+    $this->TaskAdd($name);
+    $transaction->commit();
+    $this->assertTrue($this->TaskExists($name));
+    try {
+      $transaction->rollback();
+      $this->fail("Cannot rollback after commit");
+    }
+    catch (\Exception $e) {}
+    $this->TaskDelete($name);
+    $this->assertFalse($this->TaskExists($name));
+  }
+
+  /**
+   * Test ReadCommited transaction mode.
+   */
+  public function testReadCommited() {
+
+    if (!in_array(TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED, $this->connectionA->supportedTransactionIsolationLevels())) {
+      $this->markTestSkipped('Transaction isolation level not supported.');
+    }
+
+    /**
+     * ReadCommited
+     *
+     * Specifies that statements cannot read data that has been modified but not
+     * committed by other transactions. This prevents dirty reads. Data can be
+     * changed by other transactions between individual statements within the
+     * current transaction, resulting in nonrepeatable reads or phantom data.
+     *
+     * This option is the SQL Server default.
+     * */
+
+    $name = 'my test task';
+
+    $settings = new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRED, TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED);
+
+    $transactionA = $this->connectionA->startTransaction('', $settings);
+    $this->TaskAdd($name);
+    $this->assertTrue($this->TaskExists($name));
+
+    $transactionB = $this->connectionB->startTransaction('', $settings);
+    $this->assertFalse($this->TaskExists($name, $this->connectionB));
+
+    // After commiting the task should exist for the other transaction
+    $transactionA->commit();
+    $this->assertTrue($this->TaskExists($name));
+    $this->assertTrue($this->TaskExists($name, $this->connectionB));
+
+    $transactionB->rollback();
+    $this->assertTrue($this->TaskExists($name));
+    $this->assertTrue($this->TaskExists($name, $this->connectionB));
+
+  }
+
+  /**
+   * Test ReadUncommited transaction mode.
+   */
+  public function testReadUncommited() {
+
+    if (!in_array(TransactionSettings::TRANSACTION_ISOLATION_READUNCOMMITTED, $this->connectionA->supportedTransactionIsolationLevels())) {
+      $this->markTestSkipped('Transaction isolation level not supported.');
+    }
+
+    /**
+     * ReadUncommited
+     *
+     * Specifies that statements can read rows that have been modified by other transactions
+     * but not yet committed. Transactions running at the READ UNCOMMITTED level do not
+     * issue shared locks to prevent other transactions from modifying data read by the
+     * current transaction. READ UNCOMMITTED transactions are also not blocked by
+     * exclusive locks that would prevent the current transaction from reading rows
+     * that have been modified but not committed by other transactions. When this option
+     * is set, it is possible to read uncommitted modifications, which are called dirty
+     * reads. Values in the data can be changed and rows can appear or disappear in the
+     * data set before the end of the transaction. This option has the same effect as
+     * setting NOLOCK on all tables in all SELECT statements in a transaction.
+     *
+     * This is the least restrictive of the isolation levels.
+     *
+     * */
+
+    $name = 'my test task';
+
+    $settings = new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRED, TransactionSettings::TRANSACTION_ISOLATION_READUNCOMMITTED);
+
+    $transactionA = $this->connectionA->startTransaction('', $settings);
+    $this->TaskAdd($name);
+    $this->assertTrue($this->TaskExists($name));
+
+    // Read uncommited data from another transaction
+    $transactionB = $this->connectionB->startTransaction('', $settings);
+    $this->assertTrue($this->TaskExists($name, $this->connectionB));
+
+    $transactionA->rollback();
+    $transactionB->rollback();
+
+    $this->assertFalse($this->TaskExists($name));
+    $this->assertFalse($this->TaskExists($name, $this->connectionB));
+
+  }
+
+  /**
+   * Test Serializable transaction mode.
+   */
+  public function testSerializable() {
+
+    if (!in_array(TransactionSettings::TRANSACTION_ISOLATION_SERIALIZABLE, $this->connectionA->supportedTransactionIsolationLevels())) {
+      $this->markTestSkipped('Transaction isolation level not supported.');
+    }
+
+    if (Database::getConnection()->driver() == 'pgsql') {
+      $this->markTestSkipped('Postgre implementation of SERIALIZABLE is quite particular.');
+    }
+
+    /**
+     * ReadUncommited
+     *
+     * Specifies that statements can read rows that have been modified by other transactions
+     * but not yet committed. Transactions running at the READ UNCOMMITTED level do not
+     * issue shared locks to prevent other transactions from modifying data read by the
+     * current transaction. READ UNCOMMITTED transactions are also not blocked by
+     * exclusive locks that would prevent the current transaction from reading rows
+     * that have been modified but not committed by other transactions. When this option
+     * is set, it is possible to read uncommitted modifications, which are called dirty
+     * reads. Values in the data can be changed and rows can appear or disappear in the
+     * data set before the end of the transaction. This option has the same effect as
+     * setting NOLOCK on all tables in all SELECT statements in a transaction.
+     *
+     * This is the least restrictive of the isolation levels.
+     *
+     * */
+
+    $name = 'my test task';
+
+    $settings = new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRED, TransactionSettings::TRANSACTION_ISOLATION_SERIALIZABLE);
+
+    $transactionA = $this->connectionA->startTransaction('', $settings);
+    $this->TaskAdd($name);
+    $this->assertTrue($this->TaskExists($name));
+
+    $transactionB = $this->connectionB->startTransaction('', $settings);
+
+    // This data is being LOCKED by the other transaction, so we exect this to fail
+    // after lock time expires
+    $timeout = 5;
+    $this->connectionB->setTransactionLockTimeout($timeout);
+    $start = microtime(TRUE);
+    try {
+      $this->assertFalse($this->TaskExists($name, $this->connectionB));
+      $this->fail("Cannot read a locked row.");
+    }
+    catch (\Exception $e) {
+      $lapsed = microtime(TRUE) - $start;
+      $time_ok = ($lapsed >= $timeout) && ($lapsed < $timeout * 1.5 + 2);
+      $this->assertEqual($time_ok, TRUE, "Transaction lock time honored.");
+    }
+
+    // After commiting the task should exist for the other transaction
+    $transactionA->commit();
+    $this->assertTrue($this->TaskExists($name));
+    $this->assertTrue($this->TaskExists($name, $this->connectionB));
+
+    $transactionB->rollback();
+    $this->assertTrue($this->TaskExists($name));
+    $this->assertTrue($this->TaskExists($name, $this->connectionB));
+
+  }
+
+  /**
+   * Test RepeatableRead transaction mode.
+   */
+  public function testRepeatableRead() {
+
+    if (!in_array(TransactionSettings::TRANSACTION_ISOLATION_REPEATABLEREAD, $this->connectionA->supportedTransactionIsolationLevels())) {
+      $this->markTestSkipped('Transaction isolation level not supported.');
+    }
+
+    $name = 'my test task';
+
+    /**
+     * Repeatable Read.
+     *
+     * Specifies that statements cannot read data that has been modified but not yet committed by
+     * other transactions and that no other transactions can modify data that has been
+     * read by the current transaction until the current transaction completes. Shared
+     * locks are placed on all data read by each statement in the transaction and are held
+     * until the transaction completes. This prevents other transactions from modifying any
+     * rows that have been read by the current transaction. Other transactions can insert
+     * new rows that match the search conditions of statements issued by the current
+     * transaction. If the current transaction then retries the statement it will retrieve
+     * the new rows, which results in phantom reads. Because shared locks are held to the
+     * end of a transaction instead of being released at the end of each statement,
+     * concurrency is lower than the default READ COMMITTED isolation level.
+     *
+     * Use this option only when necessary.
+     *
+     * */
+
+    $settings = new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRED, TransactionSettings::TRANSACTION_ISOLATION_REPEATABLEREAD);
+
+    $transaction = $this->connectionA->startTransaction('', $settings);
+
+    $this->assertEqual($this->TaskExists($name, $this->connectionA), FALSE, 'Task does not exist.');
+
+    // Create a task from another connection, no need for a transaction here.
+    $this->TaskAdd($name, $this->connectionB);
+
+    // Task should still NOT exist because we are working on a snapshot.
+    $this->assertEqual($this->TaskExists($name, $this->connectionA), FALSE, 'Task does not exist.');
+
+    $transaction->commit();
+
+    $this->assertEqual($this->TaskExists($name, $this->connectionA), TRUE, 'Task exists.');
+
+  }
+
+  /**
+   * Test nested transactions when using scope options.
+   */
+  public function testScopeOptions() {
+
+    // This test will only work when the ambient transaction mode does NOT allow dirty reads.
+    $ambient = $this->connectionA->getAmbientTransactionIsolationLevel();
+    if (!in_array($ambient, array(TransactionSettings::TRANSACTION_ISOLATION_REPEATABLEREAD, TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED))) {
+      $this->markTestSkipped('Supress scope option test will not work with current ambient/default isolation level.');
+    }
+
+    // Cannot use supress with anything different than TRANSACTION_ISOLATION_IGNORE
+    try {
+      new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_SUPRESS, TransactionSettings::TRANSACTION_ISOLATION_READUNCOMMITTED);
+      $this->fail("Transaction with a supress scope option cannot have an isolation level other than TRANSACTION_ISOLATION_IGNORE.");
+    }
+    catch (\Exception $e) { }
+
+    // Keep track of the original number of connection info's
+    $info_count = count(Database::getAllConnectionInfo());
+
+    $name = 'my test task';
+
+    $connectionId = Database::getConnection()->getId();
+
+    // Start a transaction
+    $transaction = Database::getConnection()->startTransaction('', new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRESNEW, TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED));
+    $this->TaskAdd($name, Database::getConnection());
+
+    $this->assertEqual(count(Database::getAllConnectionInfo()) - $info_count, 0, "Number of active connections is right.");
+
+    // Make sure the active connection has not changed
+    $this->assertEqual(Database::getConnection()->getId(), $connectionId, "Active connection has not changed.");
+
+    // From now on ALL code is run against a connection that runs on the ambient transaction mode, until $transactionB is destroyed.
+    $transactionB = Database::getConnection()->startTransaction('', new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_SUPRESS, TransactionSettings::TRANSACTION_ISOLATION_IGNORE));
+    $connectionIdB = Database::getConnection()->getId();
+
+    $this->assertEqual(Database::getConnection()->getTransactionIsolationLevel(), $ambient, "After supress we are back to the ambient isolation level.");
+    $this->assertEqual(Database::getConnection()->transactionDepth(), 0, "The active connection has no transaction depth after staring a supressed transaction scope.");
+    $this->assertEqual(count(Database::getAllConnectionInfo()) - $info_count, 1, "Number of active connections is right.");
+
+    // Make sure the active connection has changed.
+    $this->assertNotEqual($connectionId, $connectionIdB, "Active connection has changed.");
+
+    // Transactions with supress cannot be commited.
+    try {
+      Database::getConnection()->commit();
+      $this->fail("Transaction with a supress scope option cannot be commited.");
+    }
+    catch (\Exception $e) { }
+
+    // Transactions with supress cannot be rolled back.
+    try {
+      Database::getConnection()->rollback();
+      $this->fail("Transaction with a supress scope option cannot be rolled back.");
+    }
+    catch (\Exception $e) { }
+
+    // Because the ambient transaction is does not allow dirty reads, we should not be seeing the
+    // changes made inside the first transaction
+    // until $transactionB is cleared
+    $this->assertFalse($this->TaskExists($name, Database::getConnection()));
+
+    // This part of the test needs READUNCOMMITED capabilities
+    // postgre does not have such a thing.
+    if (in_array(TransactionSettings::TRANSACTION_ISOLATION_READUNCOMMITTED, Database::getConnection()->supportedTransactionIsolationLevels())) {
+
+      // Nest another transaction
+      $transactionC = Database::getConnection()->startTransaction('', new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRESNEW, TransactionSettings::TRANSACTION_ISOLATION_READUNCOMMITTED));
+      $connectionIdC = Database::getConnection()->getId();
+
+      $this->assertEqual($connectionIdC, $connectionIdB, "Active connection has not changed");
+      $this->assertEqual(count(Database::getAllConnectionInfo()) - $info_count, 1, "Number of active connections is right.");
+
+      // Because this new transaction can read dirty data, the task should be visible.
+      $this->assertTrue($this->TaskExists($name, Database::getConnection()));
+
+      // One more nested transaction
+      $transactionD = Database::getConnection()->startTransaction('', new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRESNEW, TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED));
+      $connectionIdD = Database::getConnection()->getId();
+
+      $this->assertNotEqual($connectionIdD, $connectionIdB, "Active connection has changed.");
+      $this->assertEqual(count(Database::getAllConnectionInfo()) - $info_count, 2, "Number of active connections is right.");
+
+      // Readcommited should not be able to read dirty data.
+      $this->assertFalse($this->TaskExists($name, Database::getConnection()));
+
+      unset($transactionD);
+
+      // Make sure the active connection is right.
+      $this->assertEqual(Database::getConnection()->getId(), $connectionIdC, "Active connection has has been properly restored.");
+      $this->assertEqual(count(Database::getAllConnectionInfo()) - $info_count, 1, "Number of active connections is right.");
+
+      unset($transactionC);
+
+    }
+
+    // Make sure the active connection is right.
+    $this->assertEqual(Database::getConnection()->getId(), $connectionIdB, "Active connection has has been properly restored.");
+    $this->assertEqual(count(Database::getAllConnectionInfo()) - $info_count, 1, "Number of active connections is right.");
+
+    // Unsetting this transaction scope should not fail, even though it was
+    // part of the transaction stack.
+    unset($transactionB);
+
+    // Make sure the active connection is right.
+    $this->assertEqual(Database::getConnection()->getId(), $connectionId, "Active connection has has been properly restored.");
+    $this->assertEqual(count(Database::getAllConnectionInfo()) - $info_count, 0, "Number of active connections is right.");
+
+    $transaction->commit();
+
+    // Now it should work.
+    $this->assertTrue($this->TaskExists($name));
+
+  }
+
+  /**
+   * Test that a nested required transaction is properly destroyed.
+   */
+  public function testNestedRequiredTransactionIsProperlyDestroyed() {
+    $settings = new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRED, TransactionSettings::TRANSACTION_ISOLATION_READCOMMITTED);
+
+    $transactionA = Database::getConnection()->startTransaction('', $settings);
+
+    $transactionB = Database::getConnection()->startTransaction('', $settings);
+    $transactionB->commit();
+    unset($transactionB);
+
+    $transactionC = Database::getConnection()->startTransaction('', $settings);
+    $transactionC->rollback();
+    unset($transactionC);
+
+    $transactionA->commit();
+  }
+
+  /**
+   * Test that an invalid transaction isolation or scope option throws an exception.
+   */
+  public function testInvalidTransactionSettings() {
+
+    try {
+      $this->connectionA->startTransaction('', new TransactionSettings(TRUE, 'sdg', TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRED));
+      $this->fail("Should not be able to start a transaction with an unsupported scope option.");
+    }
+    catch (\InvalidArgumentException $e) {}
+
+    try {
+      $this->connectionA->startTransaction('', new TransactionSettings(TRUE, TransactionSettings::TRANSACTION_SCOPEOPTION_REQUIRESNEW, 'asdgsdg'));
+      $this->fail("Should not be able to start a transaction with an unsupported isolation level.");
+    }
+    catch (\InvalidArgumentException $e) {}
+
+    // We are storing this in an unused variable because if we don't then the transaction is inmediately
+    // destroyed.
+    $transaction = $this->connectionA->startTransaction('', TransactionSettings::getBetterDefaults());
+    try {
+      // Trying to retrieve the active transaction level while inside a transaction should always throw exception.
+      $this->connectionA->getTransactionIsolationLevel();
+      $this->fail("Retrieving the transaction isolation level while within a transaction is not allowed.");
+    }
+    catch (\Exception $e) {}
+  }
+
+  /**
+   * Test the connection transaction blocking functionality.
+   */
+  public function testConnectionTransactionBlock() {
+    // Ensure that we can start and commit a transaction.
+    $transaction = $this->connectionA->startTransaction('', TransactionSettings::getBetterDefaults());
+    $this->TaskAdd('test name');
+    $transaction->commit();
+    // Ensure that we cannot change the block transaction settings when a transaction has started.
+    $transaction = $this->connectionA->startTransaction('', TransactionSettings::getBetterDefaults());
+    try {
+      $this->connectionA->setBlockTransactions(TRUE);
+      $this->fail("Changing the block transaction setting during a transaction should not be possible.");
+    }
+    catch (\Exception $e) {}
+    $transaction->commit();
+    // Ensure that we cannot start a transaction.
+    $this->connectionA->setBlockTransactions(TRUE);
+    try {
+      $this->connectionA->startTransaction('', TransactionSettings::getBetterDefaults());
+      $this->fail("Should not be able to start a transaction when these are blocked.");
+    }
+    catch (TransactionBlockedException $e) {}
+  }
+}
