diff --git a/advancedqueue.services.yml b/advancedqueue.services.yml index 2d6f553..2963f35 100644 --- a/advancedqueue.services.yml +++ b/advancedqueue.services.yml @@ -1,4 +1,14 @@ services: + advancedqueue.factory: + class: \Drupal\advancedqueue\Queue\AdvancedQueueFactory + arguments: ['@settings'] + calls: + - [setContainer, ['@service_container']] + advancedqueue.database: - class: Drupal\advancedqueue\Queue\AdvancedQueueDatabaseFactory - arguments: ['@database'] + class: \Drupal\advancedqueue\Queue\AdvancedQueueDatabaseFactory + arguments: ['@database', '@datetime.time'] + + plugin.manager.advancedqueue_worker: + class: \Drupal\advancedqueue\AdvancedqueueWorkerManager + parent: default_plugin_manager diff --git a/src/AdvancedQueueServiceProvider.php b/src/AdvancedQueueServiceProvider.php index 3f4c4d2..cb90cbf 100644 --- a/src/AdvancedQueueServiceProvider.php +++ b/src/AdvancedQueueServiceProvider.php @@ -15,8 +15,9 @@ class AdvancedQueueServiceProvider extends ServiceProviderBase { */ public function alter(ContainerBuilder $container) { // Overrides language_manager class to test domain language negotiation. - $queue_service_definition = $container->getDefinition('queue'); - $queue_service_definition->setClass('Drupal\advancedqueue\AdvancedQueueFactory'); + // @todo Decide whether we want to continue to do that. +// $queue_service_definition = $container->getDefinition('queue'); +// $queue_service_definition->setClass('Drupal\advancedqueue\AdvancedQueueFactory'); } } diff --git a/src/AdvancedQueueWorkerBase.php b/src/AdvancedQueueWorkerBase.php new file mode 100644 index 0000000..ecda2d9 --- /dev/null +++ b/src/AdvancedQueueWorkerBase.php @@ -0,0 +1,17 @@ +alterInfo('advancedqueue_worker'); + $this->setCacheBackend($cache_backend, 'advancedqueue_worker'); + + parent::__construct('Plugin/advancedqueue/Worker', $namespaces, $module_handler, + 'Drupal\advancedqueue\Queue\AdvancedQueueWorkerInterface', + '\Drupal\advancedqueue\Annotation\AdvancedQueueWorker' + ); + } + +} diff --git a/src/Annotation/AdvancedQueueWorker.php b/src/Annotation/AdvancedQueueWorker.php new file mode 100644 index 0000000..ad42a4b --- /dev/null +++ b/src/Annotation/AdvancedQueueWorker.php @@ -0,0 +1,16 @@ +schema_columns); + /** + * {@inheritdoc} + */ + public function __construct($name, Connection $connection, TimeInterface $time) { + $this->name = $name; + $this->connection = $connection; + $this->time = $time; + } - $fields += [ - 'item_key' => NULL, + protected function jobToFields(QueueJob $job) { + return [ 'queue_name' => $this->name, - 'uid' => 0, // @TODO: User - 'created' => time(), - 'status' => 0, // @TODO: Statuses...? - 'data' => serialize($data), + 'data' => serialize($job->getPayload()), + 'expire' => $job->getExpire(), + 'created' => $job->getStartTime() ?: $this->time->getCurrentTime(), + 'item_key' => $job->getItemKey(), + 'uid' => $job->getUid(), + 'label' => $job->getLabel(), + 'processed_time' => $job->getProcessedTime() ?: 0, + 'status' => $job->getStatus(), ]; + } + + protected function doCreateItem(QueueJob $job) { + $fields = $this->jobToFields($job); - if (!empty($meta['item_key'])) { + if (!empty($fields['item_key'])) { // We allow replacing existing items based on their key. That means we // need a bunch of logic in this spot. $key = $fields['item_key']; @@ -77,25 +103,44 @@ class AdvancedQueueDatabase extends DatabaseQueue { /** * {@inheritdoc} */ - public function createItem($data) { - $this->createAdvancedItem($data, []); + public function updateJobItem(QueueJob $job) { + if (empty($job->getItemId())) { + // @todo We could insert the job automatically instead. + throw new \Exception('You cannot update a job item without a job ID yet.'); + } + + $fields = $this->jobToFields($job); + + try { + // This function allows the item's creation date and data package to + // be modified and requeued for later. + $update = $this->connection->update(static::TABLE_NAME) + ->fields($fields) + ->condition('item_id', $job->getItemId()); + return $update->execute(); + } + catch (\Exception $e) { + $this->catchException($e); + // If the table doesn't exist we should consider the item released. + return TRUE; + } } /** * {@inheritdoc} */ - public function releaseItem($item) { + public function releaseItem(QueueJob $item) { try { // This function allows the item's creation date and data package to // be modified and requeued for later. $update = $this->connection->update(static::TABLE_NAME) ->fields([ 'expire' => NULL, - 'status' => static::STATUS_QUEUED, - 'created' => $item->created, - 'data' => serialize($item->data), + 'status' => QueueJobStatus::STATUS_QUEUED, + 'created' => $item->getStartTime(), + 'data' => serialize($item->getPayload()), ]) - ->condition('item_id', $item->item_id); + ->condition('item_id', $item->getItemId()); return $update->execute(); } catch (\Exception $e) { @@ -108,16 +153,16 @@ class AdvancedQueueDatabase extends DatabaseQueue { /** * {@inheritdoc} */ - public function deleteItem($item) { + public function deleteItem(QueueJob $item) { try { $update = $this->connection->update(static::TABLE_NAME) ->fields([ 'expire' => NULL, - 'status' => !empty($item->status) ? $item->status : static::STATUS_SUCCESS, - 'result' => serialize(isset($item->result) ? $item->result : array()), - 'processed' => time(), + 'status' => $item->getStatus(), + 'result' => serialize($item->getResult()), + 'processed_time' => $this->time->getCurrentTime(), ]) - ->condition('item_id', $item->item_id); + ->condition('item_id', $item->getItemKey()); return $update->execute(); } catch (\Exception $e) { @@ -139,10 +184,13 @@ class AdvancedQueueDatabase extends DatabaseQueue { try { $params = [ ':name' => $this->name, - ':status' => static::STATUS_QUEUED, - ':created' => time(), + ':status' => QueueJobStatus::STATUS_QUEUED, + ':created' => $this->time->getCurrentTime(), ]; - $item = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE status = :status AND expire IS NULL AND created <= :created AND queue_name = :name ORDER BY created, item_id ASC', 0, 1, $params)->fetchObject(); + $item = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE status = :status AND expire IS NULL AND created <= :created AND queue_name = :name ORDER BY created, item_id ASC', 0, 1, $params)->fetchAssoc(); + if ($item) { + $item['payload'] = unserialize($item['data']); + } } catch (\Exception $e) { $this->catchException($e); @@ -150,7 +198,7 @@ class AdvancedQueueDatabase extends DatabaseQueue { // claim. return FALSE; } - if ($item) { + if ($item && $job = new QueueJob($item)) { // Try to update the item. Only one thread can succeed in UPDATEing the // same row. We cannot rely on REQUEST_TIME because items might be // claimed by a single consumer which runs longer than 1 second. If we @@ -159,14 +207,14 @@ class AdvancedQueueDatabase extends DatabaseQueue { // should really expire. $update = $this->connection->update(static::TABLE_NAME) ->fields([ - 'expire' => time() + $lease_time, + 'expire' => $this->time->getCurrentTime() + $lease_time, + 'status' => QueueJobStatus::STATUS_PROCESSING, ]) - ->condition('item_id', $item->item_id) + ->condition('item_id', $job->getItemId()) ->isNull('expire'); // If there are affected rows, this update succeeded. if ($update->execute()) { - $item->data = unserialize($item->data); - return $item; + return $job; } } else { @@ -183,7 +231,7 @@ class AdvancedQueueDatabase extends DatabaseQueue { // @TODO: Isn't there a less-weird way to structure this function?? $try_again = FALSE; try { - $id = $this->doCreateItem($data, $meta); + $id = $this->doCreateItem($job); } catch (\Exception $e) { // If there was an exception, try to create the table. @@ -195,7 +243,7 @@ class AdvancedQueueDatabase extends DatabaseQueue { } // Now that the table has been created, try again if necessary. if ($try_again) { - $id = $this->doCreateItem($data); + $id = $this->doCreateItem($job); } return $id; @@ -225,7 +273,7 @@ class AdvancedQueueDatabase extends DatabaseQueue { */ public function numberOfItems() { try { - return $this->connection->query('SELECT COUNT(item_id) FROM {' . static::TABLE_NAME . '} WHERE queue_name = :name AND status = ' . static::STATUS_QUEUED, [':name' => $this->name]) + return $this->connection->query('SELECT COUNT(item_id) FROM {' . static::TABLE_NAME . '} WHERE queue_name = :name AND status = :status', [':name' => $this->name, ':status' => QueueJobStatus::STATUS_QUEUED]) ->fetchField(); } catch (\Exception $e) { @@ -239,61 +287,90 @@ class AdvancedQueueDatabase extends DatabaseQueue { * Defines the schema for the advancedqueue table. */ public function schemaDefinition() { - $schema = parent::schemaDefinition(); - - // Slightly more clear naming for the queue name column. - $schema['fields']['queue_name'] = $schema['fields']['name']; - unset($schema['fields']['name']); - - // Use a null for the default of expire. - $schema['fields']['expire']['not null'] = FALSE; - unset($schema['fields']['expire']['default']); - - $schema['fields'] += [ - 'item_key' => [ - 'type' => 'varchar_ascii', - 'length' => 255, - 'not null' => FALSE, - 'description' => 'The unique key of the queue item, if any.', - ], - 'uid' => [ - 'type' => 'int', - 'type' => 'int', - 'unsigned' => TRUE, - 'not null' => TRUE, - 'description' => 'The user to which the item belongs.', + return [ + 'description' => 'Stores items in queues.', + 'fields' => [ + 'item_id' => [ + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ], + 'queue_name' => [ + 'type' => 'varchar_ascii', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ], + 'data' => [ + 'type' => 'blob', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The arbitrary data for the item.', + ], + 'expire' => [ + 'type' => 'int', + // @todo Is this change really needed? + 'not null' => FALSE, + 'default' => 0, + 'description' => 'Timestamp when the claim lease expires on the item.', + ], + 'created' => [ + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the item was created.', + ], + 'item_key' => [ + 'type' => 'varchar_ascii', + 'length' => 255, + 'not null' => FALSE, + 'description' => 'The unique key of the queue item, if any.', + ], + 'uid' => [ + 'type' => 'int', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'The user to which the item belongs.', + ], + 'label' => [ + 'type' => 'varchar', + 'length' => 400, + 'not null' => TRUE, + 'description' => 'The title of this item.', + ], + 'processed_time' => [ + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the item was processed.', + ], + 'status' => [ + 'type' => 'int', + 'not null' => TRUE, + 'default' => -1, + 'size' => 'tiny', + 'description' => 'Indicates whether the item has been processed (-1 = queue, 0 = processing, 1 = successfully processed, 2 = failed).', + ], + 'result' => [ + 'type' => 'blob', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The arbitrary result for the item, only significant if {advancedqueue}.status <> 0', + ], ], - 'label' => [ - 'type' => 'varchar', - 'length' => 400, - 'not null' => TRUE, - 'description' => 'The title of this item.', + 'primary key' => ['item_id'], + 'indexes' => [ + 'name_created' => ['queue_name', 'created'], + 'expire' => ['expire'], ], - 'processed' => [ - 'type' => 'int', - 'not null' => TRUE, - 'default' => 0, - 'description' => 'Timestamp when the item was processed.', - ], - 'status' => [ - 'type' => 'int', - 'not null' => TRUE, - 'default' => -1, - 'size' => 'tiny', - 'description' => 'Indicates whether the item has been processed (-1 = queue, 0 = processing, 1 = successfully processed, 2 = failed).', - ], - 'result' => [ - 'type' => 'blob', - 'not null' => FALSE, - 'size' => 'big', - 'serialize' => TRUE, - 'description' => 'The arbitrary result for the item, only significant if {advancedqueue}.status <> 0', + 'unique keys' => [ + 'item_key' => ['item_key'], ], ]; - - $schema['unique keys']['item_key'] = ['item_key']; - - return $schema; } /** @@ -309,5 +386,69 @@ class AdvancedQueueDatabase extends DatabaseQueue { $this->catchException($e); } } + + public function createJobItem(QueueJob $job) { + return $this->createAdvancedItem($job); + } + + /** + * {@inheritdoc} + */ + public function getItemByItemKey($item_key) { + $item = $this->connection->queryRange('SELECT * FROM {' . static::TABLE_NAME . '} q WHERE queue_name = :queue_name AND item_key = :item_key', 0, 1, ['queue_name' => $this->name, 'item_key' => $item_key])->fetchAssoc(); + if ($item) { + return new QueueJob($item); + } + } + + + /** + * {@inheritdoc} + */ + public function createQueue() { + // All tasks are stored in a single database table (which is created on + // demand) so there is nothing we need to do to create a new queue. + } + + /** + * + * Check if the table exists and create it if not. + */ + protected function ensureTableExists() { + try { + $database_schema = $this->connection->schema(); + if (!$database_schema->tableExists(static::TABLE_NAME)) { + $schema_definition = $this->schemaDefinition(); + $database_schema->createTable(static::TABLE_NAME, $schema_definition); + return TRUE; + } + } + // If another process has already created the queue table, attempting to + // recreate it will throw an exception. In this case just catch the + // exception and do nothing. + catch (SchemaObjectExistsException $e) { + return TRUE; + } + return FALSE; + } + + /** + * Act on an exception when queue might be stale. + * + * If the table does not yet exist, that's fine, but if the table exists and + * yet the query failed, then the queue is stale and the exception needs to + * propagate. + * + * @param $e + * The exception. + * + * @throws \Exception + * If the table exists the exception passed in is rethrown. + */ + protected function catchException(\Exception $e) { + if ($this->connection->schema()->tableExists(static::TABLE_NAME)) { + throw $e; + } + } } diff --git a/src/Queue/AdvancedQueueDatabaseFactory.php b/src/Queue/AdvancedQueueDatabaseFactory.php index 71a5f81..b52cf0c 100644 --- a/src/Queue/AdvancedQueueDatabaseFactory.php +++ b/src/Queue/AdvancedQueueDatabaseFactory.php @@ -2,6 +2,8 @@ namespace Drupal\advancedqueue\Queue; +use Drupal\Component\Datetime\TimeInterface; +use Drupal\Core\Database\Connection; use Drupal\Core\Queue\QueueDatabaseFactory; /** @@ -10,16 +12,29 @@ use Drupal\Core\Queue\QueueDatabaseFactory; class AdvancedQueueDatabaseFactory extends QueueDatabaseFactory { /** + * The time service. + * + * @var \Drupal\Component\Datetime\TimeInterface + */ + protected $time; + + public function __construct(Connection $connection, TimeInterface $time) { + parent::__construct($connection); + + $this->time = $time; + } + + /** * Constructs a new queue object for a given name. * * @param string $name * The name of the requested queue. * - * @return \Drupal\Core\Queue\DatabaseQueue + * @return \Drupal\advancedqueue\Queue\AdvancedQueueInterface * A queue class for the specified queue. */ public function get($name) { - return new AdvancedQueueDatabase($name, $this->connection); + return new AdvancedQueueDatabase($name, $this->connection, $this->time); } } diff --git a/src/Queue/AdvancedQueueFactory.php b/src/Queue/AdvancedQueueFactory.php index 5dc3d00..7098e97 100644 --- a/src/Queue/AdvancedQueueFactory.php +++ b/src/Queue/AdvancedQueueFactory.php @@ -13,18 +13,18 @@ use Drupal\Core\Queue\QueueFactory; class AdvancedQueueFactory extends QueueFactory { /** - * {@inheritdoc} + * @return \Drupal\advancedqueue\Queue\AdvancedQueueInterface */ public function get($name, $reliable = FALSE) { if (!isset($this->queues[$name])) { // If it is a reliable queue, check the specific settings first. if ($reliable) { - $service_name = $this->settings->get('queue_reliable_service_' . $name); + $service_name = $this->settings->get('advancedqueue_reliable_service_' . $name); } // If no reliable queue was defined, check the service and global // settings, fall back to queue.database. if (empty($service_name)) { - $service_name = $this->settings->get('queue_service_' . $name, $this->settings->get('queue_default', 'advancedqueue.database')); + $service_name = $this->settings->get('advancedqueue_service_' . $name, $this->settings->get('queue_default', 'advancedqueue.database')); } $this->queues[$name] = $this->container->get($service_name)->get($name); } diff --git a/src/Queue/AdvancedQueueInterface.php b/src/Queue/AdvancedQueueInterface.php index 5ceb24e..01f0cf0 100644 --- a/src/Queue/AdvancedQueueInterface.php +++ b/src/Queue/AdvancedQueueInterface.php @@ -3,13 +3,12 @@ namespace Drupal\advancedqueue\Queue; use Drupal\advancedqueue\QueueJob; -use Drupal\Core\Queue\ReliableQueueInterface; /** * An AdvancedQueue interface as a guide to later possible non-database * implementations. */ -interface AdvancedQueueInterface extends ReliableQueueInterface { +interface AdvancedQueueInterface { /** * Create an item with metadata for the item itself, allowing the caller to @@ -29,9 +28,28 @@ interface AdvancedQueueInterface extends ReliableQueueInterface { */ public function createJobItem(QueueJob $job); + public function updateJobItem(QueueJob $job); + /** - * @return \Drupal\advancedqueue\QueueJob + * @return \Drupal\advancedqueue\QueueJob|NULL */ public function getItemByItemKey($item_key); + public function numberOfItems(); + + /** + * @param int $lease_time + * + * @return \Drupal\advancedqueue\QueueJob + */ + public function claimItem($lease_time = 3600); + + public function deleteItem(QueueJob $job); + + public function releaseItem(QueueJob $job); + + public function createQueue(); + + public function deleteQueue(); + } diff --git a/src/Queue/AdvancedQueueWorkerInterface.php b/src/Queue/AdvancedQueueWorkerInterface.php index e7410de..533a535 100644 --- a/src/Queue/AdvancedQueueWorkerInterface.php +++ b/src/Queue/AdvancedQueueWorkerInterface.php @@ -2,11 +2,20 @@ namespace Drupal\advancedqueue\Queue; +use Drupal\advancedqueue\QueueJob; + +/** + * @todo Should the advanced stuff like getGroups by on its own custom interface? + */ interface AdvancedQueueWorkerInterface { /** + * @param \Drupal\advancedqueue\QueueJob $job + * * @return \Drupal\advancedqueue\QueueJobStatus */ - public function processItem(); + public function processItem(QueueJob $job); + + public function getGroups(); } diff --git a/src/QueueJob.php b/src/QueueJob.php index 36e95f8..af997a5 100644 --- a/src/QueueJob.php +++ b/src/QueueJob.php @@ -4,6 +4,8 @@ namespace Drupal\advancedqueue; class QueueJob { + protected $itemId; + protected $itemKey; protected $uid; @@ -14,6 +16,14 @@ class QueueJob { protected $payload; + protected $expire; + + protected $label; + + protected $processedTime; + + protected $result; + /** * QueueJob constructor. * @@ -23,20 +33,73 @@ class QueueJob { * @param $startTime * @param $status */ - public function __construct($payload, $itemKey = NULL, $uid = NULL, $startTime = NULL, $status = NULL) { - $this->itemKey = $itemKey; - $this->uid = $uid; - $this->startTime = $startTime; - $this->status = $status; - $this->payload = $payload; + public function __construct(array $definition) { + $definition += [ + 'item_id' => NULL, + 'item_key' => NULL, + 'uid' => 0, + 'created' => NULL, + 'expire' => NULL, + 'payload' => NULL, + 'label' => '', + 'processed_time' => NULL, + 'result' => NULL, + 'status' => QueueJobStatus::STATUS_QUEUED, + ]; + + $this->itemId = $definition['item_id']; + $this->itemKey = $definition['item_key']; + $this->uid = $definition['uid']; + $this->startTime = $definition['created']; + $this->status = $definition['status']; + $this->payload = $definition['payload']; + $this->expire = $definition['expire']; + $this->label = $definition['label']; + $this->processedTime = $definition['processed_time']; + $this->result = $definition['result']; } public static function create($payload, $itemKey = NULL) { - return new static($payload, $itemKey); + return new static([ + 'payload' => $payload, + 'item_key' => $itemKey, + ]); } public static function createForTime($payload, $created, $itemKey = NULL) { - return new static($payload, $itemKey, NULL, $created); + return new static([ + 'payload' => $payload, + 'item_key' => $itemKey, + 'created' => $created, + ]); + } + + /** + * @return mixed + */ + public function getExpire() { + return $this->expire; + } + + /** + * @return mixed + */ + public function getProcessedTime() { + return $this->processedTime; + } + + /** + * @return mixed + */ + public function getLabel() { + return $this->label; + } + + /** + * @return mixed + */ + public function getItemId() { + return $this->itemId; } /** @@ -89,6 +152,24 @@ class QueueJob { return $this->payload; } - + /** + * @return mixed + */ + public function getResult() { + return $this->result; + } + + /** + * Updates the result. + * + * @param mixed $result + * + * @return static + */ + public function setResult($result) { + $job = clone $this; + $job->result = $result; + return $job; + } } diff --git a/src/QueueJobStatus.php b/src/QueueJobStatus.php index 194e2d6..ae82c4a 100644 --- a/src/QueueJobStatus.php +++ b/src/QueueJobStatus.php @@ -13,4 +13,47 @@ class QueueJobStatus { const STATUS_FAILED = 2; const STATUS_RETRY = 3; + protected $status; + + /** + * @var mixed + */ + protected $result; + + /** + * QueueJobStatus constructor. + */ + public function __construct($status, $result = NULL) { + $this->status = $status; + $this->result = $result; + } + + public function is($status) { + return $this->status === $status; + } + + public function getResult() { + return $this->result; + } + + public static function queued() { + return new static(static::STATUS_QUEUED); + } + + public static function processing() { + return new static(static::STATUS_PROCESSING); + } + + public static function success($result = NULL) { + return new static(static::STATUS_SUCCESS, $result); + } + + public static function failed() { + return new static(static::STATUS_FAILED); + } + + public static function retry() { + return new static(static::STATUS_RETRY); + } + } diff --git a/src/Runner/AdvancedQueueRunnerBase.php b/src/Runner/AdvancedQueueRunnerBase.php index 9ced9eb..ac83812 100644 --- a/src/Runner/AdvancedQueueRunnerBase.php +++ b/src/Runner/AdvancedQueueRunnerBase.php @@ -2,8 +2,14 @@ namespace Drupal\advancedqueue\Runner; -use Drupal\Core\Queue\QueueFactory; -use Drupal\Core\Queue\QueueWorkerManagerInterface; +use Drupal\advancedqueue\Queue\AdvancedQueueFactory; +use Drupal\advancedqueue\Queue\AdvancedQueueWorkerInterface; +use Drupal\advancedqueue\QueueJob; +use Drupal\advancedqueue\QueueJobStatus; +use Drupal\Component\Datetime\TimeInterface; +use Drupal\Component\Plugin\PluginManagerInterface; +use Drupal\Core\Queue\RequeueException; +use Drupal\Core\Queue\SuspendQueueException; /** * @file Placeholder for eventual Drush command. Porting just the original @@ -15,7 +21,8 @@ abstract class AdvancedQueueRunnerBase implements AdvancedQueueRunnerInterface { /** * The queue factory used to get the actual queue to claim items from. - * @var \Drupal\Core\Queue\QueueFactory + * + * @var \Drupal\advancedqueue\Queue\AdvancedQueueFactory */ protected $queueFactory; @@ -38,16 +45,22 @@ abstract class AdvancedQueueRunnerBase implements AdvancedQueueRunnerInterface { protected $queueWorkerGroups = []; /** + * @var \Drupal\Component\Datetime\TimeInterface + */ + protected $time; + + /** * Constructs the queue runner. * - * @param \Drupal\Core\Queue\QueueFactory $queueFactory + * @param \Drupal\advancedqueue\Queue\AdvancedQueueFactory $queueFactory * The queue service. - * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $queueWorkerManager + * @param \Drupal\Component\Plugin\PluginManagerInterface $queueWorkerManager * The queue plugin manager. */ - public function __construct(QueueFactory $queueFactory, QueueWorkerManagerInterface $queueWorkerManager) { + public function __construct(AdvancedQueueFactory $queueFactory, PluginManagerInterface $queueWorkerManager, TimeInterface $time) { $this->queueFactory = $queueFactory; $this->queueWorkerManager = $queueWorkerManager; + $this->time = $time; $definitions = $this->queueWorkerManager->getDefinitions(); // Go through our definitions to do some setup. @@ -73,12 +86,15 @@ abstract class AdvancedQueueRunnerBase implements AdvancedQueueRunnerInterface { * * @param string[] $groupNames * The names of the groups being requested. + * + * @throws \Exception + * Thrown when an invalid group name was passed in. */ public function setGroups(array $groupNames) { $invalidGroups = array_diff($groupNames, $this->queueWorkerGroups); if (!empty($invalidGroups)) { $invalidGroupNames = implode(', ', $invalidGroups); - throw new Exception('The following groups are invalid: ' . $invalidGroupNames); + throw new \Exception('The following groups are invalid: ' . $invalidGroupNames); } else { // Set us up to process queue workers. @@ -100,12 +116,15 @@ abstract class AdvancedQueueRunnerBase implements AdvancedQueueRunnerInterface { * * @param string[] $queueNames * The names of the queues being requested. + * + * @throws \Exception + * Thrown when an invalid queue name was passed in. */ public function setQueues(array $queueNames) { $invalidQueues = array_diff($queueNames, array_keys($this->queueWorkers)); if (!empty($invalidQueues)) { $invalidQueueNames = implode(', ', $invalidQueues); - throw new Exception('The following queues are invalid: ' . $invalidQueueNames); + throw new \Exception('The following queues are invalid: ' . $invalidQueueNames); } else { foreach ($this->queueWorkers as $name => $worker) { @@ -124,56 +143,22 @@ abstract class AdvancedQueueRunnerBase implements AdvancedQueueRunnerInterface { * @TODO: Figure out the best way to allow going infinite. */ protected function processQueues($timeLimit) { + $runner_helper = new RunnerHelper(); // Grab the defined cron queues. - $end = time() + $timeLimit; - while (time() < $end) { + $end = $this->time->getCurrentTime() + $timeLimit; + while ($this->time->getCurrentTime() < $end) { foreach ($this->queueWorkers as $name => $worker) { - $leaseTime = $worker->getLeaseTime(); + // @todo Implement that + // $leaseTime = $worker->getLeaseTime(); + $leaseTime = 30; $queue = $this->queueFactory->get($name); - if ($item = $queue->claimItem($leaseTime)) { - // This is the part which e.g. a FastCGI runner will reimplement. - // The default process function simply phones the worker, but - // FCGI will need to do more magical things. - try { - $result = $this->process($worker, $item); - // @TODO: Assuming a result, handle it here. + try { + while ($job = $runner_helper->processNextItem($queue, $worker, $leaseTime)) { } - catch (\Exception $e) { - // @TODO: Exception logic goes here. See commented code below. - // @TODO: Also, we probably need to generate correct $result sets - // for known exceptions and handle the result later? TBD. - - // try { - // $queue_worker->processItem($item->data); - // $queue->deleteItem($item); - // } - // catch (RequeueException $e) { - // // The worker requested the task be immediately requeued. - // $queue->releaseItem($item); - // } - // catch (SuspendQueueException $e) { - // // If the worker indicates there is a problem with the whole queue, - // // release the item and skip to the next queue. - // $queue->releaseItem($item); - - // watchdog_exception('cron', $e); - - // // Skip to the next queue. - // continue 2; - // } - // catch (\Exception $e) { - // // In case of any other kind of exception, log it and leave the item - // // in the queue to be processed again later. - // watchdog_exception('cron', $e); - // } - } - - // If we make it all the way to here, it means nothing really bad was - // thrown and we can go back to the beginning. - continue 2; } - // If the conditional above fails, then no item was claimed and we can - // allow the foreach to take us to the next queue. + catch (SuspendQueueException $e) { + continue; + } } } } diff --git a/src/Runner/AdvancedQueueRunnerInterface.php b/src/Runner/AdvancedQueueRunnerInterface.php new file mode 100644 index 0000000..f18a1f4 --- /dev/null +++ b/src/Runner/AdvancedQueueRunnerInterface.php @@ -0,0 +1,9 @@ +processItem($job); + } + + public function processNextItem(AdvancedQueueInterface $queue, AdvancedQueueWorkerInterface $worker, $leaseTime) { + if ($job = $queue->claimItem($leaseTime)) { + // This is the part which e.g. a FastCGI runner will reimplement. + // The default process function simply phones the worker, but + // FCGI will need to do more magical things. + try { + $result = $this->process($worker, $job); + if ($result === NULL) { + // If there is no result we assume life is fine. + $queue->deleteItem($job); + } + elseif ($result->is(QueueJobStatus::STATUS_QUEUED)) { + throw new \Exception('Workers are not allowed to set the status back to queued'); + } + elseif ($result->is(QueueJobStatus::STATUS_PROCESSING)) { + throw new \Exception('Workers should set the status to failed, retry or success');; + } + elseif ($result->is(QueueJobStatus::STATUS_RETRY)) { + $queue->releaseItem($job); + } + elseif ($result->is(QueueJobStatus::STATUS_FAILED)) { + $job->setStatus(QueueJobStatus::STATUS_FAILED); + $queue->updateJobItem($job); + } + elseif ($result->is(QueueJobStatus::STATUS_SUCCESS)) { + $job = $job->setStatus(QueueJobStatus::STATUS_SUCCESS); + $job = $job->setResult($result->getResult()); + $queue->updateJobItem($job); + } + return $result; + } + catch (RequeueException $e) { + // The worker requested the task be immediately requeued. + $queue->releaseItem($job); + return QueueJobStatus::retry(); + } + catch (SuspendQueueException $e) { + // If the worker indicates there is a problem with the whole queue, + // release the item and skip to the next queue. + $queue->releaseItem($job); + + watchdog_exception('cron', $e); + + // Skip to the next queue. + throw $e; + } + catch (\Exception $e) { + watchdog_exception('cron', $e); + // Skip to the next queue. + throw new SuspendQueueException($e->getMessage(), $e->getCode(), $e); + } + } + } + +} diff --git a/src/Runner/SerialQueueRunner.php b/src/Runner/SerialQueueRunner.php new file mode 100644 index 0000000..e8120e9 --- /dev/null +++ b/src/Runner/SerialQueueRunner.php @@ -0,0 +1,11 @@ +processQueues(0); + } + +} diff --git a/tests/modules/advancedqueue_test/advancedqueue_test.info.yml b/tests/modules/advancedqueue_test/advancedqueue_test.info.yml new file mode 100644 index 0000000..983affe --- /dev/null +++ b/tests/modules/advancedqueue_test/advancedqueue_test.info.yml @@ -0,0 +1,5 @@ +name: advancedqueue tet +core: 8.x +type: module +dependencies: + - advancedqueue diff --git a/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/Worker1.php b/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/Worker1.php new file mode 100644 index 0000000..d9e42bb --- /dev/null +++ b/tests/modules/advancedqueue_test/src/Plugin/advancedqueue/Worker/Worker1.php @@ -0,0 +1,21 @@ +getPayload()['status']; + $new_result = $job->getPayload()['result']; + return new QueueJobStatus($new_status, $new_result); + } + + /** + * {@inheritdoc} + */ + public function getGroups() { + return ['group']; + } + +} diff --git a/tests/src/Kernel/AdvancedQueueKernelTest.php b/tests/src/Kernel/AdvancedQueueKernelTest.php index 7d1b2b2..0a59c28 100644 --- a/tests/src/Kernel/AdvancedQueueKernelTest.php +++ b/tests/src/Kernel/AdvancedQueueKernelTest.php @@ -15,11 +15,11 @@ class AdvancedQueueKernelTest extends KernelTestBase { /** * {@inheritdoc} */ - public static $modules = ['advancequeue']; + public static $modules = ['advancedqueue']; public function testNormalQueue() { - $advanced_queue_1 = new AdvancedQueueDatabase('queue1', \Drupal::database()); - $advanced_queue_2 = new AdvancedQueueDatabase('queue1', \Drupal::database()); + $advanced_queue_1 = new AdvancedQueueDatabase('queue1', \Drupal::database(), \Drupal::time()); + $advanced_queue_2 = new AdvancedQueueDatabase('queue2', \Drupal::database(), \Drupal::time()); $this->runQueueTest($advanced_queue_1, $advanced_queue_2); } @@ -47,10 +47,10 @@ class AdvancedQueueKernelTest extends KernelTestBase { $new_items = []; $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; + $new_items[] = $item->getPayload(); $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; + $new_items[] = $item->getPayload(); // First two dequeued items should match the first two items we queued. $this->assertEquals(2, $this->queueScore($data, $new_items), 'Two items matched'); @@ -63,10 +63,10 @@ class AdvancedQueueKernelTest extends KernelTestBase { $this->assertFalse($queue2->numberOfItems(), 'Queue 2 is empty while Queue 1 has items'); $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; + $new_items[] = $item->getPayload(); $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; + $new_items[] = $item->getPayload(); // All dequeued items should match the items we queued exactly once, // therefore the score must be exactly 4. @@ -109,34 +109,34 @@ class AdvancedQueueKernelTest extends KernelTestBase { } // Queue items 1 and 2 in the queue1. - $queue1->createAdvancedItem(QueueJob::create($data[0])); - $queue1->createAdvancedItem(QueueJob::create($data[1])); + $queue1->createJobItem(QueueJob::create($data[0])); + $queue1->createJobItem(QueueJob::create($data[1])); // Retrieve two items from queue1. $items = []; $new_items = []; $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; + $new_items[] = $item->getPayload(); $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; + $new_items[] = $item->getPayload(); // First two dequeued items should match the first two items we queued. $this->assertEquals(2, $this->queueScore($data, $new_items), 'Two items matched'); // Add two more items. - $queue1->createAdvancedItem(QueueJob::create($data[2])); - $queue1->createAdvancedItem(QueueJob::create($data[3])); + $queue1->createJobItem(QueueJob::create($data[2])); + $queue1->createJobItem(QueueJob::create($data[3])); - $this->assertTrue($queue1->numberOfItems(), 'Queue 1 is not empty after adding items.'); + $this->assertNotEmpty($queue1->numberOfItems(), 'Queue 1 is not empty after adding items.'); $this->assertFalse($queue2->numberOfItems(), 'Queue 2 is empty while Queue 1 has items'); $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; + $new_items[] = $item->getPayload(); $items[] = $item = $queue1->claimItem(); - $new_items[] = $item->data; + $new_items[] = $item->getPayload(); // All dequeued items should match the items we queued exactly once, // therefore the score must be exactly 4. @@ -155,9 +155,16 @@ class AdvancedQueueKernelTest extends KernelTestBase { $this->assertFalse($queue2->numberOfItems(), 'Queue 2 is empty'); } + /** + * @todo is this test really that useful? + */ public function testAdvancedQueueSimpleProcessing() { - $queue1 = new AdvancedQueueDatabase('queue1', \Drupal::database()); - $queue2 = new AdvancedQueueDatabase('queue1', \Drupal::database()); + $this->time = new TestTime(); + $this->time->setCurrentTime(4); + $this->time->setRequestTime(0); + + $queue1 = new AdvancedQueueDatabase('queue1', \Drupal::database(), $this->time); + $queue2 = new AdvancedQueueDatabase('queue2', \Drupal::database(), $this->time); $this->runQueueTestWithAdvancedQueueCreate($queue1, $queue2); } @@ -173,28 +180,32 @@ class AdvancedQueueKernelTest extends KernelTestBase { $time->setCurrentTime(4); $time->setRequestTime(0); - \Drupal::getContainer()->set('datetime.time', $time); + $queue1 = new AdvancedQueueDatabase('queue1', \Drupal::database(), $time); + $queue2 = new AdvancedQueueDatabase('queue2', \Drupal::database(), $time); - $queue1 = new AdvancedQueueDatabase('queue1', \Drupal::database()); - $queue2 = new AdvancedQueueDatabase('queue1', \Drupal::database()); - - $queue1->createAdvancedItem(QueueJob::createForTime($data[0], 10)); $queue1->createAdvancedItem(QueueJob::createForTime($data[0], 5)); + $queue1->createAdvancedItem(QueueJob::createForTime($data[1], 10)); $this->assertFalse($queue1->claimItem()); $this->assertFalse($queue2->claimItem()); $time->setCurrentTime(7); + $this->fetchAllItems(); $item = $queue1->claimItem(); - $this->assertEquals($data[0], $item->data); + $this->fetchAllItems(); + $this->assertEquals($data[0], $item->getPayload()); $item = $queue1->claimItem(); $this->assertFalse($item); $time->setCurrentTime(11); $item = $queue1->claimItem(); - $this->assertEquals($data[1], $item->data); + $this->assertEquals($data[1], $item->getPayload()); $item = $queue1->claimItem(); $this->assertFalse($item); } + protected function fetchAllItems() { + print_r(\Drupal::database()->query("SELECT * from {advancedqueue}")->fetchAll()); + } + } diff --git a/tests/src/Kernel/AdvancedQueueRunnerTest.php b/tests/src/Kernel/AdvancedQueueRunnerTest.php index 153aa2b..74a3292 100644 --- a/tests/src/Kernel/AdvancedQueueRunnerTest.php +++ b/tests/src/Kernel/AdvancedQueueRunnerTest.php @@ -2,18 +2,79 @@ namespace Drupal\Tests\advancedqueue\Kernel; +use Drupal\advancedqueue\Queue\AdvancedQueueDatabase; use Drupal\advancedqueue\Queue\AdvancedQueueWorkerInterface; use Drupal\advancedqueue\QueueJob; use Drupal\advancedqueue\QueueJobStatus; +use Drupal\advancedqueue\Runner\RunnerHelper; +use Drupal\advancedqueue\Runner\SerialQueueRunner; use Drupal\KernelTests\KernelTestBase; +/** + * Tests the advanced queue runner. + * + * @group advancedqueue + */ class AdvancedQueueRunnerTest extends KernelTestBase { + /** + * {@inheritdoc} + */ + public static $modules = ['advancedqueue', 'advancedqueue_test']; + + public function testEmptyQueue() { + $runner = new SerialQueueRunner(\Drupal::service('advancedqueue.factory'), \Drupal::service('plugin.manager.advancedqueue_worker'), \Drupal::time()); + + $runner->run(); + } + + public function testSetGroupsWithInvalidGroup() { + $runner = new SerialQueueRunner(\Drupal::service('advancedqueue.factory'), \Drupal::service('plugin.manager.advancedqueue_worker'), \Drupal::time()); + + $this->setExpectedException(\Exception::class, 'The following groups are invalid: invalid_group'); + $runner->setGroups(['invalid_group']); + } + + public function testSetQueuesWithInvalidQueue() { + $runner = new SerialQueueRunner(\Drupal::service('advancedqueue.factory'), \Drupal::service('plugin.manager.advancedqueue_worker'), \Drupal::time()); + + $this->setExpectedException(\Exception::class, 'The following queues are invalid: invalid_queue'); + $runner->setQueues(['invalid_queue']); + } + + public function testRunnerHelper() { + $runner_helper = new RunnerHelper(); + + /** @var \Drupal\advancedqueue\Queue\AdvancedQueueFactory $queue_factory */ + $queue_factory = \Drupal::service('advancedqueue.factory'); + /** @var \Drupal\advancedqueue\AdvancedqueueWorkerManager $worker_manager */ + $worker_manager = \Drupal::service('plugin.manager.advancedqueue_worker'); + + $queue = $queue_factory->get('worker_status'); + $queue->createJobItem(QueueJob::create(['status' => QueueJobStatus::STATUS_SUCCESS, 'result' => 'my result'])); + $this->assertEquals(1, $queue->numberOfItems()); + + $result = $runner_helper->processNextItem($queue, $worker_manager->createInstance('worker_status'), 30); + + $this->assertEquals(0, $queue->numberOfItems()); + $this->assertTrue($result->is(QueueJobStatus::STATUS_SUCCESS)); + $this->assertEquals('my result', $result->getResult()); + + $queue->createJobItem(QueueJob::create(['status' => QueueJobStatus::STATUS_RETRY, 'result' => 'please retry'])); + $this->assertEquals(1, $queue->numberOfItems()); + + $result = $runner_helper->processNextItem($queue, $worker_manager->createInstance('worker_status'), 30); + + $this->assertEquals(1, $queue->numberOfItems()); + $this->assertTrue($result->is(QueueJobStatus::STATUS_RETRY)); + $this->assertEquals('please retry', $result->getResult()); + } + public function testQueueRunner() { $runner = ''; /** @var \Drupal\advancedqueue\Queue\AdvancedQueueInterface $queue */ - $queue = ''; + $queue = new AdvancedQueueDatabase('queue1', \Drupal::database(), \Drupal::time()); $data = []; for ($i = 0; $i < 5; $i++) { $data[] = [$this->randomMachineName() => $this->randomMachineName()]; diff --git a/tests/src/Kernel/AdvancedQueueWorkerManagerTest.php b/tests/src/Kernel/AdvancedQueueWorkerManagerTest.php new file mode 100644 index 0000000..93d68d8 --- /dev/null +++ b/tests/src/Kernel/AdvancedQueueWorkerManagerTest.php @@ -0,0 +1,27 @@ +getDefinitions(); + $this->assertArrayHasKey('worker1', $definitions); + } + +}