=== modified file 'modules/system/system.info'
--- modules/system/system.info	2008-10-12 01:23:01 +0000
+++ modules/system/system.info	2009-04-24 23:02:17 +0000
@@ -6,6 +6,7 @@ version = VERSION
 core = 7.x
 files[] = system.module
 files[] = system.admin.inc
+files[] = system.queue.inc
 files[] = image.gd.inc
 files[] = system.install
 required = TRUE

=== modified file 'modules/system/system.install'
--- modules/system/system.install	2009-04-20 02:23:15 +0000
+++ modules/system/system.install	2009-04-25 13:48:49 +0000
@@ -1041,6 +1041,67 @@ function system_schema() {
     'primary key' => array('mlid'),
   );
 
+  $schema['queue'] = array(
+    'description' => 'Stores items in queues.',
+    'fields' => array(
+      'item_id' => array(
+        'type' => 'serial',
+        'unsigned' => TRUE,
+        'not null' => TRUE,
+        'description' => 'Primary Key: Unique item ID.',
+      ),
+      'queue_name' => array(
+        'type' => 'varchar',
+        'length' => 255,
+        'not null' => TRUE,
+        'default' => '',
+        'description' => 'The queue name.',
+      ),
+      'process_id' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'The ID of the dequeuing process.',
+      ),
+      'item' => array(
+        'type' => 'text',
+        'not null' => FALSE,
+        'size' => 'big',
+        'serialize' => TRUE,
+        'description' => 'The item itself.',
+      ),
+      'expire' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'The time the item needs reset.',
+      ),
+      'created' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'When the item needs reset.',
+      ),
+    ),
+    'primary key' => array('item_id'),
+    'indexes' => array(
+      'process_queue' => array('process_id', 'queue_name', 'created'),
+      'process_expire' => array('process_id', 'expire'),
+    ),
+  );
+
+  $schema['queue_process_id'] = array(
+    'description' => 'Stores queue process IDs, used to auto-incrament the process ID so that a unique process ID is used.',
+    'fields' => array(
+      'process_id'  => array(
+        'type' => 'serial',
+        'not null' => TRUE,
+        'description' => 'Primary Key: Unique process ID used to make sure only one consumer gets one item.',
+      ),
+    ),
+    'primary key' => array('process_id'),
+  );
+
   $schema['registry'] = array(
     'description' => "Each record is a function, class, or interface name and the file it is in.",
     'fields' => array(
@@ -3244,6 +3305,76 @@ function system_update_7020() {
 }
 
 /**
+ * Add the queue tables.
+ */
+function system_update_7022() {
+  $schema['queue'] = array(
+    'description' => 'Stores items in queues.',
+    'fields' => array(
+      'item_id' => array(
+        'type' => 'serial',
+        'unsigned' => TRUE,
+        'not null' => TRUE,
+        'description' => 'Primary Key: Unique item ID.',
+      ),
+      'queue_name' => array(
+        'type' => 'varchar',
+        'length' => 255,
+        'not null' => TRUE,
+        'default' => '',
+        'description' => 'The queue name.',
+      ),
+      'process_id' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'The ID of the dequeuing process.',
+      ),
+      'item' => array(
+        'type' => 'text',
+        'not null' => FALSE,
+        'size' => 'big',
+        'serialize' => TRUE,
+        'description' => 'The item itself.',
+      ),
+      'expire' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'The time the item needs reset.',
+      ),
+      'created' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'When the item needs reset.',
+      ),
+    ),
+    'primary key' => array('item_id'),
+    'indexes' => array(
+      'process_queue' => array('process_id', 'queue_name', 'created'),
+      'process_expire' => array('process_id', 'expire'),
+    ),
+  );
+
+  $schema['queue_process_id'] = array(
+    'description' => 'Stores queue process IDs, used to auto-incrament the process ID so that a unique process ID is used.',
+    'fields' => array(
+      'process_id'  => array(
+        'type' => 'serial',
+        'not null' => TRUE,
+        'description' => 'Primary Key: Unique process ID used to make sure only one consumer gets one item.',
+      ),
+    ),
+    'primary key' => array('process_id'),
+  );
+  db_create_table($ret, 'queue', $schema['queue']);
+  db_create_table($ret, 'queue_process_id', $schema['queue_process_id']);
+
+  return $ret;
+}
+
+/**
  * @} End of "defgroup updates-6.x-to-7.x"
  * The next series of updates should start at 8000.
  */

