cvs diff: Diffing modules/system
Index: modules/system/system.info
===================================================================
RCS file: /cvs/drupal/drupal/modules/system/system.info,v
retrieving revision 1.11
diff -u -p -r1.11 system.info
--- modules/system/system.info	12 Oct 2008 01:23:06 -0000	1.11
+++ modules/system/system.info	26 Apr 2009 21:21:58 -0000
@@ -6,6 +6,7 @@ version = VERSION
 core = 7.x
 files[] = system.module
 files[] = system.admin.inc
+files[] = system.queue.inc
 files[] = image.gd.inc
 files[] = system.install
 required = TRUE
Index: modules/system/system.install
===================================================================
RCS file: /cvs/drupal/drupal/modules/system/system.install,v
retrieving revision 1.317
diff -u -p -r1.317 system.install
--- modules/system/system.install	26 Apr 2009 14:57:36 -0000	1.317
+++ modules/system/system.install	26 Apr 2009 21:22:01 -0000
@@ -1052,6 +1052,67 @@ function system_schema() {
     'primary key' => array('mlid'),
   );
 
+  $schema['queue'] = array(
+    'description' => 'Stores items in queues.',
+    'fields' => array(
+      'item_id' => array(
+        'type' => 'serial',
+        'unsigned' => TRUE,
+        'not null' => TRUE,
+        'description' => 'Primary Key: Unique item ID.',
+      ),
+      'queue_name' => array(
+        'type' => 'varchar',
+        'length' => 255,
+        'not null' => TRUE,
+        'default' => '',
+        'description' => 'The queue name.',
+      ),
+      'consumer_id' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'The ID of the dequeuing consumer.',
+      ),
+      'item' => array(
+        'type' => 'text',
+        'not null' => FALSE,
+        'size' => 'big',
+        'serialize' => TRUE,
+        'description' => 'The item itself.',
+      ),
+      'expire' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'The time the item needs reset.',
+      ),
+      'created' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'When the item needs reset.',
+      ),
+    ),
+    'primary key' => array('item_id'),
+    'indexes' => array(
+      'consumer_queue' => array('consumer_id', 'queue_name', 'created'),
+      'consumer_expire' => array('consumer_id', 'expire'),
+    ),
+  );
+
+  $schema['queue_consumer_id'] = array(
+    'description' => 'Stores queue consumer IDs, used to auto-increment the consumer ID so that a unique consumer ID is used.',
+    'fields' => array(
+      'consumer_id'  => array(
+        'type' => 'serial',
+        'not null' => TRUE,
+        'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.',
+      ),
+    ),
+    'primary key' => array('consumer_id'),
+  );
+
   $schema['registry'] = array(
     'description' => "Each record is a function, class, or interface name and the file it is in.",
     'fields' => array(
@@ -3255,6 +3316,76 @@ function system_update_7020() {
 }
 
 /**
+ * Add the queue tables.
+ */
+function system_update_7022() {
+  $schema['queue'] = array(
+    'description' => 'Stores items in queues.',
+    'fields' => array(
+      'item_id' => array(
+        'type' => 'serial',
+        'unsigned' => TRUE,
+        'not null' => TRUE,
+        'description' => 'Primary Key: Unique item ID.',
+      ),
+      'queue_name' => array(
+        'type' => 'varchar',
+        'length' => 255,
+        'not null' => TRUE,
+        'default' => '',
+        'description' => 'The queue name.',
+      ),
+      'consumer_id' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'The ID of the dequeuing consumer.',
+      ),
+      'item' => array(
+        'type' => 'text',
+        'not null' => FALSE,
+        'size' => 'big',
+        'serialize' => TRUE,
+        'description' => 'The item itself.',
+      ),
+      'expire' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'The time the item needs reset.',
+      ),
+      'created' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'description' => 'When the item needs reset.',
+      ),
+    ),
+    'primary key' => array('item_id'),
+    'indexes' => array(
+      'consumer_queue' => array('consumer_id', 'queue_name', 'created'),
+      'consumer_expire' => array('consumer_id', 'expire'),
+    ),
+  );
+
+  $schema['queue_consumer_id'] = array(
+    'description' => 'Stores queue consumer IDs, used to auto-incrament the consumer ID so that a unique consumer ID is used.',
+    'fields' => array(
+      'consumer_id'  => array(
+        'type' => 'serial',
+        'not null' => TRUE,
+        'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.',
+      ),
+    ),
+    'primary key' => array('consumer_id'),
+  );
+  db_create_table($ret, 'queue', $schema['queue']);
+  db_create_table($ret, 'queue_consumer_id', $schema['queue_consumer_id']);
+
+  return $ret;
+}
+
+/**
  * @} End of "defgroup updates-6.x-to-7.x"
  * The next series of updates should start at 8000.
  */
