diff --git a/README.md b/README.md
new file mode 100644
index 0000000..9eb336c
--- /dev/null
+++ b/README.md
@@ -0,0 +1,4 @@
+# Big todos
+
+* Should we reuse the core queue API as much as possible or design a new API from scratch
+* Based upon the previous question we could use the QueueJob value object in more places
diff --git a/advancedqueue.drush.inc b/advancedqueue.drush.inc
new file mode 100644
index 0000000..17cb983
--- /dev/null
+++ b/advancedqueue.drush.inc
@@ -0,0 +1,29 @@
+<?php
+
+/**
+ * Implements hook_drush_command().
+ */
+function advancedqueue_drush_command() {
+  $items = [];
+
+  $items['advancedqueue_run'] = [
+    'description' => "Runs a single queue",
+    'arguments' => [
+      'queue_name' => 'Runs a single queue',
+    ],
+    'options' => [
+    ],
+    'examples' => [
+      'drush advancedqueue_run my_queue_name' => 'Processes the my_queue_name queue',
+    ],
+    // 'aliases' => ['mmas'],
+  ];
+
+  return $items;
+}
+
+function drush_advancedqueue_run($queue_name) {
+  /** @var \Drupal\advancedqueue\Runner\DrushQueueRunner $drush_runner */
+  $drush_runner = \Drupal::service('advancedqueue.drush_runner');
+  $drush_runner->run($queue_name);
+}
diff --git a/advancedqueue.module b/advancedqueue.module
deleted file mode 100644
index 5427433..0000000
--- a/advancedqueue.module
+++ /dev/null
@@ -1,21 +0,0 @@
-<?php
-
-/**
- * @file
- * advancedqueue.module
- */
-
-/**
- * Implements hook_menu().
- */
-function advancedqueue_menu() {
-  $items = [];
-  return $items;
-}
-
-/**
- * Implements hook_form_alter().
- */
-function advancedqueue_form_alter() {
-
-}
diff --git a/advancedqueue.services.yml b/advancedqueue.services.yml
index 2d6f553..26b6a8b 100644
--- a/advancedqueue.services.yml
+++ b/advancedqueue.services.yml
@@ -1,4 +1,26 @@
 services:
+  advancedqueue.factory:
+    class: \Drupal\advancedqueue\Queue\AdvancedQueueFactory
+    arguments: ['@settings']
+    calls:
+      - [setContainer, ['@service_container']]
+
   advancedqueue.database:
-    class: Drupal\advancedqueue\Queue\AdvancedQueueDatabaseFactory
-    arguments: ['@database']
+    class: \Drupal\advancedqueue\Queue\AdvancedQueueDatabaseFactory
+    arguments: ['@database', '@datetime.time']
+
+  plugin.manager.advancedqueue_worker:
+    class: \Drupal\advancedqueue\AdvancedqueueWorkerManager
+    parent: default_plugin_manager
+
+  advancedqueue.serial_runner:
+    class: \Drupal\advancedqueue\Runner\SerialQueueRunner
+    arguments: ['@advancedqueue.factory', '@plugin.manager.advancedqueue_worker', '@datetime.time']
+
+  advancedqueue.drush_runner:
+    class: \Drupal\advancedqueue\Runner\DrushQueueRunner
+    arguments: ['@advancedqueue.factory', '@plugin.manager.advancedqueue_worker', '@datetime.time']
+
+  advancedqueue.console_runner:
+    class: \Drupal\advancedqueue\Runner\ConsoleQueueRunner
+    arguments: ['@advancedqueue.factory', '@plugin.manager.advancedqueue_worker', '@datetime.time']
diff --git a/console.services.yml b/console.services.yml
new file mode 100644
index 0000000..1308154
--- /dev/null
+++ b/console.services.yml
@@ -0,0 +1,7 @@
+services:
+  advancedqueue.advancedqueue_run-queue:
+    class: Drupal\advancedqueue\Command\QueueRunCommand
+    arguments: []
+    tags:
+      - { name: drupal.command }
+
diff --git a/console/translations/en/advancedqueue.run-queue.yml b/console/translations/en/advancedqueue.run-queue.yml
new file mode 100644
index 0000000..37d0302
--- /dev/null
+++ b/console/translations/en/advancedqueue.run-queue.yml
@@ -0,0 +1,6 @@
+description: 'Run a single advancedqueue by name'
+options: {}
+arguments: {}
+messages:
+    queue_job_processsing_start: 'Processing item @id from @name queue.'
+    queue_job_exception: 'Error while processing @id from @name queue: @message'
diff --git a/src/AdvancedQueueServiceProvider.php b/src/AdvancedQueueServiceProvider.php
index 3f4c4d2..cb90cbf 100644
--- a/src/AdvancedQueueServiceProvider.php
+++ b/src/AdvancedQueueServiceProvider.php
@@ -15,8 +15,9 @@ class AdvancedQueueServiceProvider extends ServiceProviderBase {
    */
   public function alter(ContainerBuilder $container) {
     // Overrides language_manager class to test domain language negotiation.
-    $queue_service_definition = $container->getDefinition('queue');
-    $queue_service_definition->setClass('Drupal\advancedqueue\AdvancedQueueFactory');
+    // @todo Decide whether we want to continue to do that.
+//    $queue_service_definition = $container->getDefinition('queue');
+//    $queue_service_definition->setClass('Drupal\advancedqueue\AdvancedQueueFactory');
   }
 }
 
