diff --git a/job_scheduler.api.php b/job_scheduler.api.php index 8bd85b0..a24eee3 100644 --- a/job_scheduler.api.php +++ b/job_scheduler.api.php @@ -48,3 +48,29 @@ function hook_cron_job_scheduler_info_alter(&$info) { // Replace the default callback 'example_cache_clear_worker' $info['example_reset']['worker callback'] = 'my_custom_reset'; } + +/** + * Declare job scheduler queue information. + * + * TODO: Description. + * + * @see hook_cron_job_scheduler_queue_info_alter() + */ +function hook_cron_job_scheduler_queue_info() { + $info = array(); + $info['example_import_queue'] = array( + 'worker callback' => 'example_import_worker', + 'time' => 60, + ); + return $info; +} + +/** + * Alter job scheduler queue information. + * + * TODO: Description. + */ +function hook_cron_job_scheduler_queue_info_alter(&$info) { + // Replace the default callback + $info['example_import_queue']['worker callback'] = 'my_custom_import_worker'; +} diff --git a/job_scheduler.module b/job_scheduler.module index d6c3b3c..ba07451 100644 --- a/job_scheduler.module +++ b/job_scheduler.module @@ -31,6 +31,32 @@ function job_scheduler_info($name = NULL) { } } +/** + * Collects and returns scheduler queue info. + * + * @param string $name + * (optional) Name of the schedule. Defaults to null. + * + * @return array + * Information for the schedule queue if $name, all the information if not. + * + * @see hook_cron_job_scheduler_info() + */ +function job_scheduler_queue_info($name = NULL) { + $info = &drupal_static(__FUNCTION__); + if (!$info) { + $module_handler = \Drupal::moduleHandler(); + $info = $module_handler->invokeAll('cron_job_scheduler_queue_info'); + $module_handler->alter('cron_job_scheduler_queue_info', $info); + } + if ($name) { + return isset($info[$name]) ? $info[$name] : NULL; + } + else { + return $info; + } +} + /** * Implements hook_cron(). * @@ -84,7 +110,7 @@ function job_scheduler_cron() { $scheduler->remove($job); } $total++; - if ($timestamp > ($start + 30)) { + if (time() > ($start + 30)) { break; } } @@ -94,10 +120,26 @@ function job_scheduler_cron() { $date_formatter = \Drupal::service('date.formatter'); $logger = \Drupal::logger('job_scheduler'); $logger->info(t('Finished processing scheduled jobs (:time, :total total, :failed failed).', [ - ':time' => $date_formatter->formatInterval($timestamp - $start), + ':time' => $date_formatter->formatInterval(time() - $start), ':total' => $total, ':failed' => $failed, ])); + + // Grab the defined queues. + foreach (job_scheduler_queue_info() as $queue_name => $info) { + $worker_callback = $info['worker callback']; + $end = time() + (isset($info['time']) ? $info['time'] : 60); + $queue = \Drupal::queue($queue_name); + while (time() < $end && ($item = $queue->claimItem())) { + try { + call_user_func($worker_callback, $item->data); + $queue->deleteItem($item); + } + catch (Exception $e) { + watchdog_exception('job_scheduler', $e); + } + } + } } /** diff --git a/job_scheduler.services.yml b/job_scheduler.services.yml index ece52fb..c757183 100644 --- a/job_scheduler.services.yml +++ b/job_scheduler.services.yml @@ -1,4 +1,4 @@ services: job_scheduler.manager: class: Drupal\job_scheduler\JobScheduler - arguments: ['@database'] + arguments: ['@database', '@datetime.time'] diff --git a/src/JobScheduler.php b/src/JobScheduler.php index e834195..a0d8e92 100644 --- a/src/JobScheduler.php +++ b/src/JobScheduler.php @@ -2,6 +2,7 @@ namespace Drupal\job_scheduler; +use Drupal\Component\Datetime\TimeInterface; use Drupal\Core\Database\Connection; /** @@ -16,13 +17,22 @@ class JobScheduler implements JobSchedulerInterface { */ protected $database; + /** + * The time service. + * + * @var \Drupal\Component\Datetime\TimeInterface + */ + public $time; + /** * Constructs a object. * * @param \Drupal\Core\Database\Connection + * @param \Drupal\Component\Datetime\TimeInterface $time */ - public function __construct(Connection $database) { + public function __construct(Connection $database, TimeInterface $time) { $this->database = $database; + $this->time = $time; } /** @@ -69,7 +79,8 @@ class JobScheduler implements JobSchedulerInterface { * 'periodic' - True if the task should be repeated periodically. */ public function set(array $job) { - $timestamp = \Drupal::time()->getRequestTime(); + $timestamp = $this->time->getRequestTime(); + $job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0; $job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE; $job['last'] = $timestamp; if (!empty($job['crontab'])) { @@ -191,7 +202,8 @@ class JobScheduler implements JobSchedulerInterface { * @see \Drupal\job_scheduler\JobScheduler::set() */ public function reschedule(array $job) { - $timestamp = \Drupal::time()->getRequestTime(); + $timestamp = $this->time->getRequestTime(); + $job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0; $job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE; $job['last'] = $timestamp; $job['scheduled'] = 0; @@ -254,7 +266,8 @@ class JobScheduler implements JobSchedulerInterface { * @see \Drupal\job_scheduler\JobScheduler::set() */ protected function reserve(array $job) { - $timestamp = \Drupal::time()->getRequestTime(); + $timestamp = $this->time->getRequestTime(); + $job['periodic'] = isset($job['periodic']) ? (int) $job['periodic'] : 0; $job['data'] = isset($job['data']) ? serialize($job['data']) : FALSE; $job['scheduled'] = $job['period'] + $timestamp; $job['last'] = $timestamp; diff --git a/src/Plugin/QueueWorker/JobSchedulerQueue.php b/src/Plugin/QueueWorker/JobSchedulerQueue.php deleted file mode 100644 index fbfde1e..0000000 --- a/src/Plugin/QueueWorker/JobSchedulerQueue.php +++ /dev/null @@ -1,49 +0,0 @@ -scheduler = \Drupal::service('job_scheduler.manager'); - } - - /** - * {@inheritdoc} - */ - public function processItem($data) { - $scheduler = $this->scheduler; - try { - $scheduler->execute($data); - } - catch (\Exception $e) { - watchdog_exception('job_scheduler', $e); - // Drop jobs that have caused exceptions. - $scheduler->remove($data); - } - } -}