=== modified file 'modules/system/system.module'
--- modules/system/system.module	2009-04-22 09:45:02 +0000
+++ modules/system/system.module	2009-04-24 23:02:17 +0000
@@ -1577,6 +1577,10 @@ function system_cron() {
   foreach ($cache_tables as $table) {
     cache_clear_all(NULL, $table);
   }
+
+  // Reset expired items in the default queue implementation table. If that's
+  // not used, no problems, the query will will be a no-op.
+  db_update('queue')->fields(array('process_id' => 0, 'expire' => 0))->condition('expire', time(), '<')->execute();
 }
 
 /**

=== added file 'modules/system/system.queue.inc'
--- modules/system/system.queue.inc	1970-01-01 00:00:00 +0000
+++ modules/system/system.queue.inc	2009-04-25 16:16:00 +0000
@@ -0,0 +1,219 @@
+<?php
+// $Id$
+
+/**
+ * @file
+ * Queue functionality.
+ */
+
+/**
+ * @defgroup queue Queue operations
+ * @{
+ * The queue system allows placing items in a queue and processing it later.
+ * The system makes sure only one process can process an item.
+ *
+ * Before a queue can be used ti needs to be created by
+ * system_queue_create_queue(). Then items can be added to the queue by calling
+ * system_queue_add_item(). To process an item, call
+ * system_queue_reserve_item(). When finished processing, the item needs to be
+ * removed by calling system_queue_delete_item(). If the process dies for
+ * whatever reason, the item becomes available again and another process can
+ * receive it when calling system_queue_reserve_item(). These functions needs
+ * to be called through module_invoke(), for example
+ * module_invoke('system', 'queue_delete_item', $item).
+ *
+ * While the queue system makes a best effort to preserve order in messages,
+ * but due to the pluggable nature of the queue, there is no guarantee that
+ * items will be delivered on reserve in the exact order they were sent.
+ */
+
+interface DrupalQueue {
+  /**
+   * Start working with a queue.
+   *
+   * @param $queue_name
+   *   Arbitrary string. The name of the queue to work with.
+   */
+  function __construct($queue_name);
+
+  /**
+   * Add an item to the queue.
+   *
+   * @param $item
+   *   Arbitrary value to be added to the queue.
+   * @return
+   *   True if item was successfully added to the queue, otherwise false.
+   */
+  function addItem($item);
+
+  /**
+   * Reserve an item in the queue for processing.
+   *
+   * @param $process_time
+   *   How long the processing is expected to take in seconds, Defaults to
+   *   an hour.
+   * @return
+   *   A queue entry which is an object. The 'item' property contains $item
+   *   which is what was passed to addItem().
+   */
+  function reserveItem($process_time = 3600);
+
+  /**
+   * Remove a finished item from the queue.
+   *
+   * @param $item_id
+   *  The item_id coming from the entry reserveItem() returned.
+   */
+  function deleteItem($item_id);
+
+  /**
+   * Create a queue.
+   */
+  function createQueue();
+
+  /**
+   * Remove a queue.
+   */
+  function removeQueue();
+}
+
+/**
+ * Get a queue object for a given queue_name.
+ *
+ * @param $queue_name
+ *   Arbitrary string. The name of the queue to work with.
+ * @return
+ *   The queue object for a given queue_name.
+ */
+function _system_get_queue($queue_name) {
+  static $queues;
+  if (!isset($queues[$queue_name])) {
+    $class = variable_get('queue_module_'. $queue_name, 'system') . 'Queue';
+    $queues[$queue_name] = new $class($queue_name);
+  }
+  return $queues[$queue_name];
+}
+
+/**
+ * Add an item to a queue.
+ *
+ * @param $queue_name
+ *   Arbitrary string. The name of the queue to work with.
+ * @param $item
+ *   Arbitrary value to be queued.
+ * @return
+ *   True if item was successfully added to the queue, otherwise false.
+ */
+function system_queue_add_item($queue_name, $item) {
+  return _system_get_queue($queue_name)->addItem($item);
+}
+
+/**
+ * Reserve an item in a queue.
+ *
+ * @param $queue_name
+ *   Arbitrary string. The name of the queue to work with.
+ * @param $process_time
+ *   How long the processing is expected to take in seconds, Defaults to an
+ *   hour. After this time passes, the item will be reset and another process
+ *   can reserve the item.
+ * @return
+ *   A queue entry which is an object. The 'item' property contains the $item
+ *   which is what was passed to system_queue_add_item(). This queue entry
+ *   needs to be passed to system_queue_delete_item() once processing is
+ *   completed.
+ */
+function system_queue_reserve_item($queue_name, $process_time = 3600) {
+  return _system_get_queue($queue_name)->reserveItem($process_time);
+}
+
+/**
+ * Delete a finished entry from the queue.
+ *
+ * @param $entry
+ *  The entry object returned by system_queue_reserve_item().
+ */
+function system_queue_delete_item($entry) {
+  _system_get_queue($entry->queue_name)->deleteItem($entry->item_id);
+}
+
+/**
+ * Create a queue.
+ *
+ * @param $queue_name
+ *   Arbitrary string. The name of the queue to work with.
+ */
+function system_queue_create_queue($queue_name) {
+  _system_get_queue($queue_name)->createQueue();
+}
+
+/**
+ * Remove a queue and every item in the queue.
+ *
+ * @param $queue_name
+ *   Arbitrary string. The name of the queue to work with.
+ */
+function system_queue_remove_queue($queue_name) {
+  _system_get_queue($queue_name)->removeQueue();
+}
+
+/**
+ * Default queue implementation.
+ *
+ * Do not use this class directly, use the system_queue_add_item(),
+ * system_queue_reserve_item(), system_queue_delete_item() and
+ * system_queue_remove_queue() functions.
+ */
+class systemQueue implements DrupalQueue {
+  protected $process_id;
+
+  protected $queue_name;
+
+  function __construct($queue_name) {
+    $this->queue_name = $queue_name;
+  }
+
+  function addItem($item) {
+    $record->queue_name = $this->queue_name;
+    $record->item = $item;
+    $record->process_id = 0;
+    return drupal_write_record('queue', $record) !== FALSE;
+  }
+
+  function reserveItem($process_time = 30) {
+    if (!isset($this->process_id)) {
+      $this->process_id = db_insert('queue_process_id')->useDefaults(array('process_id'))->execute();
+    }
+    $start = time();
+    $entry = FALSE;
+    $entry = db_query_range('SELECT item, item_id, queue_name FROM {queue} q WHERE process_id = 0 AND queue_name = :queue_name ORDER BY created ASC', array(':queue_name' => $this->queue_name), 0, 1)->fetchObject();
+    if ($entry) {
+      // Try to mark the item as ours.
+      $update = db_update('queue')
+        ->fields(array('process_id' => $this->process_id, 'expire' => time() + $process_time))
+        ->condition('item_id', $entry->item_id)
+        ->condition('process_id', 0);
+      // If there are affected rows, this update succeeded.
+      if ($update->execute()) {
+        $entry->item = unserialize($entry->item);
+        return $entry;
+      }
+    }
+  }
+
+  function deleteItem($item_id) {
+    db_delete('queue')->condition('item_id', $item_id);
+  }
+
+  function createQueue() {
+    // Our queues are created automatically on demand.
+  }
+
+  function removeQueue() {
+    db_delete('queue')->condition('queue_name', $this->queue_name)->execute();
+  }
+}
+
+/**
+ * @} End of "defgroup queue".
+ */