diff --git a/src/AdvancedQueueWorkerBase.php b/src/AdvancedQueueWorkerBase.php
new file mode 100644
index 0000000..ecda2d9
--- /dev/null
+++ b/src/AdvancedQueueWorkerBase.php
@@ -0,0 +1,17 @@
+<?php
+
+namespace Drupal\advancedqueue;
+
+use Drupal\advancedqueue\Queue\AdvancedQueueWorkerInterface;
+use Drupal\Core\Plugin\PluginBase;
+
+abstract class AdvancedQueueWorkerBase extends PluginBase implements AdvancedQueueWorkerInterface {
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getGroups() {
+    return [];
+  }
+
+}
diff --git a/src/AdvancedqueueWorkerManager.php b/src/AdvancedqueueWorkerManager.php
new file mode 100644
index 0000000..5feec33
--- /dev/null
+++ b/src/AdvancedqueueWorkerManager.php
@@ -0,0 +1,24 @@
+<?php
+
+namespace Drupal\advancedqueue;
+
+use Drupal\Core\Cache\CacheBackendInterface;
+use Drupal\Core\Extension\ModuleHandlerInterface;
+use Drupal\Core\Plugin\DefaultPluginManager;
+
+class AdvancedqueueWorkerManager extends DefaultPluginManager {
+
+  /**
+   * {@inheritdoc}
+   */
+  public function __construct(\Traversable $namespaces, CacheBackendInterface $cache_backend, ModuleHandlerInterface $module_handler) {
+    $this->alterInfo('advancedqueue_worker');
+    $this->setCacheBackend($cache_backend, 'advancedqueue_worker');
+
+    parent::__construct('Plugin/advancedqueue/Worker', $namespaces, $module_handler,
+      'Drupal\advancedqueue\Queue\AdvancedQueueWorkerInterface',
+      '\Drupal\advancedqueue\Annotation\AdvancedQueueWorker'
+    );
+  }
+
+}
diff --git a/src/Annotation/AdvancedQueueWorker.php b/src/Annotation/AdvancedQueueWorker.php
new file mode 100644
index 0000000..ad42a4b
--- /dev/null
+++ b/src/Annotation/AdvancedQueueWorker.php
@@ -0,0 +1,16 @@
+<?php
+
+namespace Drupal\advancedqueue\Annotation;
+
+use Drupal\Component\Annotation\PluginID;
+
+/**
+ * Defines an advanced queue worker.
+ *
+ * Plugin Namespace: Plugin\advancedqueue\Worker
+ *
+ * @Annotation
+ */
+class AdvancedQueueWorker extends PluginID {
+
+}
diff --git a/src/Command/QueueRunCommand.php b/src/Command/QueueRunCommand.php
new file mode 100644
index 0000000..5900d13
--- /dev/null
+++ b/src/Command/QueueRunCommand.php
@@ -0,0 +1,42 @@
+<?php
+
+namespace Drupal\advancedqueue\Command;
+
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+use Drupal\Console\Annotations\DrupalCommand;
+use Drupal\Console\Core\Command\ContainerAwareCommand;
+use Drupal\Console\Core\Style\DrupalStyle;
+
+/**
+ * Class QueueRunCommand.
+ *
+ * @DrupalCommand (
+ *     extension="advancedqueue",
+ *     extensionType="module"
+ * )
+ */
+class QueueRunCommand extends ContainerAwareCommand {
+
+  /**
+   * {@inheritdoc}
+   */
+  protected function configure() {
+    $this
+      ->setName('advancedqueue:run-queue')
+      ->addArgument('queue_name', NULL, 'The queue name')
+      ->setDescription($this->trans('commands.advancedqueue.run-queue.description'));
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  protected function execute(InputInterface $input, OutputInterface $output) {
+    $io = new DrupalStyle($input, $output);
+    /** @var \Drupal\advancedqueue\Runner\ConsoleQueueRunner $console_runner */
+    $console_runner = \Drupal::service('advancedqueue.console_runner');
+    $console_runner->setIo($io);
+    $console_runner->setTranslator($this->translator);
+    $console_runner->run($input->getArgument('queue_name'));
+  }
+}
diff --git a/src/Queue/AdvancedQueueDatabase.php b/src/Queue/AdvancedQueueDatabase.php
index 887b406..fce5295 100644
--- a/src/Queue/AdvancedQueueDatabase.php
+++ b/src/Queue/AdvancedQueueDatabase.php
@@ -2,16 +2,18 @@
 
 namespace Drupal\advancedqueue\Queue;
 
+use Drupal\advancedqueue\QueueJob;
+use Drupal\advancedqueue\QueueJobStatus;
+use Drupal\Component\Datetime\TimeInterface;
 use Drupal\Core\Database\Connection;
-use Drupal\Core\Queue\DatabaseQueue;
-use Drupal\Core\Database\Query\Merge;
+use Drupal\Core\Database\SchemaObjectExistsException;
 
 /**
  * AdvancedQueue database implementation.
  *
  * @ingroup queue
  */
-class AdvancedQueueDatabase extends DatabaseQueue {
+class AdvancedQueueDatabase implements AdvancedQueueInterface {
 
   /**
    * Use our own storage table.
@@ -19,38 +21,53 @@ class AdvancedQueueDatabase extends DatabaseQueue {
   const TABLE_NAME = 'advancedqueue';
 
   /**
-   * Status codes.
+   * The database connection.
+   *
+   * @var  \Drupal\Core\Database\Connection
    */
-  const STATUS_QUEUED = -1;
-  const STATUS_PROCESSING = 0;
-  const STATUS_SUCCESS = 1;
-  const STATUS_FAILED = 2;
-  const STATUS_RETRY = 3;
+  protected $connection;
 
-  protected $schema_columns = [];
+  /**
+   * The queue name.
+   *
+   * @var 
+   */
+  protected $name;
 
   /**
-   * {@inheritdoc}
+   * The time service.
+   *
+   * @var \Drupal\Component\Datetime\TimeInterface
    */
-  public function __construct($name, Connection $connection) {
-    parent::__construct($name, $connection);
+  protected $time;
 
-    // @TODO: Load the schema for our table and set up our valid columns.
+  /**
+   * {@inheritdoc}
+   */
+  public function __construct($name, Connection $connection, TimeInterface $time) {
+    $this->name = $name;
+    $this->connection = $connection;
+    $this->time = $time;
   }
 
-  protected function doCreateItem($data, $meta) {
-    $fields = array_intersect_key($meta, $this->schema_columns);
-
-    $fields += [
-      'item_key' => NULL,
+  protected function jobToFields(QueueJob $job) {
+    return [
       'queue_name' => $this->name,
-      'uid' => 0, // @TODO: User
-      'created' => time(),
-      'status' => 0, // @TODO: Statuses...?
-      'data' => serialize($data),
+      'data' => serialize($job->getPayload()),
+      'expire' => $job->getExpire(),
+      'created' => $job->getStartTime() ?: $this->time->getCurrentTime(),
+      'item_key' => $job->getItemKey(),
+      'uid' => $job->getUid(),
+      'label' => $job->getLabel(),
+      'processed_time' => $job->getProcessedTime() ?: 0,
+      'status' => $job->getStatus(),
     ];
+  }
 
-    if (!empty($meta['item_key'])) {
+  protected function doCreateItem(QueueJob $job) {
+    $fields = $this->jobToFields($job);
+
+    if (!empty($fields['item_key'])) {
       // We allow replacing existing items based on their key. That means we
       // need a bunch of logic in this spot.
       $key = $fields['item_key'];
@@ -86,25 +103,44 @@ class AdvancedQueueDatabase extends DatabaseQueue {
   /**
    * {@inheritdoc}
    */
-  public function createItem($data) {
-    $this->createAdvancedItem($data, []);
+  public function updateJobItem(QueueJob $job) {
+    if (empty($job->getItemId())) {
+      // @todo We could insert the job automatically instead.
+      throw new \Exception('You cannot update a job item without a job ID yet.');
+    }
+
+    $fields = $this->jobToFields($job);
+
+    try {
+      // This function allows the item's creation date and data package to
+      // be modified and requeued for later.
+      $update = $this->connection->update(static::TABLE_NAME)
+        ->fields($fields)
+        ->condition('item_id', $job->getItemId());
+      return $update->execute();
+    }
+    catch (\Exception $e) {
+      $this->catchException($e);
+      // If the table doesn't exist we should consider the item released.
+      return TRUE;
+    }
   }
 
   /**
    * {@inheritdoc}
    */
-  public function releaseItem($item) {
+  public function releaseItem(QueueJob $item) {
     try {
       // This function allows the item's creation date and data package to
       // be modified and requeued for later.
       $update = $this->connection->update(static::TABLE_NAME)
         ->fields([
           'expire' => NULL,
-          'status' => static::STATUS_QUEUED,
-          'created' => $item->created,
-          'data' => serialize($item->data),
+          'status' => QueueJobStatus::STATUS_QUEUED,
+          'created' => $item->getStartTime(),
+          'data' => serialize($item->getPayload()),
         ])
-        ->condition('item_id', $item->item_id);
+        ->condition('item_id', $item->getItemId());
       return $update->execute();
     }
     catch (\Exception $e) {
@@ -117,16 +153,16 @@ class AdvancedQueueDatabase extends DatabaseQueue {
   /**
    * {@inheritdoc}
    */
-  public function deleteItem($item) {
+  public function deleteItem(QueueJob $item) {
     try {
       $update = $this->connection->update(static::TABLE_NAME)
         ->fields([
           'expire' => NULL,
-          'status' => !empty($item->status) ? $item->status : static::STATUS_SUCCESS,
-          'result' => serialize(isset($item->result) ? $item->result : array()),
-          'processed' => time(),
+          'status' => $item->getStatus(),
+          'result' => serialize($item->getResult()),
+          'processed_time' => $this->time->getCurrentTime(),
         ])
-        ->condition('item_id', $item->item_id);
+        ->condition('item_id', $item->getItemKey());
       return $update->execute();
     }
     catch (\Exception $e) {
@@ -148,10 +184,13 @@ class AdvancedQueueDatabase extends DatabaseQueue {
       try {
         $params = [
           ':name' => $this->name,
-          ':status' => static::STATUS_QUEUED,
-          ':created' => time(),
+          ':status' => QueueJobStatus::STATUS_QUEUED,
+          ':created' => $this->time->getCurrentTime(),
         ];
-        $item = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE status = :status AND expire IS NULL AND created <= :created AND queue_name = :name ORDER BY created, item_id ASC', 0, 1, $params)->fetchObject();
+        $item = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE status = :status AND expire IS NULL AND created <= :created AND queue_name = :name ORDER BY created, item_id ASC', 0, 1, $params)->fetchAssoc();
+        if ($item) {
+          $item['payload'] = unserialize($item['data']);
+        }
       }
       catch (\Exception $e) {
         $this->catchException($e);
@@ -159,7 +198,7 @@ class AdvancedQueueDatabase extends DatabaseQueue {
         // claim.
         return FALSE;
       }
-      if ($item) {
+      if ($item && $job = new QueueJob($item)) {
         // Try to update the item. Only one thread can succeed in UPDATEing the
         // same row. We cannot rely on REQUEST_TIME because items might be
         // claimed by a single consumer which runs longer than 1 second. If we
@@ -168,14 +207,14 @@ class AdvancedQueueDatabase extends DatabaseQueue {
         // should really expire.
         $update = $this->connection->update(static::TABLE_NAME)
           ->fields([
-            'expire' => time() + $lease_time,
+            'expire' => $this->time->getCurrentTime() + $lease_time,
+            'status' => QueueJobStatus::STATUS_PROCESSING,
           ])
-          ->condition('item_id', $item->item_id)
+          ->condition('item_id', $job->getItemId())
           ->isNull('expire');
         // If there are affected rows, this update succeeded.
         if ($update->execute()) {
-          $item->data = unserialize($item->data);
-          return $item;
+          return $job;
         }
       }
       else {
@@ -188,11 +227,11 @@ class AdvancedQueueDatabase extends DatabaseQueue {
   /**
    * {@inheritdoc}
    */
-  public function createAdvancedItem($data, $meta) {
+  public function createAdvancedItem(QueueJob $job) {
     // @TODO: Isn't there a less-weird way to structure this function??
     $try_again = FALSE;
     try {
-      $id = $this->doCreateItem($data, $meta);
+      $id = $this->doCreateItem($job);
     }
     catch (\Exception $e) {
       // If there was an exception, try to create the table.
@@ -204,7 +243,7 @@ class AdvancedQueueDatabase extends DatabaseQueue {
     }
     // Now that the table has been created, try again if necessary.
     if ($try_again) {
-      $id = $this->doCreateItem($data);
+      $id = $this->doCreateItem($job);
     }
 
     return $id;
@@ -234,7 +273,7 @@ class AdvancedQueueDatabase extends DatabaseQueue {
    */
   public function numberOfItems() {
     try {
-      return $this->connection->query('SELECT COUNT(item_id) FROM {' . static::TABLE_NAME . '} WHERE queue_name = :name AND status = ' . static::STATUS_QUEUED, [':name' => $this->name])
+      return $this->connection->query('SELECT COUNT(item_id) FROM {' . static::TABLE_NAME . '} WHERE queue_name = :name AND status = :status', [':name' => $this->name, ':status' => QueueJobStatus::STATUS_QUEUED])
         ->fetchField();
     }
     catch (\Exception $e) {
@@ -248,61 +287,90 @@ class AdvancedQueueDatabase extends DatabaseQueue {
    * Defines the schema for the advancedqueue table.
    */
   public function schemaDefinition() {
-    $schema = parent::schemaDefinition();
-
-    // Slightly more clear naming for the queue name column.
-    $schema['fields']['queue_name'] = $schema['fields']['name'];
-    unset($schema['fields']['name']);
-
-    // Use a null for the default of expire.
-    $schema['fields']['expire']['not null'] = FALSE;
-    unset($schema['fields']['expire']['default']);
-
-    $schema['fields'] += [
-      'item_key' => [
-        'type' => 'varchar_ascii',
-        'length' => 255,
-        'not null' => FALSE,
-        'description' => 'The unique key of the queue item, if any.',
-      ],
-      'uid' => [
-        'type' => 'int',
-        'type' => 'int',
-        'unsigned' => TRUE,
-        'not null' => TRUE,
-        'description' => 'The user to which the item belongs.',
-      ],
-      'label' => [
-        'type' => 'varchar',
-        'length' => 400,
-        'not null' => TRUE,
-        'description' => 'The title of this item.',
-      ],
-      'processed' => [
-        'type' => 'int',
-        'not null' => TRUE,
-        'default' => 0,
-        'description' => 'Timestamp when the item was processed.',
+    return [
+      'description' => 'Stores items in queues.',
+      'fields' => [
+        'item_id' => [
+          'type' => 'serial',
+          'unsigned' => TRUE,
+          'not null' => TRUE,
+          'description' => 'Primary Key: Unique item ID.',
+        ],
+        'queue_name' => [
+          'type' => 'varchar_ascii',
+          'length' => 255,
+          'not null' => TRUE,
+          'default' => '',
+          'description' => 'The queue name.',
+        ],
+        'data' => [
+          'type' => 'blob',
+          'not null' => FALSE,
+          'size' => 'big',
+          'serialize' => TRUE,
+          'description' => 'The arbitrary data for the item.',
+        ],
+        'expire' => [
+          'type' => 'int',
+          // @todo Is this change really needed?
+          'not null' => FALSE,
+          'default' => 0,
+          'description' => 'Timestamp when the claim lease expires on the item.',
+        ],
+        'created' => [
+          'type' => 'int',
+          'not null' => TRUE,
+          'default' => 0,
+          'description' => 'Timestamp when the item was created.',
+        ],
+        'item_key' => [
+          'type' => 'varchar_ascii',
+          'length' => 255,
+          'not null' => FALSE,
+          'description' => 'The unique key of the queue item, if any.',
+        ],
+        'uid' => [
+          'type' => 'int',
+          'unsigned' => TRUE,
+          'not null' => TRUE,
+          'description' => 'The user to which the item belongs.',
+        ],
+        'label' => [
+          'type' => 'varchar',
+          'length' => 400,
+          'not null' => TRUE,
+          'description' => 'The title of this item.',
+        ],
+        'processed_time' => [
+          'type' => 'int',
+          'not null' => TRUE,
+          'default' => 0,
+          'description' => 'Timestamp when the item was processed.',
+        ],
+        'status' => [
+          'type' => 'int',
+          'not null' => TRUE,
+          'default' => -1,
+          'size' => 'tiny',
+          'description' => 'Indicates whether the item has been processed (-1 = queue, 0 = processing, 1 = successfully processed, 2 = failed).',
+        ],
+        'result' => [
+          'type' => 'blob',
+          'not null' => FALSE,
+          'size' => 'big',
+          'serialize' => TRUE,
+          'description' => 'The arbitrary result for the item, only significant if {advancedqueue}.status <> 0',
+        ],
       ],
-      'status' => [
-        'type' => 'int',
-        'not null' => TRUE,
-        'default' => -1,
-        'size' => 'tiny',
-        'description' => 'Indicates whether the item has been processed (-1 = queue, 0 = processing, 1 = successfully processed, 2 = failed).',
+      'primary key' => ['item_id'],
+      'indexes' => [
+        'name_created' => ['queue_name', 'created'],
+        'expire' => ['expire'],
       ],
-      'result' => [
-        'type' => 'blob',
-        'not null' => FALSE,
-        'size' => 'big',
-        'serialize' => TRUE,
-        'description' => 'The arbitrary result for the item, only significant if {advancedqueue}.status <> 0',
+      'unique keys' => [
+        'item_key' => ['item_key'],
       ],
     ];
-
-    $schema['unique keys']['item_key'] = ['item_key'];
-
-    return $schema;
   }
 
   /**
@@ -318,5 +386,69 @@ class AdvancedQueueDatabase extends DatabaseQueue {
       $this->catchException($e);
     }
   }
+
+  public function createJobItem(QueueJob $job) {
+    return $this->createAdvancedItem($job);
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getItemByItemKey($item_key) {
+    $item = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE queue_name = :queue_name AND item_key = :item_key', 0, 1, ['queue_name' => $this->name, 'item_key' => $item_key])->fetchAssoc();
+    if ($item) {
+      return new QueueJob($item);
+    }
+  }
+  
+
+  /**
+   * {@inheritdoc}
+   */
+  public function createQueue() {
+    // All tasks are stored in a single database table (which is created on
+    // demand) so there is nothing we need to do to create a new queue.
+  }
+
+  /**
+   * 
+   * Check if the table exists and create it if not.
+   */
+  protected function ensureTableExists() {
+    try {
+      $database_schema = $this->connection->schema();
+      if (!$database_schema->tableExists(static::TABLE_NAME)) {
+        $schema_definition = $this->schemaDefinition();
+        $database_schema->createTable(static::TABLE_NAME, $schema_definition);
+        return TRUE;
+      }
+    }
+      // If another process has already created the queue table, attempting to
+      // recreate it will throw an exception. In this case just catch the
+      // exception and do nothing.
+    catch (SchemaObjectExistsException $e) {
+      return TRUE;
+    }
+    return FALSE;
+  }
+
+  /**
+   * Act on an exception when queue might be stale.
+   *
+   * If the table does not yet exist, that's fine, but if the table exists and
+   * yet the query failed, then the queue is stale and the exception needs to
+   * propagate.
+   *
+   * @param $e
+   *   The exception.
+   *
+   * @throws \Exception
+   *   If the table exists the exception passed in is rethrown.
+   */
+  protected function catchException(\Exception $e) {
+    if ($this->connection->schema()->tableExists(static::TABLE_NAME)) {
+      throw $e;
+    }
+  }
 }
 
diff --git a/src/Queue/AdvancedQueueDatabaseFactory.php b/src/Queue/AdvancedQueueDatabaseFactory.php
index 71a5f81..b52cf0c 100644
--- a/src/Queue/AdvancedQueueDatabaseFactory.php
+++ b/src/Queue/AdvancedQueueDatabaseFactory.php
@@ -2,6 +2,8 @@
 
 namespace Drupal\advancedqueue\Queue;
 
+use Drupal\Component\Datetime\TimeInterface;
+use Drupal\Core\Database\Connection;
 use Drupal\Core\Queue\QueueDatabaseFactory;
 
 /**
@@ -10,16 +12,29 @@ use Drupal\Core\Queue\QueueDatabaseFactory;
 class AdvancedQueueDatabaseFactory extends QueueDatabaseFactory {
 
   /**
+   * The time service.
+   *
+   * @var \Drupal\Component\Datetime\TimeInterface
+   */
+  protected $time;
+
+  public function __construct(Connection $connection, TimeInterface $time) {
+    parent::__construct($connection);
+
+    $this->time = $time;
+  }
+
+  /**
    * Constructs a new queue object for a given name.
    *
    * @param string $name
    *   The name of the requested queue.
    *
-   * @return \Drupal\Core\Queue\DatabaseQueue
+   * @return \Drupal\advancedqueue\Queue\AdvancedQueueInterface
    *   A queue class for the specified queue.
    */
   public function get($name) {
-    return new AdvancedQueueDatabase($name, $this->connection);
+    return new AdvancedQueueDatabase($name, $this->connection, $this->time);
   }
 
 }
diff --git a/src/Queue/AdvancedQueueFactory.php b/src/Queue/AdvancedQueueFactory.php
index 5dc3d00..7098e97 100644
--- a/src/Queue/AdvancedQueueFactory.php
+++ b/src/Queue/AdvancedQueueFactory.php
@@ -13,18 +13,18 @@ use Drupal\Core\Queue\QueueFactory;
 class AdvancedQueueFactory extends QueueFactory {
 
   /**
-   * {@inheritdoc}
+   * @return \Drupal\advancedqueue\Queue\AdvancedQueueInterface
    */
   public function get($name, $reliable = FALSE) {
     if (!isset($this->queues[$name])) {
       // If it is a reliable queue, check the specific settings first.
       if ($reliable) {
-        $service_name = $this->settings->get('queue_reliable_service_' . $name);
+        $service_name = $this->settings->get('advancedqueue_reliable_service_' . $name);
       }
       // If no reliable queue was defined, check the service and global
       // settings, fall back to queue.database.
       if (empty($service_name)) {
-        $service_name = $this->settings->get('queue_service_' . $name, $this->settings->get('queue_default', 'advancedqueue.database'));
+        $service_name = $this->settings->get('advancedqueue_service_' . $name, $this->settings->get('queue_default', 'advancedqueue.database'));
       }
       $this->queues[$name] = $this->container->get($service_name)->get($name);
     }
diff --git a/src/Queue/AdvancedQueueInterface.php b/src/Queue/AdvancedQueueInterface.php
index f385e95..01f0cf0 100644
--- a/src/Queue/AdvancedQueueInterface.php
+++ b/src/Queue/AdvancedQueueInterface.php
@@ -2,13 +2,13 @@
 
 namespace Drupal\advancedqueue\Queue;
 
-use Drupal\Core\Queue\ReliableQueueInterface;
+use Drupal\advancedqueue\QueueJob;
 
 /**
  * An AdvancedQueue interface as a guide to later possible non-database
  * implementations.
  */
-interface AdvancedQueueInterface extends ReliableQueueInterface {
+interface AdvancedQueueInterface {
 
   /**
    * Create an item with metadata for the item itself, allowing the caller to
@@ -26,5 +26,30 @@ interface AdvancedQueueInterface extends ReliableQueueInterface {
    *   A unique ID if the item was successfully created and was added to the
    *   queue, otherwise FALSE.
    */
-  public function createAdvancedItem($data, $meta);
+  public function createJobItem(QueueJob $job);
+
+  public function updateJobItem(QueueJob $job);
+
+  /**
+   * @return \Drupal\advancedqueue\QueueJob|NULL
+   */
+  public function getItemByItemKey($item_key);
+
+  public function numberOfItems();
+
+  /**
+   * @param int $lease_time
+   *
+   * @return \Drupal\advancedqueue\QueueJob
+   */
+  public function claimItem($lease_time = 3600);
+
+  public function deleteItem(QueueJob $job);
+
+  public function releaseItem(QueueJob $job);
+
+  public function createQueue();
+
+  public function deleteQueue();
+
 }
diff --git a/src/Queue/AdvancedQueueWorkerInterface.php b/src/Queue/AdvancedQueueWorkerInterface.php
index e69de29..533a535 100644
--- a/src/Queue/AdvancedQueueWorkerInterface.php
+++ b/src/Queue/AdvancedQueueWorkerInterface.php
@@ -0,0 +1,21 @@
+<?php
+
+namespace Drupal\advancedqueue\Queue;
+
+use Drupal\advancedqueue\QueueJob;
+
+/**
+ * @todo Should the advanced stuff like getGroups by on its own custom interface?
+ */
+interface AdvancedQueueWorkerInterface {
+
+  /**
+   * @param \Drupal\advancedqueue\QueueJob $job
+   *
+   * @return \Drupal\advancedqueue\QueueJobStatus
+   */
+  public function processItem(QueueJob $job);
+
+  public function getGroups();
+
+}
diff --git a/src/QueueJob.php b/src/QueueJob.php
new file mode 100644
index 0000000..023827e
--- /dev/null
+++ b/src/QueueJob.php
@@ -0,0 +1,195 @@
+<?php
+
+namespace Drupal\advancedqueue;
+
+class QueueJob {
+
+  protected $itemId;
+
+  protected $itemKey;
+
+  protected $queueName;
+
+  protected $uid;
+
+  protected $startTime;
+
+  protected $status;
+
+  protected $payload;
+
+  protected $expire;
+
+  protected $label;
+
+  protected $processedTime;
+
+  protected $result;
+
+  /**
+   * QueueJob constructor.
+   *
+   * @param $payload
+   * @param $itemKey
+   * @param $uid
+   * @param $startTime
+   * @param $status
+   */
+  public function __construct(array $definition) {
+    $definition += [
+      'item_id' => NULL,
+      'queue_name' => '',
+      'item_key' => NULL,
+      'uid' => 0,
+      'created' => NULL,
+      'expire' => NULL,
+      'payload' => NULL,
+      'label' => '',
+      'processed_time' => NULL,
+      'result' => NULL,
+      'status' => QueueJobStatus::STATUS_QUEUED,
+    ];
+
+    $this->itemId = $definition['item_id'];
+    $this->itemKey = $definition['item_key'];
+    $this->queueName = $definition['queue_name'];
+    $this->uid = $definition['uid'];
+    $this->startTime = $definition['created'];
+    $this->status = $definition['status'];
+    $this->payload = $definition['payload'];
+    $this->expire = $definition['expire'];
+    $this->label = $definition['label'];
+    $this->processedTime = $definition['processed_time'];
+    $this->result = $definition['result'];
+  }
+
+  public static function create($payload, $itemKey = NULL) {
+    return new static([
+      'payload' => $payload,
+      'item_key' => $itemKey,
+    ]);
+  }
+
+  public static function createForTime($payload, $created, $itemKey = NULL) {
+    return new static([
+      'payload' => $payload,
+      'item_key' => $itemKey,
+      'created' => $created,
+    ]);
+  }
+
+  /**
+   * @return mixed
+   */
+  public function getExpire() {
+    return $this->expire;
+  }
+
+  /**
+   * @return mixed
+   */
+  public function getProcessedTime() {
+    return $this->processedTime;
+  }
+
+  /**
+   * @return mixed
+   */
+  public function getLabel() {
+    return $this->label;
+  }
+
+  /**
+   * @return mixed
+   */
+  public function getItemId() {
+    return $this->itemId;
+  }
+
+  /**
+   * @return mixed
+   */
+  public function getItemKey() {
+    return $this->itemKey;
+  }
+
+  /**
+   * @return mixed
+   */
+  public function getUid() {
+    return $this->uid;
+  }
+
+  /**
+   * Determines when the queue job should be started.
+   *
+   * By default this is now aka. as soon as possible.
+   *
+   * @return mixed
+   */
+  public function getStartTime() {
+    return $this->startTime;
+  }
+
+  /**
+   * @return mixed
+   */
+  public function getStatus() {
+    return $this->status;
+  }
+
+  /**
+   * @param null $status
+   *
+   * @returns static
+   */
+  public function setStatus($status) {
+    $job = clone $this;
+    $job->status = $status;
+    return $job;
+  }
+
+  /**
+   * @return mixed
+   */
+  public function getPayload() {
+    return $this->payload;
+  }
+
+  /**
+   * @return mixed
+   */
+  public function getResult() {
+    return $this->result;
+  }
+
+  /**
+   * Updates the result.
+   *
+   * @param mixed $result
+   *
+   * @return static
+   */
+  public function setResult($result) {
+    $job = clone $this;
+    $job->result = $result;
+    return $job;
+  }
+
+  /**
+   * @return mixed
+   */
+  public function getQueueName() {
+    return $this->queueName;
+  }
+
+  /**
+   * @param mixed $queueName
+   */
+  public function setQueueName($queueName) {
+    $job = clone $this;
+    $job->queueName = $queueName;
+    return $job;
+  }
+
+}
diff --git a/src/QueueJobStatus.php b/src/QueueJobStatus.php
new file mode 100644
index 0000000..ae82c4a
--- /dev/null
+++ b/src/QueueJobStatus.php
@@ -0,0 +1,59 @@
+<?php
+
+namespace Drupal\advancedqueue;
+
+class QueueJobStatus {
+
+  /**
+   * Status codes.
+   */
+  const STATUS_QUEUED = -1;
+  const STATUS_PROCESSING = 0;
+  const STATUS_SUCCESS = 1;
+  const STATUS_FAILED = 2;
+  const STATUS_RETRY = 3;
+
+  protected $status;
+
+  /**
+   * @var mixed
+   */
+  protected $result;
+
+  /**
+   * QueueJobStatus constructor.
+   */
+  public function __construct($status, $result = NULL) {
+    $this->status = $status;
+    $this->result = $result;
+  }
+
+  public function is($status) {
+    return $this->status === $status;
+  }
+
+  public function getResult() {
+    return $this->result;
+  }
+
+  public static function queued() {
+    return new static(static::STATUS_QUEUED);
+  }
+
+  public static function processing() {
+    return new static(static::STATUS_PROCESSING);
+  }
+
+  public static function success($result = NULL) {
+    return new static(static::STATUS_SUCCESS, $result);
+  }
+
+  public static function failed() {
+    return new static(static::STATUS_FAILED);
+  }
+
+  public static function retry() {
+    return new static(static::STATUS_RETRY);
+  }
+
+}
diff --git a/src/Runner/AdvancedQueueRunnerBase.php b/src/Runner/AdvancedQueueRunnerBase.php
index 9ced9eb..c9d6fd6 100644
--- a/src/Runner/AdvancedQueueRunnerBase.php
+++ b/src/Runner/AdvancedQueueRunnerBase.php
@@ -2,20 +2,26 @@
 
 namespace Drupal\advancedqueue\Runner;
 
-use Drupal\Core\Queue\QueueFactory;
-use Drupal\Core\Queue\QueueWorkerManagerInterface;
+use Drupal\advancedqueue\Queue\AdvancedQueueFactory;
+use Drupal\advancedqueue\Queue\AdvancedQueueInterface;
+use Drupal\advancedqueue\Queue\AdvancedQueueWorkerInterface;
+use Drupal\advancedqueue\QueueJob;
+use Drupal\advancedqueue\QueueJobStatus;
+use Drupal\Component\Datetime\TimeInterface;
+use Drupal\Component\Plugin\PluginManagerInterface;
+use Drupal\Core\Queue\RequeueException;
+use Drupal\Core\Queue\SuspendQueueException;
 
 /**
  * @file Placeholder for eventual Drush command. Porting just the original
  * runner logic to here.
  */
-
-
 abstract class AdvancedQueueRunnerBase implements AdvancedQueueRunnerInterface {
 
   /**
    * The queue factory used to get the actual queue to claim items from.
-   * @var \Drupal\Core\Queue\QueueFactory
+   *
+   * @var \Drupal\advancedqueue\Queue\AdvancedQueueFactory
    */
   protected $queueFactory;
 
@@ -26,155 +32,115 @@ abstract class AdvancedQueueRunnerBase implements AdvancedQueueRunnerInterface {
   protected $queueWorkerManager;
 
   /**
-   * The queue worker objects.
-   * @var \Drupal\advancedqueue\Queue\AdvancedQueueWorkerInterface[]
-   */
-  protected $queueWorkers = [];
-
-  /**
-   * The defined queue worker groups.
-   * @var string[]
+   * @var \Drupal\Component\Datetime\TimeInterface
    */
-  protected $queueWorkerGroups = [];
+  protected $time;
 
   /**
    * Constructs the queue runner.
    *
-   * @param \Drupal\Core\Queue\QueueFactory $queueFactory
+   * @param \Drupal\advancedqueue\Queue\AdvancedQueueFactory $queueFactory
    *   The queue service.
-   * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $queueWorkerManager
+   * @param \Drupal\Component\Plugin\PluginManagerInterface $queueWorkerManager
    *   The queue plugin manager.
+   * @param \Drupal\Component\Datetime\TimeInterface $time
+   *   The current time.
    */
-  public function __construct(QueueFactory $queueFactory, QueueWorkerManagerInterface $queueWorkerManager) {
+  public function __construct(AdvancedQueueFactory $queueFactory, PluginManagerInterface $queueWorkerManager, TimeInterface $time) {
     $this->queueFactory = $queueFactory;
     $this->queueWorkerManager = $queueWorkerManager;
-    $definitions = $this->queueWorkerManager->getDefinitions();
-
-    // Go through our definitions to do some setup.
-    foreach ($definitions as $name => $info) {
-      $worker = $this->queueWorkerManager->createInstance($name);
-      if ($worker instanceof AdvancedQueueWorkerInterface) {
-        // Ensure the queue exists.
-        $this->queueFactory->get($name)->createQueue();
-        $this->queueWorkers[$name] = $this->queueWorkerManager->createInstance($name);
-        // Set the group list.
-        foreach ($worker->getGroups() as $group) {
-          $this->queueWorkerGroups[] = $group;
-        }
-      }
-    }
-    // Make sure groups aren't duplicated.
-    $this->queueWorkerGroups = array_unique($this->queueWorkerGroups);
+    $this->time = $time;
   }
 
-  /**
-   * Set the runner to use a list of queue groups, throwing an exception if any
-   * of the requested group names do not exist.
-   *
-   * @param string[] $groupNames
-   *   The names of the groups being requested.
-   */
-  public function setGroups(array $groupNames) {
-    $invalidGroups = array_diff($groupNames, $this->queueWorkerGroups);
-    if (!empty($invalidGroups)) {
-      $invalidGroupNames = implode(', ', $invalidGroups);
-      throw new Exception('The following groups are invalid: ' . $invalidGroupNames);
-    }
-    else {
-      // Set us up to process queue workers.
-      $this->queueWorkerGroups = $groupNames;
-      // All our groups are fine. Select only the queues that belong to any of
-      // the requested groups.
-      foreach ($this->queueWorkers as $name => $worker) {
-        if (!array_intersect($this->queueWorkerGroups, $worker->getGroups())) {
-          // Workers which don't belong to any group are excluded.
-          unset($this->queueWorkers[$name]);
-        }
+  public function run($queue_name, $timeLimit = 30) {
+    $queue = $this->queueFactory->get($queue_name);
+    $worker = $this->queueWorkerManager->createInstance($queue_name);
+
+    // Grab the defined cron queues.
+    $end = $this->time->getCurrentTime() + $timeLimit;
+    while ($this->time->getCurrentTime() < $end) {
+      // @todo Implement that
+      // $leaseTime = $worker->getLeaseTime();
+      $leaseTime = 30;
+      try {
+        $this->processNextItem($queue, $worker, $leaseTime);
+      } catch (SuspendQueueException $e) {
+        continue;
       }
     }
   }
 
-  /**
-   * Set the runner to use a list of queues, throwing an exception if any of
-   * the requested queues do not exist.
-   *
-   * @param string[] $queueNames
-   *   The names of the queues being requested.
-   */
-  public function setQueues(array $queueNames) {
-    $invalidQueues = array_diff($queueNames, array_keys($this->queueWorkers));
-    if (!empty($invalidQueues)) {
-      $invalidQueueNames = implode(', ', $invalidQueues);
-      throw new Exception('The following queues are invalid: ' . $invalidQueueNames);
-    }
-    else {
-      foreach ($this->queueWorkers as $name => $worker) {
-        if (!in_array($name, $queueNames)) {
-          unset($this->queueWorkers[$name]);
-        }
-      }
-    }
+  protected function onBeforeProcessing(QueueJob $job) {
   }
 
-  /**
-   * Processes AdvancedQueue worker queues.
-   *
-   * @param int $timeLimit
-   *   A time limit in seconds.
-   *   @TODO: Figure out the best way to allow going infinite.
-   */
-  protected function processQueues($timeLimit) {
-    // Grab the defined cron queues.
-    $end = time() + $timeLimit;
-    while (time() < $end) {
-      foreach ($this->queueWorkers as $name => $worker) {
-        $leaseTime = $worker->getLeaseTime();
-        $queue = $this->queueFactory->get($name);
-        if ($item = $queue->claimItem($leaseTime)) {
-          // This is the part which e.g. a FastCGI runner will reimplement.
-          // The default process function simply phones the worker, but
-          // FCGI will need to do more magical things.
-          try {
-            $result = $this->process($worker, $item);
-            // @TODO: Assuming a result, handle it here.
-          }
-          catch (\Exception $e) {
-            // @TODO: Exception logic goes here. See commented code below.
-            // @TODO: Also, we probably need to generate correct $result sets
-            // for known exceptions and handle the result later? TBD.
-
-                // try {
-                //   $queue_worker->processItem($item->data);
-                //   $queue->deleteItem($item);
-                // }
-                // catch (RequeueException $e) {
-                //   // The worker requested the task be immediately requeued.
-                //   $queue->releaseItem($item);
-                // }
-                // catch (SuspendQueueException $e) {
-                //   // If the worker indicates there is a problem with the whole queue,
-                //   // release the item and skip to the next queue.
-                //   $queue->releaseItem($item);
-
-                //   watchdog_exception('cron', $e);
-
-                //   // Skip to the next queue.
-                //   continue 2;
-                // }
-                // catch (\Exception $e) {
-                //   // In case of any other kind of exception, log it and leave the item
-                //   // in the queue to be processed again later.
-                //   watchdog_exception('cron', $e);
-                // }
-          }
-
-          // If we make it all the way to here, it means nothing really bad was
-          // thrown and we can go back to the beginning.
-          continue 2;
+  protected function onException(\Exception $e, QueueJob $job) {
+    watchdog_exception('cron', $e);
+  }
+
+  protected function onSuspendQueueException(SuspendQueueException $e, QueueJob $job) {
+    watchdog_exception('cron', $e);
+  }
+
+  protected function onRequeueException($e, QueueJob $job) {
+  }
+
+  protected function processNextItem(AdvancedQueueInterface $queue, AdvancedQueueWorkerInterface $worker, $leaseTime) {
+    if ($job = $queue->claimItem($leaseTime)) {
+      // This is the part which e.g. a FastCGI runner will reimplement.
+      // The default process function simply phones the worker, but
+      // FCGI will need to do more magical things.
+      try {
+        $this->onBeforeProcessing($job);
+  
+        $result = $worker->processItem($job);
+        if ($result === NULL) {
+          // If there is no result we assume life is fine.
+          $queue->deleteItem($job);
         }
-        // If the conditional above fails, then no item was claimed and we can
-        // allow the foreach to take us to the next queue.
+        elseif ($result->is(QueueJobStatus::STATUS_QUEUED)) {
+          throw new \Exception('Workers are not allowed to set the status back to queued');
+        }
+        elseif ($result->is(QueueJobStatus::STATUS_PROCESSING)) {
+          throw new \Exception('Workers should set the status to failed, retry or success');;
+        }
+        elseif ($result->is(QueueJobStatus::STATUS_RETRY)) {
+          $queue->releaseItem($job);
+        }
+        elseif ($result->is(QueueJobStatus::STATUS_FAILED)) {
+          $job->setStatus(QueueJobStatus::STATUS_FAILED);
+          $queue->updateJobItem($job);
+        }
+        elseif ($result->is(QueueJobStatus::STATUS_SUCCESS)) {
+          $job = $job->setStatus(QueueJobStatus::STATUS_SUCCESS);
+          $job = $job->setResult($result->getResult());
+          $queue->updateJobItem($job);
+        }
+        return $result;
+      }
+      catch (RequeueException $e) {
+        $this->onRequeueException($e, $job);
+
+        // The worker requested the task be immediately requeued.
+        $queue->releaseItem($job);
+        return QueueJobStatus::retry();
+      }
+      catch (SuspendQueueException $e) {
+        $this->onSuspendQueueException($e, $job);
+  
+        // If the worker indicates there is a problem with the whole queue,
+        // release the item and skip to the next queue.
+        $queue->releaseItem($job);
+
+        // Skip to the next queue.
+        throw $e;
+      }
+      catch (\Exception $e) {
+        $this->onException($e, $job);
+  
+        // Skip to the next queue.
+        throw new SuspendQueueException($e->getMessage(), $e->getCode(), $e);
       }
     }
   }
+
 }
diff --git a/src/Runner/AdvancedQueueRunnerInterface.php b/src/Runner/AdvancedQueueRunnerInterface.php
new file mode 100644
index 0000000..f18a1f4
--- /dev/null
+++ b/src/Runner/AdvancedQueueRunnerInterface.php
@@ -0,0 +1,9 @@
+<?php
+
+
+namespace Drupal\advancedqueue\Runner;
+
+
+interface AdvancedQueueRunnerInterface {
+
+}
diff --git a/src/Runner/ConsoleQueueRunner.php b/src/Runner/ConsoleQueueRunner.php
new file mode 100644
index 0000000..18d0683
--- /dev/null
+++ b/src/Runner/ConsoleQueueRunner.php
@@ -0,0 +1,34 @@
+<?php
+
+namespace Drupal\advancedqueue\Runner;
+
+use Drupal\advancedqueue\QueueJob;
+use Drupal\Console\Core\Command\Shared\CommandTrait;
+
+class ConsoleQueueRunner extends AdvancedQueueRunnerBase {
+
+  use CommandTrait;
+
+  /**
+   * @var \Drupal\Console\Core\Style\DrupalStyle
+   */
+  protected $io;
+
+  public function setIo($io) {
+    $this->io = $io;
+  }
+
+  protected function onBeforeProcessing(QueueJob $job) {
+    $this->io->info($this->translator->trans('commands.advancedqueue.run-queue.messages.queue_job_processsing_start', ['@name' => $job->getQueueName(), '@id' => $job->getItemId()]));
+  }
+
+  protected function onException(\Exception $e, QueueJob $job) {
+    $this->io->error($this->translator->trans('commands.advancedqueue.run-queue.messages.queue_job_exception',
+      [
+        '@name' => $job->getQueueName(),
+        '@id' => $job->getItemId(),
+        '@message' => $e->getMessage(),
+      ]));
+  }
+
+}
diff --git a/src/Runner/DrushQueueRunner.php b/src/Runner/DrushQueueRunner.php
new file mode 100644
index 0000000..1b2cbf9
--- /dev/null
+++ b/src/Runner/DrushQueueRunner.php
@@ -0,0 +1,19 @@
+<?php
+
+namespace Drupal\advancedqueue\Runner;
+
+use Drupal\advancedqueue\QueueJob;
+
+class DrushQueueRunner extends AdvancedQueueRunnerBase {
+
+  protected function onBeforeProcessing(QueueJob $job) {
+    drush_log(dt('Processing item @id from @name queue.', ['@name' => $job->getQueueName(), 'id' => $job->getItemId(), 'info']));
+  }
+
+  protected function onException(\Exception $e) {
+    // In case of any other kind of exception, log it and leave the item
+    // in the queue to be processed again later.
+    drush_set_error('DRUSH_QUEUE_EXCEPTION', $e->getMessage());
+  }
+
+}
diff --git a/src/Runner/SerialQueueRunner.php b/src/Runner/SerialQueueRunner.php
new file mode 100644
index 0000000..48542da
--- /dev/null
+++ b/src/Runner/SerialQueueRunner.php
@@ -0,0 +1,8 @@
+<?php
+
+namespace Drupal\advancedqueue\Runner;
+
+class SerialQueueRunner extends AdvancedQueueRunnerBase {
+
+
+}
diff --git a/tests/modules/advancedqueue_test/advancedqueue_test.info.yml b/tests/modules/advancedqueue_test/advancedqueue_test.info.yml
new file mode 100644
index 0000000..983affe
--- /dev/null
+++ b/tests/modules/advancedqueue_test/advancedqueue_test.info.yml
@@ -0,0 +1,5 @@
+name: advancedqueue tet
+core: 8.x
+type: module
+dependencies:
+  - advancedqueue
diff --git a/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/Worker1.php b/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/Worker1.php
new file mode 100644
index 0000000..d9e42bb
--- /dev/null
+++ b/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/Worker1.php
@@ -0,0 +1,21 @@
+<?php
+
+namespace Drupal\advancedqueue_test\Plugin\advancedqueue\Worker;
+
+use Drupal\advancedqueue\AdvancedQueueWorkerBase;
+use Drupal\advancedqueue\QueueJob;
+use Drupal\advancedqueue\QueueJobStatus;
+
+/**
+ * @AdvancedQueueWorker("worker1")
+ */
+class Worker1 extends AdvancedQueueWorkerBase {
+
+  /**
+   * {@inheritdoc}
+   */
+  public function processItem(QueueJob $job) {
+    return QueueJobStatus::success('my result');
+  }
+
+}
diff --git a/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/Worker2.php b/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/Worker2.php
new file mode 100644
index 0000000..9875d5a
--- /dev/null
+++ b/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/Worker2.php
@@ -0,0 +1,28 @@
+<?php
+
+namespace Drupal\advancedqueue_test\Plugin\advancedqueue\Worker;
+
+use Drupal\advancedqueue\AdvancedQueueWorkerBase;
+use Drupal\advancedqueue\QueueJob;
+use Drupal\advancedqueue\QueueJobStatus;
+
+/**
+ * @AdvancedQueueWorker("worker2")
+ */
+class Worker2 extends AdvancedQueueWorkerBase {
+
+  /**
+   * {@inheritdoc}
+   */
+  public function processItem(QueueJob $job) {
+    return QueueJobStatus::success('my result');
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getGroups() {
+    return ['group'];
+  }
+
+}
diff --git a/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/WorkerStatus.php b/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/WorkerStatus.php
new file mode 100644
index 0000000..571c0b3
--- /dev/null
+++ b/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/WorkerStatus.php
@@ -0,0 +1,30 @@
+<?php
+
+namespace Drupal\advancedqueue_test\Plugin\advancedqueue\Worker;
+
+use Drupal\advancedqueue\AdvancedQueueWorkerBase;
+use Drupal\advancedqueue\QueueJob;
+use Drupal\advancedqueue\QueueJobStatus;
+
+/**
+ * @AdvancedQueueWorker("worker_status")
+ */
+class WorkerStatus extends AdvancedQueueWorkerBase {
+
+  /**
+   * {@inheritdoc}
+   */
+  public function processItem(QueueJob $job) {
+    $new_status = $job->getPayload()['status'];
+    $new_result = $job->getPayload()['result'];
+    return new QueueJobStatus($new_status, $new_result);
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getGroups() {
+    return ['group'];
+  }
+
+}
diff --git a/tests/src/Kernel/AdvancedQueueKernelTest.php b/tests/src/Kernel/AdvancedQueueKernelTest.php
new file mode 100644
index 0000000..0a59c28
--- /dev/null
+++ b/tests/src/Kernel/AdvancedQueueKernelTest.php
@@ -0,0 +1,211 @@
+<?php
+
+namespace Drupal\Tests\advancedqueue\Kernel;
+
+use Drupal\advancedqueue\Queue\AdvancedQueueDatabase;
+use Drupal\advancedqueue\Queue\AdvancedQueueInterface;
+use Drupal\advancedqueue\QueueJob;
+use Drupal\KernelTests\KernelTestBase;
+
+/**
+ * @group advancedqueue
+ */
+class AdvancedQueueKernelTest extends KernelTestBase {
+
+  /**
+   * {@inheritdoc}
+   */
+  public static $modules = ['advancedqueue'];
+
+  public function testNormalQueue() {
+    $advanced_queue_1 = new AdvancedQueueDatabase('queue1', \Drupal::database(), \Drupal::time());
+    $advanced_queue_2 = new AdvancedQueueDatabase('queue2', \Drupal::database(), \Drupal::time());
+    $this->runQueueTest($advanced_queue_1, $advanced_queue_2);
+  }
+
+  /**
+   * Queues and dequeues a set of items to check the basic queue functionality.
+   *
+   * @param \Drupal\Core\Queue\QueueInterface $queue1
+   *   An instantiated queue object.
+   * @param \Drupal\Core\Queue\QueueInterface $queue2
+   *   An instantiated queue object.
+   */
+  protected function runQueueTest(AdvancedQueueInterface $queue1, AdvancedQueueInterface $queue2) {
+    // Create four items.
+    $data = [];
+    for ($i = 0; $i < 4; $i++) {
+      $data[] = [$this->randomMachineName() => $this->randomMachineName()];
+    }
+
+    // Queue items 1 and 2 in the queue1.
+    $queue1->createItem($data[0]);
+    $queue1->createItem($data[1]);
+
+    // Retrieve two items from queue1.
+    $items = [];
+    $new_items = [];
+
+    $items[] = $item = $queue1->claimItem();
+    $new_items[] = $item->getPayload();
+
+    $items[] = $item = $queue1->claimItem();
+    $new_items[] = $item->getPayload();
+
+    // First two dequeued items should match the first two items we queued.
+    $this->assertEquals(2, $this->queueScore($data, $new_items), 'Two items matched');
+
+    // Add two more items.
+    $queue1->createItem($data[2]);
+    $queue1->createItem($data[3]);
+
+    $this->assertTrue($queue1->numberOfItems(), 'Queue 1 is not empty after adding items.');
+    $this->assertFalse($queue2->numberOfItems(), 'Queue 2 is empty while Queue 1 has items');
+
+    $items[] = $item = $queue1->claimItem();
+    $new_items[] = $item->getPayload();
+
+    $items[] = $item = $queue1->claimItem();
+    $new_items[] = $item->getPayload();
+
+    // All dequeued items should match the items we queued exactly once,
+    // therefore the score must be exactly 4.
+    $this->assertEquals(4, $this->queueScore($data, $new_items), 'Four items matched');
+
+    // There should be no duplicate items.
+    $this->assertEquals(4, $this->queueScore($new_items, $new_items), 'Four items matched');
+
+    // Delete all items from queue1.
+    foreach ($items as $item) {
+      $queue1->deleteItem($item);
+    }
+
+    // Check that both queues are empty.
+    $this->assertFalse($queue1->numberOfItems(), 'Queue 1 is empty');
+    $this->assertFalse($queue2->numberOfItems(), 'Queue 2 is empty');
+  }
+
+  /**
+   * Returns the number of equal items in two arrays.
+   */
+  protected function queueScore($items, $new_items) {
+    $score = 0;
+    foreach ($items as $item) {
+      foreach ($new_items as $new_item) {
+        if ($item === $new_item) {
+          $score++;
+        }
+      }
+    }
+    return $score;
+  }
+
+
+  protected function runQueueTestWithAdvancedQueueCreate(AdvancedQueueInterface $queue1, AdvancedQueueInterface $queue2) {
+    // Create four items.
+    $data = [];
+    for ($i = 0; $i < 4; $i++) {
+      $data[] = [$this->randomMachineName() => $this->randomMachineName()];
+    }
+
+    // Queue items 1 and 2 in the queue1.
+    $queue1->createJobItem(QueueJob::create($data[0]));
+    $queue1->createJobItem(QueueJob::create($data[1]));
+
+    // Retrieve two items from queue1.
+    $items = [];
+    $new_items = [];
+
+    $items[] = $item = $queue1->claimItem();
+    $new_items[] = $item->getPayload();
+
+    $items[] = $item = $queue1->claimItem();
+    $new_items[] = $item->getPayload();
+
+    // First two dequeued items should match the first two items we queued.
+    $this->assertEquals(2, $this->queueScore($data, $new_items), 'Two items matched');
+
+    // Add two more items.
+    $queue1->createJobItem(QueueJob::create($data[2]));
+    $queue1->createJobItem(QueueJob::create($data[3]));
+
+    $this->assertNotEmpty($queue1->numberOfItems(), 'Queue 1 is not empty after adding items.');
+    $this->assertFalse($queue2->numberOfItems(), 'Queue 2 is empty while Queue 1 has items');
+
+    $items[] = $item = $queue1->claimItem();
+    $new_items[] = $item->getPayload();
+
+    $items[] = $item = $queue1->claimItem();
+    $new_items[] = $item->getPayload();
+
+    // All dequeued items should match the items we queued exactly once,
+    // therefore the score must be exactly 4.
+    $this->assertEquals(4, $this->queueScore($data, $new_items), 'Four items matched');
+
+    // There should be no duplicate items.
+    $this->assertEquals(4, $this->queueScore($new_items, $new_items), 'Four items matched');
+
+    // Delete all items from queue1.
+    foreach ($items as $item) {
+      $queue1->deleteItem($item);
+    }
+
+    // Check that both queues are empty.
+    $this->assertFalse($queue1->numberOfItems(), 'Queue 1 is empty');
+    $this->assertFalse($queue2->numberOfItems(), 'Queue 2 is empty');
+  }
+
+  /**
+   * @todo is this test really that useful?
+   */
+  public function testAdvancedQueueSimpleProcessing() {
+    $this->time = new TestTime();
+    $this->time->setCurrentTime(4);
+    $this->time->setRequestTime(0);
+
+    $queue1 = new AdvancedQueueDatabase('queue1', \Drupal::database(), $this->time);
+    $queue2 = new AdvancedQueueDatabase('queue2', \Drupal::database(), $this->time);
+
+    $this->runQueueTestWithAdvancedQueueCreate($queue1, $queue2);
+  }
+
+  public function testAdvancedQueueTimeBasedProcessing() {
+    // Create four items.
+    $data = [];
+    for ($i = 0; $i < 4; $i++) {
+      $data[] = [$this->randomMachineName() => $this->randomMachineName()];
+    }
+
+    $time = new TestTime();
+    $time->setCurrentTime(4);
+    $time->setRequestTime(0);
+
+    $queue1 = new AdvancedQueueDatabase('queue1', \Drupal::database(), $time);
+    $queue2 = new AdvancedQueueDatabase('queue2', \Drupal::database(), $time);
+
+    $queue1->createAdvancedItem(QueueJob::createForTime($data[0], 5));
+    $queue1->createAdvancedItem(QueueJob::createForTime($data[1], 10));
+
+    $this->assertFalse($queue1->claimItem());
+    $this->assertFalse($queue2->claimItem());
+
+    $time->setCurrentTime(7);
+    $this->fetchAllItems();
+    $item = $queue1->claimItem();
+    $this->fetchAllItems();
+    $this->assertEquals($data[0], $item->getPayload());
+    $item = $queue1->claimItem();
+    $this->assertFalse($item);
+
+    $time->setCurrentTime(11);
+    $item = $queue1->claimItem();
+    $this->assertEquals($data[1], $item->getPayload());
+    $item = $queue1->claimItem();
+    $this->assertFalse($item);
+  }
+
+  protected function fetchAllItems() {
+    print_r(\Drupal::database()->query("SELECT * from {advancedqueue}")->fetchAll());
+  }
+
+}
diff --git a/tests/src/Kernel/AdvancedQueueRunnerTest.php b/tests/src/Kernel/AdvancedQueueRunnerTest.php
new file mode 100644
index 0000000..74a3292
--- /dev/null
+++ b/tests/src/Kernel/AdvancedQueueRunnerTest.php
@@ -0,0 +1,157 @@
+<?php
+
+namespace Drupal\Tests\advancedqueue\Kernel;
+
+use Drupal\advancedqueue\Queue\AdvancedQueueDatabase;
+use Drupal\advancedqueue\Queue\AdvancedQueueWorkerInterface;
+use Drupal\advancedqueue\QueueJob;
+use Drupal\advancedqueue\QueueJobStatus;
+use Drupal\advancedqueue\Runner\RunnerHelper;
+use Drupal\advancedqueue\Runner\SerialQueueRunner;
+use Drupal\KernelTests\KernelTestBase;
+
+/**
+ * Tests the advanced queue runner.
+ *
+ * @group advancedqueue
+ */
+class AdvancedQueueRunnerTest extends KernelTestBase {
+
+  /**
+   * {@inheritdoc}
+   */
+  public static $modules = ['advancedqueue', 'advancedqueue_test'];
+
+  public function testEmptyQueue() {
+    $runner = new SerialQueueRunner(\Drupal::service('advancedqueue.factory'), \Drupal::service('plugin.manager.advancedqueue_worker'), \Drupal::time());
+
+    $runner->run();
+  }
+
+  public function testSetGroupsWithInvalidGroup() {
+    $runner = new SerialQueueRunner(\Drupal::service('advancedqueue.factory'), \Drupal::service('plugin.manager.advancedqueue_worker'), \Drupal::time());
+
+    $this->setExpectedException(\Exception::class, 'The following groups are invalid: invalid_group');
+    $runner->setGroups(['invalid_group']);
+  }
+
+  public function testSetQueuesWithInvalidQueue() {
+    $runner = new SerialQueueRunner(\Drupal::service('advancedqueue.factory'), \Drupal::service('plugin.manager.advancedqueue_worker'), \Drupal::time());
+
+    $this->setExpectedException(\Exception::class, 'The following queues are invalid: invalid_queue');
+    $runner->setQueues(['invalid_queue']);
+  }
+
+  public function testRunnerHelper() {
+    $runner_helper = new RunnerHelper();
+    
+    /** @var \Drupal\advancedqueue\Queue\AdvancedQueueFactory $queue_factory */
+    $queue_factory = \Drupal::service('advancedqueue.factory');
+    /** @var \Drupal\advancedqueue\AdvancedqueueWorkerManager $worker_manager */
+    $worker_manager = \Drupal::service('plugin.manager.advancedqueue_worker');
+
+    $queue = $queue_factory->get('worker_status');
+    $queue->createJobItem(QueueJob::create(['status' => QueueJobStatus::STATUS_SUCCESS, 'result' => 'my result']));
+    $this->assertEquals(1, $queue->numberOfItems());
+
+    $result = $runner_helper->processNextItem($queue, $worker_manager->createInstance('worker_status'), 30);
+
+    $this->assertEquals(0, $queue->numberOfItems());
+    $this->assertTrue($result->is(QueueJobStatus::STATUS_SUCCESS));
+    $this->assertEquals('my result', $result->getResult());
+
+    $queue->createJobItem(QueueJob::create(['status' => QueueJobStatus::STATUS_RETRY, 'result' => 'please retry']));
+    $this->assertEquals(1, $queue->numberOfItems());
+
+    $result = $runner_helper->processNextItem($queue, $worker_manager->createInstance('worker_status'), 30);
+
+    $this->assertEquals(1, $queue->numberOfItems());
+    $this->assertTrue($result->is(QueueJobStatus::STATUS_RETRY));
+    $this->assertEquals('please retry', $result->getResult());
+  }
+
+  public function testQueueRunner() {
+    $runner = '';
+
+    /** @var \Drupal\advancedqueue\Queue\AdvancedQueueInterface $queue */
+    $queue = new AdvancedQueueDatabase('queue1', \Drupal::database(), \Drupal::time());
+    $data = [];
+    for ($i = 0; $i < 5; $i++) {
+      $data[] = [$this->randomMachineName() => $this->randomMachineName()];
+    }
+
+    /** @var \Drupal\advancedqueue\QueueJob[] $jobs */
+    $jobs[] = $job = QueueJob::create($data[0])->setStatus(QueueJobStatus::STATUS_QUEUED);
+    $queue->createJobItem($job);
+    $jobs[] = $job = QueueJob::create($data[1])->setStatus(QueueJobStatus::STATUS_QUEUED);
+    $queue->createJobItem($job);
+    $jobs[] = $job = QueueJob::create($data[2])->setStatus(QueueJobStatus::STATUS_QUEUED);
+    $queue->createJobItem($job);
+    $jobs[] = $job = QueueJob::create($data[3])->setStatus(QueueJobStatus::STATUS_QUEUED);
+    $queue->createJobItem($job);
+
+    /** @var \Drupal\advancedqueue\Queue\AdvancedQueueWorkerInterface  $worker */
+    $worker = $this->prophesize(AdvancedQueueWorkerInterface::class);
+    $worker->processItem($jobs[0])->willReturn(QueueJobStatus::STATUS_SUCCESS);
+    $worker->processItem($jobs[1])->willReturn(QueueJobStatus::STATUS_FAILED);
+    $worker->processItem($jobs[2])->willReturn(QueueJobStatus::STATUS_RETRY);
+    $worker->processItem($jobs[2]->setStatus(QueueJobStatus::STATUS_RETRY))->willReturn(QueueJobStatus::STATUS_RETRY);
+    $worker->processItem($jobs[2]->setStatus(QueueJobStatus::STATUS_RETRY))->willReturn(QueueJobStatus::STATUS_SUCCESS);
+    $worker->processItem($jobs[3])->willReturn(QueueJobStatus::STATUS_RETRY);
+    $worker->processItem($jobs[3]->setStatus(QueueJobStatus::STATUS_RETRY))->willReturn(QueueJobStatus::STATUS_SUCCESS);
+    $worker->processItem(/* no more calls? */);
+
+    // Process for 10 seconds max.
+    $runner->processQueue(10);
+  }
+
+  public function testQueueRunnerWithTime() {
+    $runner = '';
+    $time = new TestTime();
+    $time->setCurrentTime(0);
+
+    /** @var \Drupal\advancedqueue\Queue\AdvancedQueueInterface $queue */
+    $queue = '';
+    $data = [];
+    for ($i = 0; $i < 5; $i++) {
+      $data[] = [$this->randomMachineName() => $this->randomMachineName()];
+    }
+
+    /** @var \Drupal\advancedqueue\QueueJob[] $jobs */
+    $jobs[] = $job = QueueJob::createForTime($data[0], 0)->setStatus(QueueJobStatus::STATUS_QUEUED);
+    $queue->createJobItem($job);
+    $jobs[] = $job = QueueJob::createForTime($data[1], 10)->setStatus(QueueJobStatus::STATUS_QUEUED);
+    $queue->createJobItem($job);
+    $jobs[] = $job = QueueJob::createForTime($data[2], 20)->setStatus(QueueJobStatus::STATUS_QUEUED);
+    $queue->createJobItem($job);
+    $jobs[] = $job = QueueJob::createForTime($data[3], 30)->setStatus(QueueJobStatus::STATUS_QUEUED);
+    $queue->createJobItem($job);
+
+    /** @var \Drupal\advancedqueue\Queue\AdvancedQueueWorkerInterface  $worker */
+    $worker = $this->prophesize(AdvancedQueueWorkerInterface::class);
+    $worker->processItem($jobs[0])->willReturn(QueueJobStatus::STATUS_SUCCESS);
+    $worker->processItem($jobs[1])->willReturn(QueueJobStatus::STATUS_FAILED);
+    $worker->processItem($jobs[2])->willReturn(QueueJobStatus::STATUS_RETRY);
+    $worker->processItem($jobs[2]->setStatus(QueueJobStatus::STATUS_RETRY))->willReturn(QueueJobStatus::STATUS_RETRY);
+    $worker->processItem($jobs[2]->setStatus(QueueJobStatus::STATUS_RETRY))->willReturn(QueueJobStatus::STATUS_SUCCESS);
+    $worker->processItem($jobs[3])->willReturn(QueueJobStatus::STATUS_RETRY);
+    $worker->processItem($jobs[3]->setStatus(QueueJobStatus::STATUS_RETRY))->willReturn(QueueJobStatus::STATUS_SUCCESS);
+    $worker->processItem(/* no more calls? */);
+
+    $runner->processQueue(9);
+
+    // We queue for 9 seconds, so we can just process 1 item.
+    $this->assertEquals(3, $queue->numberOfItems());
+
+    $time->setCurrentTime(10);
+
+    $runner->processQueue(9);
+
+    // We queue for 9 seconds, so we can just process 1 item.
+    $this->assertEquals(2, $queue->numberOfItems());
+
+    // @todo testing IO is hard
+
+  }
+
+}
diff --git a/tests/src/Kernel/AdvancedQueueWorkerManagerTest.php b/tests/src/Kernel/AdvancedQueueWorkerManagerTest.php
new file mode 100644
index 0000000..93d68d8
--- /dev/null
+++ b/tests/src/Kernel/AdvancedQueueWorkerManagerTest.php
@@ -0,0 +1,27 @@
+<?php
+
+namespace Drupal\Tests\advancedqueue\Kernel;
+
+use Drupal\KernelTests\KernelTestBase;
+
+/**
+ * Tests the advancedqueue worker manager.  
+ *
+ * @group advancedqueue
+ */
+class AdvancedQueueWorkerManagerTest extends KernelTestBase {
+
+  /**
+   * {@inheritdoc}
+   */
+  public static $modules = ['advancedqueue', 'advancedqueue_test'];
+
+  public function testGetDefinitions() {
+    /** @var \Drupal\advancedqueue\AdvancedqueueWorkerManager $manager */
+    $manager = \Drupal::service('plugin.manager.advancedqueue_worker');
+
+    $definitions = $manager->getDefinitions();
+    $this->assertArrayHasKey('worker1', $definitions);
+  }
+
+}
diff --git a/tests/src/Kernel/TestTime.php b/tests/src/Kernel/TestTime.php
new file mode 100644
index 0000000..4c69d23
--- /dev/null
+++ b/tests/src/Kernel/TestTime.php
@@ -0,0 +1,63 @@
+<?php
+
+namespace Drupal\Tests\advancedqueue\Kernel;
+
+use Drupal\Component\Datetime\TimeInterface;
+
+class TestTime implements TimeInterface {
+
+  protected $requestTime;
+
+  protected $currentTime;
+
+  /**
+   * @param mixed $requestTime
+   *
+   * @return $this
+   */
+  public function setRequestTime($requestTime) {
+    $this->requestTime = $requestTime;
+    return $this;
+  }
+
+  /**
+   * @param mixed $currentTime
+   *
+   * @return $this
+   */
+  public function setCurrentTime($currentTime) {
+    $this->currentTime = $currentTime;
+    return $this;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getRequestTime() {
+    return $this->requestTime;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getRequestMicroTime() {
+    return $this->requestTime;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getCurrentTime() {
+    return $this->currentTime;
+    // TODO: Implement getCurrentTime() method.
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function getCurrentMicroTime() {
+    return $this->currentTime;
+  }
+
+
+}
