? 922702-7_queue_integration.patch ? 922702-8_queue_integration.patch Index: JobScheduler.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/job_scheduler/JobScheduler.inc,v retrieving revision 1.9 diff -u -p -r1.9 JobScheduler.inc --- JobScheduler.inc 25 Sep 2010 18:37:45 -0000 1.9 +++ JobScheduler.inc 26 Sep 2010 00:34:46 -0000 @@ -7,41 +7,73 @@ */ /** - * Handle adding and removing jobs from schedule. + * Use to make Job Scheduler exceptions identifiable by type. + */ +class JobSchedulerException extends Exception {} + +/** + * Manage scheduled jobs. */ class JobScheduler { /** - * Create a single instance of JobScheduler. + * The name of this scheduler. + */ + protected $name; + + /** + * Produces a single instance of JobScheduler for a schedule name. */ - public static function instance() { - static $instance; - if (!isset($instance)) { - $class = variable_get('job_scheduler_class', 'JobScheduler'); - $instance = new $class(); + public static function get($name) { + static $schedulers; + // Instantiante a new scheduler for $name if we haven't done so yet. + if (!isset($schedulers[$name])) { + $class = variable_get('job_scheduler_class_'. $name, 'JobScheduler'); + $schedulers[$name] = new $class($name); } - return $instance; + return $schedulers[$name]; } /** - * Protect constructor. + * Creates a JobScheduler object. */ - protected function __construct() {} + protected function __construct($name) { + $this->name = $name; + } + + /** + * Returns scheduler info. + * + * @see hook_cron_job_scheduler_info(). + * + * @throws JobSchedulerException. + */ + public function info() { + // Collect info for all schedules once. + static $info; + if (!$info) { + $info = module_invoke_all('cron_job_scheduler_info'); + drupal_alter('cron_job_scheduler_info', $info); + } + if (isset($info[$this->name])) { + return $info[$this->name]; + } + throw new JobSchedulerException('Could not find Job Scheduler cron information for '. check_plain($this->name)); + } /** * Add a job to the schedule, replace any existing job. * - * A job is uniquely identified by $job = array(callback, type, id). + * A job is uniquely identified by $job = array(type, id). * * @param $job * An array that must contain the following keys: - * 'callback' - The callback to evoke. * 'type' - A string identifier of the type of job. * 'id' - A numeric identifier of the job. * 'period' - The time when the task should be executed. * 'periodic' - True if the task should be repeated periodically. * * @code - * function callback($job) { + * function worker_callback($job) { * // Work off job. * // Set next time to be called. If this portion of the code is not * // reached for some reason, the scheduler will keep periodically invoking @@ -51,6 +83,7 @@ class JobScheduler { * @endcode */ public function set($job) { + $job['name'] = $this->name; $job['last'] = REQUEST_TIME; $job['next'] = REQUEST_TIME + $job['period']; $job['scheduled'] = 0; @@ -59,92 +92,63 @@ class JobScheduler { } /** + * Reserve a job. + */ + protected function reserve($job) { + $job['name'] = $this->name; + $job['scheduled'] = + $job['last'] = REQUEST_TIME; + $job['next'] = $job['period'] + REQUEST_TIME; + drupal_write_record('job_schedule', $job, array('name', 'type', 'id')); + } + + /** * Remove a job from the schedule, replace any existing job. * - * A job is uniquely identified by $job = array(callback, type, id). + * A job is uniquely identified by $job = array(type, id). */ public function remove($job) { db_delete('job_schedule') - ->condition('callback', $job['callback']) + ->condition('name', $this->name) ->condition('type', $job['type']) ->condition('id', isset($job['id']) ? $job['id'] : 0) ->execute(); } /** - * Remove all jobs for a given callback and type. + * Remove all jobs for a given type. */ - public function removeAll($callback, $type) { + public function removeAll($type) { db_delete('job_schedule') - ->condition('callback', $callback) + ->condition('name', $this->name) ->condition('type', $type) ->execute(); } /** - * Periodic cron task. + * Dispatches a job. + * + * Executes a worker callback or if schedule declares a queue name, queues a + * job for execution. + * + * @param $job + * A $job array as passed into set() or read from job_schedule table. * - * Dispatches jobs until a limit of 200 jobs or 30 seconds processing time is - * reached. + * @throws Exception + * Exceptions thrown by code called by this method are passed on. */ - public function cron() {return; - // Check and set scheduler semaphore, take time. - if (variable_get('job_scheduler_cron', FALSE)) { - watchdog('JobScheduler', 'Last cron process did not finish.', array(), WATCHDOG_ERROR); - } - variable_set('job_scheduler_cron', TRUE); - $start = time(); - - // Reschedule stuck periodic jobs after one hour. - db_update('job_schedule') - ->fields(array( - 'scheduled' => 0, - )) - ->condition('scheduled', REQUEST_TIME - 3600, '<') - ->condition('periodic', 1) - ->execute(); - - // Query and dispatch scheduled jobs. - $start = time(); - $jobs = db_select('job_schedule') - ->fields('job_schedule', array('*')) - ->condition('scheduled', 0) - ->condition('next', REQUEST_TIME, '<') - ->orderBy('next', 'ASC') - ->range(0, 200) - ->execute(); - foreach ($jobs as $job) { - // Flag periodic jobs as scheduled, remove one-off jobs. - if ($job['periodic']) { - $job['scheduled'] = $job['last'] = REQUEST_TIME; - $job['next'] = $job['period'] + REQUEST_TIME; - drupal_write_record('job_schedule', $job, array('callback', 'type', 'id')); - } - else { - $this->remove($job); - } - // Queue job if there is a queue declared for it, otherwise execute it. - if (function_exists($job['callback'])) { - if (!$this->queue($job)) { - $job['callback']($job); - } - } - if (time() > ($start + 30)) { - break; + public function dispatch($job) { + $info = $this->info(); + if (!$job['periodic']) { + $this->remove($job); + } + if ($info['queue name']) { + if (DrupalQueue::get($info['queue name'])->createItem($job)) { + $this->reserve($job); } } - - // Unflag and post a message that we're done. - variable_set('job_scheduler_cron', FALSE); - watchdog('JobScheduler', 'Finished processing schedule after !time.', array('!time' => format_interval(time() - $start))); - } - - /** - * Add a job to a queue. - * - * @todo: Implement. - */ - protected function queue($job) { - return FALSE; + if (function_exists($info['worker callback'])) { + $info['worker callback']($job); + } } } Index: README.txt =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/job_scheduler/README.txt,v retrieving revision 1.5 diff -u -p -r1.5 README.txt --- README.txt 16 Sep 2010 22:14:36 -0000 1.5 +++ README.txt 26 Sep 2010 00:34:46 -0000 @@ -10,38 +10,82 @@ a fixed interval. Usage ===== +Declare scheduler. + + function example_cron_job_scheduler_info() { + $schedulers = array(); + $schedulers['example_unpublish'] = array( + 'worker callback' => 'example_unpublish_nodes', + ); + return $schedulers; + } + Add a job. $job = array( - 'callback' => 'mymodule_unpublish_nodes', 'type' => 'story', 'id' => 12, 'period' => 3600, 'periodic' => TRUE, ); - job_scheduler()->set($job); + JobScheduler::get('example_unpublish')->set($job); -Work off a job +Work off a job. - function mymodule_unpublish_nodes($job) { + function example_unpublish_nodes($job) { // Do stuff. } Remove a job. $job = array( - 'callback' => 'mymodule_unpublish_nodes', 'type' => 'story', 'id' => 12, ); - job_scheduler()->remove($job); + JobScheduler::get('example_unpublish')->remove($job); Drupal Queue integration ======================== -Not supported in Drupal 7 at the moment. +Optionally, at the scheduled time Job Scheduler can queue a job for execution, +rather than executing the job directly. This is useful when many jobs need to +be executed or when the job's expected execution time is very long. + +More information on Drupal Queue: http://api.drupal.org/api/group/queue/7 + +Instead of declaring a worker callback, declare a queue. + + function example_cron_job_scheduler_info() { + $schedulers = array(); + $schedulers['example_unpublish'] = array( + 'queue name' => 'example_unpublish_queue', + ); + return $schedulers; + } + +This of course assumes that you have declared a queue. Notice how in this +pattern the queue callback contains the actual worker callback. + function example_cron_queue_info() { + $schedulers = array(); + $schedulers['example_unpublish_queue'] = array( + 'worker callback' => 'example_unpublish_nodes', + ); + return $schedulers; + } + + +Work off a job: when using a queue, Job Scheduler reserves a job for one hour +giving the queue time to work off a job before it reschedules it. This means +that the worker callback needs to reset the job's schedule flag in order to +allow renewed scheduling. + + function example_unpublish_nodes($job) { + // Do stuff. + // Set the job again so that its reserved flag is reset. + JobScheduler::get('example_unpublish')->set($job); + } Example ======= @@ -55,6 +99,6 @@ Hidden settings Hidden settings are variables that you can define by adding them to the $conf array in your settings.php file. -Name: job_scheduler_class +Name: 'job_scheduler_class_' . $name Default: 'JobScheduler' -Description: The class to use for managing schedule. +Description: The class to use for managing a particular schedule. Index: job_scheduler.api.php =================================================================== RCS file: job_scheduler.api.php diff -N job_scheduler.api.php --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ job_scheduler.api.php 26 Sep 2010 00:34:46 -0000 @@ -0,0 +1,51 @@ + 'example_cache_clear_worker', + ); + $info['example_import'] = array( + 'queue name' => 'example_import_queue', + ); + return $info; +} + +/** + * Alter cron queue information before cron runs. + * + * Called by drupal_cron_run() to allow modules to alter cron queue settings + * before any jobs are processesed. + * + * @param array $info + * An array of cron schedule information. + * + * @see hook_cron_queue_info() + * @see drupal_cron_run() + */ +function hook_cron_job_scheduler_info_alter(&$info) { + // Replace the default callback 'example_cache_clear_worker' + $info['example_reset']['worker callback'] = 'my_custom_reset'; +} Index: job_scheduler.install =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/job_scheduler/job_scheduler.install,v retrieving revision 1.5 diff -u -p -r1.5 job_scheduler.install --- job_scheduler.install 16 Sep 2010 20:27:15 -0000 1.5 +++ job_scheduler.install 26 Sep 2010 00:34:46 -0000 @@ -14,12 +14,12 @@ function job_scheduler_schema() { $schema['job_schedule'] = array( 'description' => 'Schedule of jobs to be executed.', 'fields' => array( - 'callback' => array( + 'name' => array( 'type' => 'varchar', 'length' => 128, 'not null' => TRUE, 'default' => '', - 'description' => 'Callback to be invoked.', + 'description' => 'Name of the schedule.', ), 'type' => array( 'type' => 'varchar', @@ -73,8 +73,8 @@ function job_scheduler_schema() { ), ), 'indexes' => array( - 'callback_type_id' => array('callback', 'type', 'id'), - 'callback_type' => array('callback', 'type'), + 'name_type_id' => array('name', 'type', 'id'), + 'name_type' => array('name', 'type'), 'next' => array('next'), 'scheduled' => array('scheduled'), ), @@ -92,3 +92,21 @@ function job_scheduler_update_6101() { db_add_index($ret, 'job_schedule', 'next', array('next')); return $ret; } + +/** + * Rename 'callback' to 'name' field. + */ +function job_scheduler_update_7100() { + db_drop_index('job_schedule', 'callback_type_id'); + db_drop_index('job_schedule', 'callback_type'); + $spec = array( + 'type' => 'varchar', + 'length' => 128, + 'not null' => TRUE, + 'default' => '', + 'description' => 'Name of the schedule.', + ); + db_change_field('job_schedule', 'callback', 'name', $spec); + db_add_index('job_schedule', 'name_type_id', array('name', 'type', 'id')); + db_add_index('job_schedule', 'name_type', array('name', 'type')); +} Index: job_scheduler.module =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/job_scheduler/job_scheduler.module,v retrieving revision 1.4 diff -u -p -r1.4 job_scheduler.module --- job_scheduler.module 16 Sep 2010 22:16:06 -0000 1.4 +++ job_scheduler.module 26 Sep 2010 00:34:46 -0000 @@ -10,12 +10,40 @@ * Implementation of hook_cron(). */ function job_scheduler_cron() { - job_scheduler()->cron(); -} + // Reschedule stuck periodic jobs after one hour. + db_update('job_schedule') + ->fields(array( + 'scheduled' => 0, + )) + ->condition('scheduled', REQUEST_TIME - 3600, '<') + ->condition('periodic', 1) + ->execute(); + + // Query and dispatch scheduled jobs. + $start = time(); + $total = + $failed = 0; + $jobs = db_select('job_schedule', NULL, array('fetch' => PDO::FETCH_ASSOC)) + ->fields('job_schedule') + ->condition('scheduled', 0) + ->condition('next', REQUEST_TIME, '<') + ->orderBy('next', 'ASC') + ->range(0, 200) + ->execute(); + foreach ($jobs as $job) { + try { + JobScheduler::get($job['name'])->dispatch($job); + } + catch (Exception $e) { + watchdog('job_scheduler', $e->getMessage(), array(), WATCHDOG_ERROR); + $failed++; + } + $total++; + if (time() > ($start + 30)) { + break; + } + } -/** - * Return a JobScheduler object. - */ -function job_scheduler() { - return JobScheduler::instance(); + // Leave a note on how much time we spent processing. + watchdog('job_scheduler', 'Finished processing scheduled jobs (!time s, !total total, !failed failed).', array('!time' => format_interval(time() - $start), '!total' => $total, '!failed' => $failed)); }