=== modified file 'modules/system/system.test'
--- modules/system/system.test	2009-03-31 01:49:50 +0000
+++ modules/system/system.test	2009-04-25 00:11:30 +0000
@@ -878,7 +878,7 @@ class SystemThemeFunctionalTest extends 
       'node_admin_theme' => FALSE,
     );
     $this->drupalPost('admin/build/themes', $edit, t('Save configuration'));
-    
+
     $this->drupalGet('admin');
     $this->assertRaw('themes/garland', t('Administration theme used on an administration page.'));
 
@@ -887,7 +887,7 @@ class SystemThemeFunctionalTest extends 
 
     // Reset to the default theme settings.
     $this->drupalPost('admin/build/themes', array(), t('Reset to defaults'));
-    
+
     $this->drupalGet('admin');
     $this->assertRaw('themes/garland', t('Site default theme used on administration page.'));
 
@@ -895,3 +895,73 @@ class SystemThemeFunctionalTest extends 
     $this->assertRaw('themes/garland', t('Site default theme used on the add content page.'));
   }
 }
+
+
+class QueueTestCase extends DrupalWebTestCase {
+  function getInfo() {
+    return array(
+      'name' => t('Queue functionality'),
+      'description' => t('Queues and dequeues an item.'),
+      'group' => t('System'),
+    );
+  }
+
+  function testQueue() {
+    // Create two queues for testing.
+    $queue1 = $this->randomName();
+    $queue2 = $this->randomName();
+    module_invoke('system', 'queue_create_queue', $queue1);
+    module_invoke('system', 'queue_create_queue', $queue2);
+    // Add four items.
+    $items = array();
+    for ($i = 0; $i < 4; $i++) {
+      $items[] = array($this->randomName() => $this->randomName());
+    }
+    module_invoke('system', 'queue_add_item', $queue1, $items[0]);
+    module_invoke('system', 'queue_add_item', $queue1, $items[1]);
+    // Retrieve two items from one queue.
+    $entry = module_invoke('system', 'queue_reserve_item', $queue1);
+    $entries[] = $entry;
+    $new_items[] = $entry->item;
+    $entry = module_invoke('system', 'queue_reserve_item', $queue1);
+    $entries[] = $entry;
+    $new_items[] = $entry->item;
+    // Two dequeued items should match the two items we queued.
+    $this->assertEqual($this->queueScore($items, $new_items), 2, t('Two items matched'));
+    // Add two more items.
+    module_invoke('system', 'queue_add_item', $queue1, $items[2]);
+    module_invoke('system', 'queue_add_item', $queue1, $items[3]);
+    $entry = module_invoke('system', 'queue_reserve_item', $queue1);
+    $entries[] = $entry;
+    $new_items[] = $entry->item;
+    $entry = module_invoke('system', 'queue_reserve_item', $queue1);
+    $entries[] = $entry;
+    $new_items[] = $entry->item;
+    // All dequeued items should match the items we queued exactly once,
+    // therefore the score must be exactly 4.
+    $this->assertEqual($this->queueScore($items, $new_items), 4, t('Four items matched'));
+    // There should not be equal dequeued items. Each item is equal to itself
+    // therefore the score must be exactly 4.
+    $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched'));
+    foreach ($entries as $entry) {
+      module_invoke('system', 'queue_delete_item', $entry);
+    }
+    $this->assertFalse(module_invoke('system', 'queue_reserve_item', $queue1), t('Queue 1 is empty'));
+    $this->assertFalse(module_invoke('system', 'queue_reserve_item', $queue2), t('Queue 2 is empty'));
+  }
+
+  /**
+   * This function returns the number of equal items in two arrays.
+   */
+  function queueScore($items, $new_items) {
+    $score = 0;
+    foreach ($items as $item) {
+      foreach ($new_items as $new_item) {
+        if ($item === $new_item) {
+          $score++;
+        }
+      }
+    }
+    return $score;
+  }
+}