Index: modules/system/system.module
===================================================================
RCS file: /cvs/drupal/drupal/modules/system/system.module,v
retrieving revision 1.687
diff -u -p -r1.687 system.module
--- modules/system/system.module	26 Apr 2009 19:44:39 -0000	1.687
+++ modules/system/system.module	26 Apr 2009 21:22:03 -0000
@@ -1580,6 +1580,10 @@ function system_cron() {
   foreach ($cache_tables as $table) {
     cache_clear_all(NULL, $table);
   }
+
+  // Reset expired items in the default queue implementation table. If that's
+  // not used, this will simply be a no-op.
+  db_update('queue')->fields(array('consumer_id' => 0, 'expire' => 0))->condition('expire', time(), '<')->execute();
 }
 
 /**
Index: modules/system/system.queue.inc
===================================================================
RCS file: modules/system/system.queue.inc
diff -N modules/system/system.queue.inc
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ modules/system/system.queue.inc	26 Apr 2009 21:22:03 -0000
@@ -0,0 +1,256 @@
+<?php
+// $Id$
+
+/**
+ * @file
+ * Queue functionality.
+ */
+
+/**
+ * @defgroup queue Queue operations
+ * @{
+ * The queue system allows placing items in a queue and processing them later.
+ * The system tries to ensure that only one consumer can process an item.
+ *
+
+ * Before a queue can be used it needs to be created by
+ * Queue::createQueue(). Then items can be added to the queue by calling
+ * Queue::addItem(). To process an item, call Queue::claimItem() and specify
+ * how long you want to have a lease for working on that item. When finished
+ * processing, the item needs to be removed by calling Queue::deleteItem(). If
+ * the consumer dies, the item becomes available again once the lease expires,
+ * and another consumer can receive it when calling Queue::claimItem().
+ *
+ * While the queue system makes a best effort to preserve order in messages,
+ * due to the pluggable nature of the queue, there is no guarantee that items
+ * will be delivered on claim in the order they were sent.
+ *
+ * The system makes no guarantees about a task only being executed once:
+ * callers that have non-idempotent tasks either need to live with the
+ * possiblity of the task being invoked multiple times in cases where a claim
+ * lease expires, or need to implement their own transactions to make their
+ * tasks idempotent.
+ */
+
+class Queue {
+  /**
+   * Get a queue object for a given queue_name.
+   *
+   * @param $queue_name
+   *   Arbitrary string. The name of the queue to work with.
+   * @return
+   *   The queue object for a given queue_name.
+   */
+  protected function getQueue($queue_name) {
+    //
+    static $queues;
+    if (!isset($queues[$queue_name])) {
+      $class = variable_get('queue_module_'. $queue_name, 'system') . 'Queue';
+      $queues[$queue_name] = new $class($queue_name);
+    }
+    return $queues[$queue_name];
+  }
+
+  /**
+   * Add an item to a queue.
+   *
+   * @param $queue_name
+   *   Arbitrary string. The name of the queue to work with.
+   * @param $item
+   *   Arbitrary value to be queued.
+   * @return
+   *   True if item was successfully added to the queue, otherwise false.
+   */
+  static function addItem($queue_name, $item) {
+    return Queue::getQueue($queue_name)->addItem($item);
+  }
+
+  /**
+   * Claim an item in a queue for processing.
+   *
+   * @param $queue_name
+   *   Arbitrary string. The name of the queue to work with.
+   * @param $lease_time
+   *   How long the processing is expected to take in seconds, defaults to an
+   *   hour. After this lease expires, the item will be reset and another
+   *   consumer can claim the item. For idempotent tasks (which can be run
+   *   multiple times without side effects), shorter lease times would result
+   *   in lower latency in case a consumer fails. For tasks that should not be
+   *   run more than once (non-idempotent), a larger lease time will make it
+   *   more rare for a given task to run multiple times in cases of failure,
+   *   at the cost of higher latency.
+   * @return
+   *   A queue entry which is an object. The 'item' property contains the $item
+   *   which is what was passed to Queue::addItem(). This queue entry
+   *   needs to be passed to Queue::deleteItem() once processing is
+   *   completed.
+   */
+  static function claimItem($queue_name, $lease_time = 3600) {
+    return Queue::getQueue($queue_name)->claimItem($lease_time);
+  }
+
+  /**
+   * Delete a finished entry from the queue.
+   *
+   * @param $entry
+   *   The entry object returned by Queue::claimItem().
+   */
+  static function deleteItem($entry) {
+    Queue::getQueue($entry->queue_name)->deleteItem($entry->item_id);
+  }
+
+  /**
+   * Create a queue.
+   *
+   * Called during installation and should be used to perform any necessary
+   * initialization operations. This should not be confused with the
+   * constructor for these objects, which is called every time an object is
+   * instantiated to operate on a queue. This operation is only needed the
+   * first time a given queue is going to be initialized (for example, to make
+   * a new database table or directory to hold tasks for the queue -- it
+   * depends on the queue implementation if this is necessary at all).
+   *
+   * @param $queue_name
+   *   Arbitrary string. The name of the queue to work with.
+   */
+  static function createQueue($queue_name) {
+    Queue::getQueue($queue_name)->createQueue();
+  }
+
+  /**
+   * Remove a queue and every item in the queue.
+   *
+   * @param $queue_name
+   *   Arbitrary string. The name of the queue to work with.
+   */
+  static function removeQueue($queue_name) {
+    Queue::getQueue($queue_name)->removeQueue();
+  }
+}
+
+interface DrupalQueueInterface {
+  /**
+   * Start working with a queue.
+   *
+   * @param $queue_name
+   *   Arbitrary string. The name of the queue to work with.
+   */
+  function __construct($queue_name);
+
+  /**
+   * Add an item to the queue.
+   *
+   * @param $item
+   *   Arbitrary value to be added to the queue.
+   * @return
+   *   True if item was successfully added to the queue, otherwise false.
+   */
+  function addItem($item);
+
+  /**
+   * Claim an item in the queue for processing.
+   *
+   * @param $lease_time
+   *   How long the processing is expected to take in seconds, defaults to an
+   *   hour. After this lease expires, the item will be reset and another
+   *   consumer can claim the item. For idempotent tasks (which can be run
+   *   multiple times without side effects), shorter lease times would result
+   *   in lower latency in case a consumer fails. For tasks that should not be
+   *   run more than once (non-idempotent), a larger lease time will make it
+   *   more rare for a given task to run multiple times in cases of failure,
+   *   at the cost of higher latency.
+   * @return
+   *   A queue entry which is an object. The 'item' property contains $item
+   *   which is what was passed to DrupalQueueInterface::addItem(). This queue
+   *   entry needs to be passed to DrupalQueueInterface::deleteItem() once
+   *   processing is completed.
+   */
+  function claimItem($lease_time = 3600);
+
+  /**
+   * Remove a finished item from the queue.
+   *
+   * @param $item_id
+   *   The item_id coming from the entry claimItem() returned.
+   */
+  function deleteItem($item_id);
+
+  /**
+   * Create a queue.
+   *
+   * Called during installation and should be used to perform any necessary
+   * initialization operations. This should not be confused with the
+   * constructor for these objects, which is called every time an object is
+   * instantiated to operate on a queue. This operation is only needed the
+   * first time a given queue is going to be initialized (for example, to make
+   * a new database table or directory to hold tasks for the queue -- it
+   * depends on the queue implementation if this is necessary at all).
+   */
+  function createQueue();
+
+  /**
+   * Remove a queue and every item in the queue.
+   */
+  function removeQueue();
+}
+
+/**
+ * Default queue implementation.
+ *
+ * Do not use this class directly, use the Queue::addItem(),
+ * Queue::claimItem(), Queue::deleteItem() and
+ * Queue::removeQueue() functions.
+ */
+class systemQueue implements DrupalQueueInterface {
+  protected $consumer_id;
+
+  protected $queue_name;
+
+  function __construct($queue_name) {
+    $this->queue_name = $queue_name;
+  }
+
+  function addItem($item) {
+    $record->queue_name = $this->queue_name;
+    $record->item = $item;
+    $record->consumer_id = 0;
+    return drupal_write_record('queue', $record) !== FALSE;
+  }
+
+  function claimItem($lease_time = 30) {
+    if (!isset($this->consumer_id)) {
+      $this->consumer_id = db_insert('queue_consumer_id')->useDefaults(array('consumer_id'))->execute();
+    }
+    $start = time();
+    $entry = FALSE;
+    $entry = db_query_range('SELECT item, item_id, queue_name FROM {queue} q WHERE consumer_id = 0 AND queue_name = :queue_name ORDER BY created ASC', array(':queue_name' => $this->queue_name), 0, 1)->fetchObject();
+    if ($entry) {
+      // Try to mark the item as ours.
+      $update = db_update('queue')
+        ->fields(array('consumer_id' => $this->consumer_id, 'expire' => time() + $lease_time))
+        ->condition('item_id', $entry->item_id)
+        ->condition('consumer_id', 0);
+      // If there are affected rows, this update succeeded.
+      if ($update->execute()) {
+        $entry->item = unserialize($entry->item);
+        return $entry;
+      }
+    }
+  }
+
+  function deleteItem($item_id) {
+    db_delete('queue')->condition('item_id', $item_id);
+  }
+
+  function createQueue() {
+    // Our queues are created automatically on demand.
+  }
+
+  function removeQueue() {
+    db_delete('queue')->condition('queue_name', $this->queue_name)->execute();
+  }
+}
+
+/**
+ * @} End of "defgroup queue".
+ */
Index: modules/system/system.test
===================================================================
RCS file: /cvs/drupal/drupal/modules/system/system.test,v
retrieving revision 1.40
diff -u -p -r1.40 system.test
--- modules/system/system.test	31 Mar 2009 01:49:54 -0000	1.40
+++ modules/system/system.test	26 Apr 2009 21:22:03 -0000
@@ -878,7 +878,7 @@ class SystemThemeFunctionalTest extends 
       'node_admin_theme' => FALSE,
     );
     $this->drupalPost('admin/build/themes', $edit, t('Save configuration'));
