? 922702-1_queue_integration.patch
? 922702-2_queue_integration.patch
? 922702-3_queue_integration.patch
? 922702-4_queue_integration.patch
? 922702-5_queue_integration.patch
? 922702-6_queue_integration.patch
? 922702-7_queue_integration.patch
Index: JobScheduler.inc
===================================================================
RCS file: /cvs/drupal-contrib/contributions/modules/job_scheduler/JobScheduler.inc,v
retrieving revision 1.9
diff -u -p -r1.9 JobScheduler.inc
--- JobScheduler.inc	25 Sep 2010 18:37:45 -0000	1.9
+++ JobScheduler.inc	26 Sep 2010 00:28:50 -0000
@@ -7,41 +7,73 @@
  */
 
 /**
- * Handle adding and removing jobs from schedule.
+ * Use to make Job Scheduler exceptions identifiable by type.
+ */
+class JobSchedulerException extends Exception {}
+
+/**
+ * Manage scheduled jobs.
  */
 class JobScheduler {
   /**
-   * Create a single instance of JobScheduler.
+   * The name of this scheduler.
+   */
+  protected $name;
+
+  /**
+   * Produces a single instance of JobScheduler for a schedule name.
    */
-  public static function instance() {
-    static $instance;
-    if (!isset($instance)) {
-      $class = variable_get('job_scheduler_class', 'JobScheduler');
-      $instance = new $class();
+  public static function get($name) {
+    static $schedulers;
+    // Instantiante a new scheduler for $name if we haven't done so yet.
+    if (!isset($schedulers[$name])) {
+      $class = variable_get('job_scheduler_class_'. $name, 'JobScheduler');
+      $schedulers[$name] = new $class($name);
     }
-    return $instance;
+    return $schedulers[$name];
   }
 
   /**
-   * Protect constructor.
+   * Creates a JobScheduler object.
    */
-  protected function __construct() {}
+  protected function __construct($name) {
+    $this->name = $name;
+  }
+
+  /**
+   * Returns scheduler info.
+   *
+   * @see hook_cron_job_scheduler_info().
+   *
+   * @throws JobSchedulerException.
+   */
+  public function info() {
+    // Collect info for all schedules once.
+    static $info;
+    if (!$info) {
+      $info = module_invoke_all('cron_job_scheduler_info');
+      drupal_alter('cron_job_scheduler_info', $info);
+    }
+    if (isset($info[$this->name])) {
+      return $info[$this->name];
+    }
+    throw new JobSchedulerException('Could not find Job Scheduler cron information for '. check_plain($this->name));
+  }
 
   /**
    * Add a job to the schedule, replace any existing job.
    *
-   * A job is uniquely identified by $job = array(callback, type, id).
+   * A job is uniquely identified by $job = array(type, id).
    *
    * @param $job
    *   An array that must contain the following keys:
-   *   'callback' - The callback to evoke.
    *   'type'     - A string identifier of the type of job.
    *   'id'       - A numeric identifier of the job.
    *   'period'   - The time when the task should be executed.
    *   'periodic' - True if the task should be repeated periodically.
    *
    * @code
-   * function callback($job) {
+   * function worker_callback($job) {
    *   // Work off job.
    *   // Set next time to be called. If this portion of the code is not
    *   // reached for some reason, the scheduler will keep periodically invoking
@@ -51,6 +83,7 @@ class JobScheduler {
    * @endcode
    */
   public function set($job) {
+    $job['name'] = $this->name;
     $job['last'] = REQUEST_TIME;
     $job['next'] = REQUEST_TIME + $job['period'];
     $job['scheduled'] = 0;
@@ -59,92 +92,63 @@ class JobScheduler {
   }
 
   /**
+   * Reserve a job.
+   */
+  protected function reserve($job) {
+    $job['name'] = $this->name;
+    $job['scheduled'] =
+    $job['last'] = REQUEST_TIME;
+    $job['next'] = $job['period'] + REQUEST_TIME;
+    drupal_write_record('job_schedule', $job, array('name', 'type', 'id'));
+  }
+
+  /**
    * Remove a job from the schedule, replace any existing job.
    *
-   * A job is uniquely identified by $job = array(callback, type, id).
+   * A job is uniquely identified by $job = array(type, id).
    */
   public function remove($job) {
     db_delete('job_schedule')
-      ->condition('callback', $job['callback'])
+      ->condition('name', $this->name)
       ->condition('type', $job['type'])
       ->condition('id', isset($job['id']) ? $job['id'] : 0)
       ->execute();
   }
 
   /**
-   * Remove all jobs for a given callback and type.
+   * Remove all jobs for a given type.
    */
-  public function removeAll($callback, $type) {
+  public function removeAll($type) {
     db_delete('job_schedule')
-      ->condition('callback', $callback)
+      ->condition('name', $this->name)
       ->condition('type', $type)
       ->execute();
   }
 
   /**
-   * Periodic cron task.
+   * Dispatches a job.
+   *
+   * Executes a worker callback or if schedule declares a queue name, queues a
+   * job for execution.
+   *
+   * @param $job
+   *   A $job array as passed into set() or read from job_schedule table.
    *
-   * Dispatches jobs until a limit of 200 jobs or 30 seconds processing time is
-   * reached.
+   * @throws Exception
+   *   Exceptions thrown by code called by this method are passed on.
    */
-  public function cron() {return;
-    // Check and set scheduler semaphore, take time.
-    if (variable_get('job_scheduler_cron', FALSE)) {
-      watchdog('JobScheduler', 'Last cron process did not finish.', array(), WATCHDOG_ERROR);
-    }
-    variable_set('job_scheduler_cron', TRUE);
-    $start = time();
-
-    // Reschedule stuck periodic jobs after one hour.
-    db_update('job_schedule')
-    ->fields(array(
-      'scheduled' => 0,
-    ))
-    ->condition('scheduled', REQUEST_TIME - 3600, '<')
-    ->condition('periodic', 1)
-    ->execute();
-
-    // Query and dispatch scheduled jobs.
-    $start = time();
-    $jobs = db_select('job_schedule')
-      ->fields('job_schedule', array('*'))
-      ->condition('scheduled', 0)
-      ->condition('next', REQUEST_TIME, '<')
-      ->orderBy('next', 'ASC')
-      ->range(0, 200)
-      ->execute();
-    foreach ($jobs as $job) {
-      // Flag periodic jobs as scheduled, remove one-off jobs.
-      if ($job['periodic']) {
-        $job['scheduled'] = $job['last'] = REQUEST_TIME;
-        $job['next'] = $job['period'] + REQUEST_TIME;
-        drupal_write_record('job_schedule', $job, array('callback', 'type', 'id'));
-      }
-      else {
-        $this->remove($job);
-      }
-      // Queue job if there is a queue declared for it, otherwise execute it.
-      if (function_exists($job['callback'])) {
-        if (!$this->queue($job)) {
-          $job['callback']($job);
-        }
-      }
-      if (time() > ($start + 30)) {
-        break;
+  public function dispatch($job) {
+    $info = $this->info();
+    if (!$job['periodic']) {
+      $this->remove($job);
+    }
+    if ($info['queue name']) {
+      if (DrupalQueue::get($info['queue name'])->createItem($job)) {
+        $this->reserve($job);
       }
     }
-
-    // Unflag and post a message that we're done.
-    variable_set('job_scheduler_cron', FALSE);
-    watchdog('JobScheduler', 'Finished processing schedule after !time.', array('!time' => format_interval(time() - $start)));
-  }
-
-  /**
-   * Add a job to a queue.
-   *
-   * @todo: Implement.
-   */
-  protected function queue($job) {
-    return FALSE;
+    if (function_exists($info['worker callback'])) {
+      $info['worker callback']($job);
+    }
   }
 }
Index: README.txt
===================================================================
RCS file: /cvs/drupal-contrib/contributions/modules/job_scheduler/README.txt,v
retrieving revision 1.5
diff -u -p -r1.5 README.txt
--- README.txt	16 Sep 2010 22:14:36 -0000	1.5
+++ README.txt	26 Sep 2010 00:28:50 -0000
@@ -10,38 +10,82 @@ a fixed interval.
 Usage
 =====
 
+Declare scheduler.
+
+  function example_cron_job_scheduler_info() {
+    $schedulers = array();
+    $schedulers['example_unpublish'] = array(
+      'worker callback' => 'example_unpublish_nodes',
+    );
+    return $schedulers;
+  }
+
 Add a job.
 
   $job = array(
-    'callback' => 'mymodule_unpublish_nodes',
     'type' => 'story',
     'id' => 12,
     'period' => 3600,
     'periodic' => TRUE,
   );
-  job_scheduler()->set($job);
+  JobScheduler::get('example_unpublish')->set($job);
 
-Work off a job
+Work off a job.
 
-  function mymodule_unpublish_nodes($job) {
+  function example_unpublish_nodes($job) {
     // Do stuff.
   }
 
 Remove a job.
 
   $job = array(
-    'callback' => 'mymodule_unpublish_nodes',
     'type' => 'story',
     'id' => 12,
   );
-  job_scheduler()->remove($job);
+  JobScheduler::get('example_unpublish')->remove($job);
 
 
 Drupal Queue integration
 ========================
 
-Not supported in Drupal 7 at the moment.
+Optionally, at the scheduled time Job Scheduler can queue a job for execution,
+rather than executing the job directly. This is useful when many jobs need to
+be executed or when the job's expected execution time is very long.
+
+More information on Drupal Queue: http://api.drupal.org/api/group/queue/7
+
+Instead of declaring a worker callback, declare a queue.
+
+  function example_cron_job_scheduler_info() {
+    $schedulers = array();
+    $schedulers['example_unpublish'] = array(
+      'queue name' => 'example_unpublish_queue',
+    );
+    return $schedulers;
+  }
+
+This of course assumes that you have declared a queue. Notice how in this
+pattern the queue callback contains the actual worker callback.
 
+  function example_cron_queue_info() {
+    $schedulers = array();
+    $schedulers['example_unpublish_queue'] = array(
+      'worker callback' => 'example_unpublish_nodes',
+    );
+    return $schedulers;
+  }
+
+
+Work off a job: when using a queue, Job Scheduler reserves a job for one hour
+giving the queue time to work off a job before it reschedules it. This means
+that the worker callback needs to reset the job's schedule flag in order to
+allow renewed scheduling.
+
+  function example_unpublish_nodes($job) {
+    // Do stuff.
+    // Set the job again so that its reserved flag is reset.
+    JobScheduler::get('example_unpublish')->set($job);
+  }
 
 Example
 =======
@@ -55,6 +99,6 @@ Hidden settings
 Hidden settings are variables that you can define by adding them to the $conf
 array in your settings.php file.
 
-Name:        job_scheduler_class
+Name:        'job_scheduler_class_' . $name
 Default:     'JobScheduler'
-Description: The class to use for managing schedule.
+Description: The class to use for managing a particular schedule.
Index: job_scheduler.api.php
===================================================================
RCS file: job_scheduler.api.php
diff -N job_scheduler.api.php
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ job_scheduler.api.php	26 Sep 2010 00:28:50 -0000
@@ -0,0 +1,51 @@
+<?php
+// $Id$
+
+/**
+ * @file
+ * API documentation for hooks.
+ */
+
+/**
+ * Declare job scheduling holding items that need to be run periodically.
+ *
+ * @return
+ *   An associative array where the key is the queue name and the value is
+ *   again an associative array. Possible keys are:
+ *   - 'worker callback': The name of the function to call. It will be called
+ *     at schedule time.
+ *   - 'queue name': The name of the queue to use to queue this task. Must
+ *     contain a valid queue name, declared by hook_cron_queue_info().
+ *   If queue name is given, worker callback will be ignored.
+ *
+ * @see hook_cron_job_scheduler_info_alter()
+ * @see hook_cron_queue_info()
+ * @see hook_cron_queue_info_alter()
+ */
+function hook_cron_job_scheduler_info() {
+  $info = array();
+  $info['example_reset'] = array(
+    'worker callback' => 'example_cache_clear_worker',
+  );
+  $info['example_import'] = array(
+    'queue name' => 'example_import_queue',
+  );
+  return $info;
+}
+
+/**
+ * Alter cron queue information before cron runs.
+ *
+ * Called by drupal_cron_run() to allow modules to alter cron queue settings
+ * before any jobs are processesed.
+ *
+ * @param array $info
+ *   An array of cron schedule information.
+ *
+ * @see hook_cron_queue_info()
+ * @see drupal_cron_run()
+ */
+function hook_cron_job_scheduler_info_alter(&$info) {
+  // Replace the default callback 'example_cache_clear_worker'
+  $info['example_reset']['worker callback'] = 'my_custom_reset';
+}
Index: job_scheduler.install
===================================================================
RCS file: /cvs/drupal-contrib/contributions/modules/job_scheduler/job_scheduler.install,v
retrieving revision 1.5
diff -u -p -r1.5 job_scheduler.install
--- job_scheduler.install	16 Sep 2010 20:27:15 -0000	1.5
+++ job_scheduler.install	26 Sep 2010 00:28:50 -0000
@@ -14,12 +14,12 @@ function job_scheduler_schema() {
   $schema['job_schedule'] = array(
     'description' => 'Schedule of jobs to be executed.',
     'fields' => array(
-      'callback' => array(
+      'name' => array(
         'type' => 'varchar',
         'length' => 128,
         'not null' => TRUE,
         'default' => '',
-        'description' => 'Callback to be invoked.',
+        'description' => 'Name of the schedule.',
       ),
       'type' => array(
         'type' => 'varchar',
@@ -73,8 +73,8 @@ function job_scheduler_schema() {
       ),
     ),
     'indexes' => array(
-      'callback_type_id' => array('callback', 'type', 'id'),
-      'callback_type' => array('callback', 'type'),
+      'name_type_id' => array('name', 'type', 'id'),
+      'name_type' => array('name', 'type'),
       'next' => array('next'),
       'scheduled' => array('scheduled'),
     ),
@@ -92,3 +92,21 @@ function job_scheduler_update_6101() {
   db_add_index($ret, 'job_schedule', 'next', array('next'));
   return $ret;
 }
+
+/**
+ * Rename 'callback' to 'name' field.
+ */
+function job_scheduler_update_7100() {
+  db_drop_index('job_schedule', 'callback_type_id');
+  db_drop_index('job_schedule', 'callback_type');
+  $spec = array(
+    'type' => 'varchar',
+    'length' => 128,
+    'not null' => TRUE,
+    'default' => '',
+    'description' => 'Name of the schedule.',
+  );
+  db_change_field('job_schedule', 'callback', 'name', $spec);
+  db_add_index($ret, 'job_schedule', 'name_type_id', array('name', 'type', 'id'));
+  db_add_index($ret, 'job_schedule', 'name_type', array('name', 'type'));
+}
Index: job_scheduler.module
===================================================================
RCS file: /cvs/drupal-contrib/contributions/modules/job_scheduler/job_scheduler.module,v
retrieving revision 1.4
diff -u -p -r1.4 job_scheduler.module
--- job_scheduler.module	16 Sep 2010 22:16:06 -0000	1.4
+++ job_scheduler.module	26 Sep 2010 00:28:50 -0000
@@ -10,12 +10,40 @@
  * Implementation of hook_cron().
  */
 function job_scheduler_cron() {
-  job_scheduler()->cron();
-}
+  // Reschedule stuck periodic jobs after one hour.
+  db_update('job_schedule')
+    ->fields(array(
+      'scheduled' => 0,
+    ))
+    ->condition('scheduled', REQUEST_TIME - 3600, '<')
+    ->condition('periodic', 1)
+    ->execute();
+
+  // Query and dispatch scheduled jobs.
+  $start = time();
+  $total =
+  $failed = 0;
+  $jobs = db_select('job_schedule', NULL, array('fetch' => PDO::FETCH_ASSOC))
+            ->fields('job_schedule')
+            ->condition('scheduled', 0)
+            ->condition('next', REQUEST_TIME, '<')
+            ->orderBy('next', 'ASC')
+            ->range(0, 200)
+            ->execute();
+  foreach ($jobs as $job) {
+    try {
+      JobScheduler::get($job['name'])->dispatch($job);
+    }
+    catch (Exception $e) {
+      watchdog('job_scheduler', $e->getMessage(), array(), WATCHDOG_ERROR);
+      $failed++;
+    }
+    $total++;
+    if (time() > ($start + 30)) {
+      break;
+    }
+  }
 
-/**
- * Return a JobScheduler object.
- */
-function job_scheduler() {
-  return JobScheduler::instance();
+  // Leave a note on how much time we spent processing.
+  watchdog('job_scheduler', 'Finished processing scheduled jobs (!time s, !total total, !failed failed).', array('!time' => format_interval(time() - $start), '!total' => $total, '!failed' => $failed));
 }
