diff --git a/hosting.drush.inc b/hosting.drush.inc index 12f13d1..15fb7e5 100644 --- a/hosting.drush.inc +++ b/hosting.drush.inc @@ -58,6 +58,10 @@ function hosting_drush_command() { 'force' => array( 'description' => 'Force the specified task to execute even if it is not queued to run.', ), + 'post-task-delay' => array( + 'description' => 'Specify a number of second to pause after task execution before marking the task as complete.', + 'hidden' => TRUE, + ), ), 'bootstrap' => DRUSH_BOOTSTRAP_DRUPAL_FULL, ); diff --git a/queued/hosting_queued.admin.inc b/queued/hosting_queued.admin.inc index edb255f..3a87c26 100644 --- a/queued/hosting_queued.admin.inc +++ b/queued/hosting_queued.admin.inc @@ -22,6 +22,14 @@ function hosting_queued_settings_form($form, &$form_state) { '#markup' => !empty($last_seen) ? t('Last started: @interval ago.', array('@interval' => format_interval(REQUEST_TIME - $last_seen))) : t('Never started.'), ); + $form['hosting_queued_task_concurrency'] = array( + '#type' => 'select', + '#title' => t('Task concurrency'), + '#description' => t('Select how many tasks can run in parallel via the queue daemon.'), + '#default_value' => variable_get('hosting_queued_task_concurrency', 1), + '#options' => drupal_map_assoc(range(1, 50)), + ); + $form['hosting_queued_post_task_delay'] = array( '#type' => 'select', '#title' => t('Post task delay'), diff --git a/queued/hosting_queued.drush.inc b/queued/hosting_queued.drush.inc index 983c45f..aa8c3b5 100644 --- a/queued/hosting_queued.drush.inc +++ b/queued/hosting_queued.drush.inc @@ -63,6 +63,10 @@ function drush_hosting_queued() { watchdog('hosting_queued', 'Started Hosting queue daemon, waiting for new tasks'); + // Delay for a configurable amount of time. + $post_task_delay = variable_get('hosting_queued_post_task_delay', 0); + $task_concurrency = variable_get('hosting_queued_task_concurrency', 1); + while (TRUE) { try { // Should we terminate. @@ -71,47 +75,32 @@ function drush_hosting_queued() { hosting_queued_restart(); } - - // Get some tasks to run - if ($tasks = @_hosting_get_new_tasks()) { - if (lock_acquire('hosting_queue_tasks_running', HOSTING_QUEUE_LOCK_TIMEOUT)) { - drush_log('Acquired lock on task queue.'); - foreach ($tasks as $task) { - // We sleep for a second just in case others want to run the task first. - // This guards against other processes that want to add a hosting task - // with arguments and run it immediately, they should be able to do this - // without us getting in there first. - // This is a workaround for http://drupal.org/node/1003536 - drush_log(dt('Found task to execute. Pausing before execution.')); - sleep(1); - - watchdog('hosting_queued', 'Running task @nid.', array('@nid' => $task->nid)); - // Execute the task in the backend - drush_invoke_process('@self', 'hosting-task', array($task->nid), array('strict' => FALSE), array('interactive' => TRUE)); - drush_log(dt('Finished executing task.')); - - // Delay for a configurable amount of time. - $delay = variable_get('hosting_queued_post_task_delay', 0); - if (!empty($delay)) { - drush_log(dt('Going to sleep for @count seconds after completing task.', array('@count' => $delay))); - sleep($delay); - } - - // We're done with this task, this unset might help reduce memory usage. - unset($task); - - // Should we terminate. - if (REQUEST_TIME > $end_time) { - // Restart the daemon to recycle leaked memory - hosting_queued_restart(); + // Compute the amount of tasks to run, but don't allow negative values. + $tasks_to_run = max(0, $task_concurrency - hosting_task_count_running()); + + // Get some tasks to run if we should. + if ($tasks_to_run > 0 && ($tasks = @_hosting_get_new_tasks($tasks_to_run))) { + if (lock_acquire('hosting_queue_tasks_running', HOSTING_QUEUE_LOCK_TIMEOUT)) { + drush_log('Acquired lock on task queue.'); + foreach ($tasks as $task) { + watchdog('hosting_queued', 'Forking to execute task @nid.', array('@nid' => $task->nid)); + // Execute the task in a separate process. + drush_invoke_process('@self', 'hosting-task', array($task->nid), array('strict' => FALSE, 'post-task-delay' => $post_task_delay), array('fork' => TRUE)); + + // We're done with this task, this unset might help reduce memory usage. + unset($task); + + // Should we terminate. + if (time() > $end_time) { + // Restart the daemon to recycle leaked memory + hosting_queued_restart(); + } } + drush_log('Releasing lock on task queue.'); + lock_release('hosting_queue_tasks_running'); } - drush_log('Releasing lock on task queue.'); - lock_release('hosting_queue_tasks_running'); } } - - } catch (Exception $e) { // Check if there was a database error, so we can recover gracefully if needed. // See: https://drupal.org/node/1992254. diff --git a/task.hosting.inc b/task.hosting.inc index 65e7cc5..32f85b0 100644 --- a/task.hosting.inc +++ b/task.hosting.inc @@ -155,6 +155,11 @@ function drush_hosting_task() { // New revision is created at the beginning of function. $task->revision = FALSE; $task->delta = microtime(TRUE) - $task->executed; + // Optionally sleep before updating the task node. + $delay = drush_get_option('post-task-delay', 0); + if (is_numeric($delay) && $delay > 0) { + sleep($delay); + } node_save($task); }