-    
+
     $this->drupalGet('admin');
     $this->assertRaw('themes/garland', t('Administration theme used on an administration page.'));
 
@@ -887,7 +887,7 @@ class SystemThemeFunctionalTest extends 
 
     // Reset to the default theme settings.
     $this->drupalPost('admin/build/themes', array(), t('Reset to defaults'));
-    
+
     $this->drupalGet('admin');
     $this->assertRaw('themes/garland', t('Site default theme used on administration page.'));
 
@@ -895,3 +895,90 @@ class SystemThemeFunctionalTest extends 
     $this->assertRaw('themes/garland', t('Site default theme used on the add content page.'));
   }
 }
+
+
+/**
+ * Test the basic queue functionality.
+ */
+class QueueTestCase extends DrupalWebTestCase {
+  function getInfo() {
+    return array(
+      'name' => t('Queue functionality'),
+      'description' => t('Queues and dequeues a set of items to check the basic queue functionality.'),
+      'group' => t('System'),
+    );
+  }
+
+  /**
+   * Queues and dequeues a set of items to check the basic queue functionality.
+   */
+  function testQueue() {
+    // Create two queues.
+    Queue::createQueue($queue1 = $this->randomName());
+    Queue::createQueue($queue2 = $this->randomName());
+
+    // Create four items.
+    $items = array();
+    for ($i = 0; $i < 4; $i++) {
+      $items[] = array($this->randomName() => $this->randomName());
+    }
+
+    // Queue items 1 and 2 in the queue1.
+    Queue::addItem($queue1, $items[0]);
+    Queue::addItem($queue1, $items[1]);
+
+    // Retrieve two items from queue1.
+    $entries = array();
+    $new_items = array();
+
+    $entries[] = $entry = Queue::claimItem($queue1);
+    $new_items[] = $entry->item;
+
+    $entries[] = $entry = Queue::claimItem($queue1);
+    $new_items[] = $entry->item;
+
+    // First two dequeued items should match the first two items we queued.
+    $this->assertEqual($this->queueScore($items, $new_items), 2, t('Two items matched'));
+
+    // Add two more items.
+    Queue::addItem($queue1, $items[2]);
+    Queue::addItem($queue1, $items[3]);
+
+    $entries[] = $entry = Queue::claimItem($queue1);
+    $new_items[] = $entry->item;
+
+    $entries[] = $entry = Queue::claimItem($queue1);
+    $new_items[] = $entry->item;
+
+    // All dequeued items should match the items we queued exactly once,
+    // therefore the score must be exactly 4.
+    $this->assertEqual($this->queueScore($items, $new_items), 4, t('Four items matched'));
+
+    // There should be no duplicate items.
+    $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched'));
+
+    // Delete all items from queue1.
+    foreach ($entries as $entry) {
+      Queue::deleteItem($entry);
+    }
+
+    // Check that both queues are empty.
+    $this->assertFalse(Queue::claimItem($queue1), t('Queue 1 is empty'));
+    $this->assertFalse(Queue::claimItem($queue2), t('Queue 2 is empty'));
+  }
+
+  /**
+   * This function returns the number of equal items in two arrays.
+   */
+  function queueScore($items, $new_items) {
+    $score = 0;
+    foreach ($items as $item) {
+      foreach ($new_items as $new_item) {
+        if ($item === $new_item) {
+          $score++;
+        }
+      }
+    }
+    return $score;
+  }
+}
