Problem/Motivation

  • The queue system in core was basically ported in a really minimal way
  • Core has no proper queue runner (It is just executed as part of core cron)
  • You cannot retry a queue item properly on failure
  • Fetching status of existing queue items is not possible
  • Queueing items for execution at a future time is not possible

Proposed resolution

New architecture:

  • Add a QueueJob value object that is used throughout the API and contains all the data you need, such as the time when the item should be executed
  • Queue workers are able to return one of three statuses (SUCCESS, FAILED, RETRY)
  • Queue items can have one of the following five statuses (QUEUED, PROCESSING, SUCCESS, FAILED, RETRY)
  • Queue workers are plugins, which are controlled by plugin.manager.advancedqueue_worker
  • Queue workers return a QueueJobResult object, which allows the queue runners to put them back into the queue, delete them etc. These objects also store some result payload, which could be useful for various reasons
  • Queue implementations are really similar to the core implementation, apart from using the QueueJob object. There are also additional methods on the interface, such as one to update a queue job, which is needed for processing the result
  • Additionally we have queue runners (@todo Do we need those queue groups?)
  • We have a helper object which claims one item, executes the worker and processes the result, but otherwise contains no further logic. All queue runners should be able to reuse that object/function. This could be a trait/subclass as well. \Drupal\advancedqueue\Runner\RunnerHelper
  • There is an example drush and console runner, but it is still really rough
  • @todo Write a fastcgi queue runner (https://tideways.io/profiler/blog/using-php-fpm-as-a-simple-built-in-asy... might be interesting in that context)
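
The claim/execute/process helper described in the list above could look roughly like the following sketch. It is not the patch's code: SketchResult, SketchQueue, sketch_run_one() and the callable worker are hypothetical stand-ins, the queue lives in a plain array, and only the three result statuses are taken from the list.

```php
<?php

// Hypothetical stand-in for the QueueJobResult object described above.
final class SketchResult {
  const SUCCESS = 'success';
  const FAILED = 'failed';
  const RETRY = 'retry';

  public $status;
  public $payload;

  public function __construct($status, array $payload = []) {
    $this->status = $status;
    $this->payload = $payload;
  }
}

// Hypothetical in-memory queue; a real backend would persist its jobs.
final class SketchQueue {
  public $waiting = [];
  public $finished = [];

  // Returns the next waiting job, or NULL when the queue is empty.
  public function claim() {
    return array_shift($this->waiting);
  }
}

/**
 * Claims one job, runs the worker and applies the result. Nothing else.
 *
 * @return bool
 *   FALSE when there was nothing to claim.
 */
function sketch_run_one(SketchQueue $queue, callable $worker) {
  $job = $queue->claim();
  if ($job === NULL) {
    return FALSE;
  }
  $result = $worker($job);
  if ($result->status === SketchResult::RETRY) {
    // Put the job back at the end of the queue.
    $queue->waiting[] = $job;
  }
  else {
    $queue->finished[] = [$job, $result->status];
  }
  return TRUE;
}

$queue = new SketchQueue();
$queue->waiting = ['a', 'b'];
$attempts = 0;
$worker = function ($job) use (&$attempts) {
  $attempts++;
  // Ask for one retry of job "a" on the first attempt, then succeed.
  $retry = ($job === 'a' && $attempts === 1);
  return new SketchResult($retry ? SketchResult::RETRY : SketchResult::SUCCESS);
};
// A runner is then just a loop around the helper.
while (sketch_run_one($queue, $worker)) {
}
```

Keeping the helper this small is what would let a drush, console or fastcgi runner share it: each runner only decides when and how often to call it.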

For now this patch mostly adds some fake test coverage, which might help to guide the future implementation.

Remaining tasks

There are MANY todos:

  • Design the queue/queue worker/queue runner API
  • Get the tests running with the implementation
  • Clean up the mess of the current patch

User interface changes

API changes

Data model changes


Comments

dawehner created an issue. See original summary.

dawehner’s picture

Status: Active » Needs review
FileSize
17.28 KB

Status: Needs review » Needs work

The last submitted patch, 2: 2913344-2.patch, failed testing. View results

bojanz’s picture

+  public function createAdvancedItem(QueueJob $job) {

All of our methods refer to queue items, but the value object is QueueJob. We need to standardize on one of them (Item, probably?)

+  public function __construct($payload, $itemKey = NULL, $uid = NULL, $startTime = NULL, $status = NULL) {

These should be snake_case at least. With 5 params it might make sense to use the Commerce pattern of accepting array $definition, example here: https://github.com/drupalcommerce/commerce/blob/8.x-2.x/modules/tax/src/...

+class QueueJobStatus {
+
+  /**
+   * Status codes.
+   */
+  const STATUS_QUEUED = -1;
+  const STATUS_PROCESSING = 0;
+  const STATUS_SUCCESS = 1;
+  const STATUS_FAILED = 2;
+  const STATUS_RETRY = 3;

We usually put constants on the object that uses them, so why not have these on the QueueJob itself?

dawehner’s picture

All of our methods refer to queue items, but the value object is QueueJob. We need to standardize on one of them (Item, probably?)

I stepped forward and used the QueueJob object everywhere, see issue summary.

These should be snake_case at least. With 5 params it might make sense to use the Commerce pattern of accepting array $definition, example here: https://github.com/drupalcommerce/commerce/blob/8.x-2.x/modules/tax/src/...

Nice suggestion! This is indeed much better and we can still hide that stuff by using static factories, if needed.
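
A minimal sketch of that pattern, assuming a single array $definition with snake_case keys plus a static factory that hides it. DefinitionJob, its keys and its defaults are hypothetical and only loosely follow the five constructor parameters quoted above.

```php
<?php

// Hypothetical value object; keys and defaults are illustrative only.
final class DefinitionJob {
  private $payload;
  private $itemKey;
  private $uid;
  private $startTime;
  private $status;

  public function __construct(array $definition) {
    if (!array_key_exists('payload', $definition)) {
      throw new InvalidArgumentException('Missing required property "payload".');
    }
    // Optional properties fall back to defaults.
    $definition += [
      'item_key' => NULL,
      'uid' => NULL,
      'start_time' => 0,
      'status' => 'queued',
    ];
    $this->payload = $definition['payload'];
    $this->itemKey = $definition['item_key'];
    $this->uid = $definition['uid'];
    $this->startTime = (int) $definition['start_time'];
    $this->status = $definition['status'];
  }

  // Static factory so callers never have to build the definition array.
  public static function create(array $payload, $start_time = 0) {
    return new self(['payload' => $payload, 'start_time' => $start_time]);
  }

  public function getPayload() {
    return $this->payload;
  }

  public function getStartTime() {
    return $this->startTime;
  }

  public function getStatus() {
    return $this->status;
  }
}

$job = DefinitionJob::create(['my' => 'data'], 100);
```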

We usually put constants on the object that uses them, so why not have these on the QueueJob itself?

I went along and made the result an object for itself, see issue summary :)

dawehner’s picture

Issue summary: View changes
dawehner’s picture

FileSize
65.61 KB

This includes some cleanup after some discussion with @bojanz

  • The runners no longer have knowledge about executing queues and groups. Maybe there is a good use case, but at least to me it seems to be something that can be added later
  • There is now a working runner for console and drush included
EclipseGc’s picture

Version: 7.x-1.x-dev » 8.x-1.x-dev

assigning proper version.

bojanz’s picture

I spent some time on this on Saturday, to see if I could implement the D7 spirit of the module, where the existing core API is enhanced, instead of building a parallel API like Daniel started in #7. That failed, for several reasons:
1) The queue backend can be overridden only via settings.php, the only way around that is to swap the entire factory.
2) There is no good way to avoid core's queue processing on cron, without another service swap.
Swapping out several core services brings additional confusion to an already confusing API, so I've decided to abandon that idea.

That brings us back to #7 as the road to take.

Looking at other queue implementations, I found Laravel's to be the most interesting one: https://laravel.com/docs/5.5/queues
Core's queue was built carefully to only have features supported by multiple backends.
Advancedqueue added several SQL specific ideas (future created date, a status, loading a queue item by ID).
Laravel's implementation provides clues for how such features can be done in a backend independent manner.
For example, using $delay for tasks that should be processed in the future.
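
As a rough illustration of the $delay idea (a sketch with a hypothetical in-memory DelayBackend, not Laravel's or this module's code): the caller passes a relative delay, and the backend alone decides how to turn that into "not claimable before time X".

```php
<?php

// Hypothetical in-memory backend; only the $delay idea comes from the text above.
final class DelayBackend {
  private $jobs = [];

  // $now is passed in explicitly to keep the sketch deterministic.
  public function enqueue($payload, $delay, $now) {
    $this->jobs[] = ['payload' => $payload, 'available' => $now + $delay];
  }

  // Returns the first job whose available time has passed, or NULL.
  public function claim($now) {
    foreach ($this->jobs as $key => $job) {
      if ($job['available'] <= $now) {
        unset($this->jobs[$key]);
        return $job['payload'];
      }
    }
    return NULL;
  }
}

$backend = new DelayBackend();
$backend->enqueue('send reminder', 3600, 1000);
$backend->enqueue('send receipt', 0, 1000);
$first = $backend->claim(1000);
$early = $backend->claim(1000);
$later = $backend->claim(4600);
```

A SQL backend could store the computed timestamp in a column, while a backend with native delayed delivery could pass the delay straight through.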

It also feels like core never solved the problem of queues having to store their credentials somewhere. It's all left as an implementation detail for the factory. It feels like it would make sense to use the regular config entity + plugin pattern for Queue.
That could then contain other settings as well, such as "process me on cron" VS "process me via the daemon", whether items should be kept after processing, etc.

Modules would then do:
$queue = Queue::load('commerce_recurring');
$queue->getPlugin()->claimItem();
The SQL schema could stop storing the queue name in the advancedqueue table (because it's irrelevant which queue an item belongs to), but start storing the worker's plugin ID, or a group that we tie workers to via annotation. There's terminology to be bikeshedded (Laravel makes the distinction between connection and queue. Beanstalk does queue and tube. Etc)

colan’s picture

I'm probably too late to the party (or maybe this was discussed already), but in the spirit of getting off the island, has anyone thought about bringing in something like Celery instead of doing this ourselves?

We're planning on using it for AegirNG.

dawehner’s picture

@colan
Thank you for sharing this queue runner. I haven't heard of it before to be honest.

To be clear, we want to allow the storage to still be swappable. Modules should be able to put things into the queue in some generic way.

Advanced queue, though, has some features that most queuing systems don't support, like time-based tasks. Celery seems to support that though: http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight... which is nice. Given that, there could be another implementation based upon Celery.

bojanz’s picture

Status: Needs work » Needs review
FileSize
105.29 KB

Here's a new patch against HEAD.
It's pretty large, so you might find it easier to look at https://github.com/bojanz/advancedqueue which contains it.

Here's what we have:
1) The queue config entity. Can be locked (non-deletable). Holds configuration for a Backend plugin (Database, Redis, etc). Holds runner settings (Cron/Daemon, timeouts at some point, etc).

Note that this is not meant to hold the Backend credentials, that still goes in settings.php (otherwise we'd have to duplicate it per queue, which is a collection of jobs in this context).

2) A fully expressed Backend API with a Database implementation. Based on my research of Redis, SQS, Beanstalk, Gearman and others.
Features:
- Configurable lease time
- Bulk job creation
- Delayed processing (run the job in 10 days, retry in 1 day, etc)
- Retries
- onSuccess() / onFailure(), allowing the backend to respond appropriately (transfer job to a success/failure queue, delete it, etc)
- Optional interfaces for deleting/releasing jobs by ID, meant for a UI.
Fully covered by tests.
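
To make the lease time feature concrete, here is a minimal sketch with a hypothetical in-memory LeaseBackend. The method names and the releaseExpired() step are assumptions, not the patch's API; the 300 second default mirrors the lease_time default quoted later in this thread.

```php
<?php

// Hypothetical in-memory backend illustrating a configurable lease time.
final class LeaseBackend {
  private $leaseTime;
  private $jobs = [];

  public function __construct($lease_time = 300) {
    $this->leaseTime = $lease_time;
  }

  public function enqueue($id) {
    $this->jobs[$id] = ['state' => 'queued', 'expires' => 0];
  }

  // Claims the first queued job and leases it until $now + lease time.
  public function claim($now) {
    foreach ($this->jobs as $id => $job) {
      if ($job['state'] === 'queued') {
        $this->jobs[$id] = [
          'state' => 'processing',
          'expires' => $now + $this->leaseTime,
        ];
        return $id;
      }
    }
    return NULL;
  }

  // Returns jobs whose lease ran out (e.g. a crashed runner) to the queue.
  public function releaseExpired($now) {
    foreach ($this->jobs as $id => $job) {
      if ($job['state'] === 'processing' && $job['expires'] <= $now) {
        $this->jobs[$id] = ['state' => 'queued', 'expires' => 0];
      }
    }
  }
}

$backend = new LeaseBackend(300);
$backend->enqueue('job-1');
$claimed = $backend->claim(1000);
$blocked = $backend->claim(1100);
$backend->releaseExpired(1300);
$reclaimed = $backend->claim(1300);
```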

3) Job and JobResult objects. Goodbye badly defined "items". Everyone uses the job terminology.

4) Worker plugins, responsible for processing jobs of a specific type.

Usage:

$job = Job::create('myworker', ['my' => 'data']);

$queue = Queue::load('default');
$queue->getBackend()->createJob($job);

Notice how the queue config entity gives us control over which jobs go where. We can have one queue processed by cron and another processed by drush, one in SQL and another in Redis, one high-priority and one low priority. A queue can have jobs belonging to multiple workers.

Here's what's missing:
1) The runner is incomplete, I wanted to explore setting the logger via closures, and handling the Drush/Console difference that way.
There are no tests so I am assuming it's broken.
2) The retry logic is also incomplete, the Job needs to track the number of retries, and the runner needs to look at it.
3) Any kind of listing UI (open question on Views VS homegrown via $backend->listJobs()).
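
For the missing retry logic in point 2, one possible shape is sketched below. CountingJob, sketch_handle_failure() and the state strings are hypothetical; the only idea taken from the text is that the job carries its retry count and the runner compares it against a limit.

```php
<?php

// Hypothetical job that counts its own retries.
final class CountingJob {
  public $state = 'queued';
  public $numRetries = 0;
}

/**
 * Decides what happens to a job whose worker just failed.
 *
 * $max_retries stands in for a per-worker setting.
 */
function sketch_handle_failure(CountingJob $job, $max_retries) {
  if ($job->numRetries < $max_retries) {
    // Still within budget: count the retry and requeue.
    $job->numRetries++;
    $job->state = 'queued';
  }
  else {
    // Budget exhausted: the failure is final.
    $job->state = 'failed';
  }
}

$job = new CountingJob();
$states = [];
for ($i = 0; $i < 3; $i++) {
  sketch_handle_failure($job, 2);
  $states[] = $job->state;
}
```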

Questions (that I want you to answer):
1) Is it odd that a Queue stores configuration for a Backend, plus this: "A backend is instantiated by the parent queue (config entity) and given a queue ID."?

I wanted to differentiate between "queue as a backend" and "queue as a collection of jobs", which is a distinction Drupal core never made.
I can see reason in saying "The queue config entity is an instance of the queue plugin" and renaming Backend to Queue though. Other APIs commonly use that terminology. Let's choose our tradeoff.

2) Many APIs use "queue worker" as a term for runners, not "classes that process a given job". Still, I used the Drupal core terminology for now, with the following comment:

 * Queue workers contain logic for processing a given job.
 * For example, sending an email or deleting an expired entity.
 * A job is always handled by a single queue worker.

Do we think that JobType would perhaps be better terminology?

3) Do we like $job->getPayload(), as a renaming of $job->getData()? Daniel did that in his patch and I followed along, don't know if the word payload is scarier, seems to make sense.

4) max_retries or maximum_retries? (getMaxRetries() vs getMaximumRetries())?

5) I copied the state names from D7, so we have Queued/Processing/Success/Failed. Would "Success" make more sense as "Succeeded"? Along with JobResult::succeeded() instead of JobResult::success()? Wasn't sure.

borisson_’s picture

I like all the work you did here, it looks really good!

Answers to the questions:

1. Search API also uses "backend" as the name for the thing that stores data (http://cgit.drupalcode.org/search_api/tree/src/Backend/BackendInterface.php), so that terminology makes sense from my standpoint, since it shares the name. (We also have a databaseBackend, a solrBackend, ...)
2. I think the jobHandler would be the best name for a class that handles/processes a job. Queue worker already has another meaning as you pointed out and jobType doesn't make it clear for me that this thing will handle a job.
3. I prefer getPayload as well. No real reason though.
4. I frequently misspell maximum, so maxRetries sounds good, no strong preference either way.
5. Succeeded sounds better, as it's more in line with the other state names.

About the usage example you gave:

$job = Job::create('myworker', ['my' => 'data']);

$queue = Queue::load('default');
$queue->getBackend()->createJob($job);

I don't really like the ->createJob($job) call, because it doesn't really create the job, that was already created by Job::create. I think I'd prefer ->attachJob.

fgm’s picture

One problem I've met when doing Queue API drivers for D8, since I actually did the ports for Beanstalkd, RabbitMQ, and Apache Kafka, is related to the long-running processes involved with the queue workers.

In production situations, it is not uncommon for infra to change outside Drupal between Drupal deployments (e.g. network reconfiguration). This can cause the main DB connection to go down while the queue server (or alternate DB for AdvancedQueue) connection can still be up, leading to hard to catch/recover issues.

So one need common to all these drivers would be an ability NOT to keep the main DB connection open between jobs, or at least recover from a DB connection loss.

One issue I've also had with all these drivers is the lack of a clearly defined job class, which would enable type hinting and support extra features from the underlying driver. For instance, in the Beanstalkd driver, I added a Job class, and in the currently reviewed RabbitMQ refactor ( https://github.com/FGM/rabbitmq/pull/17 ), I added a DecoderAwareWorkerInterface to enable the worker plugins to support multiple encoding formats to enable better interoperability with jobs submitted from non-Drupal, or even non-PHP code.

Another solution I've noticed but not implemented, and which makes sense, is to run the long-running process outside Drupal itself, as in https://github.com/ricbra/rabbitmq-cli-consumer : a Golang server waits on the RabbitMQ queue, and triggers the PHP CLI operation on messages, replicating the short-running experience for which PHP is actually optimized, and avoiding the long-running DB connection.

dawehner’s picture

Questions:
1)

I think we have two use cases

a) A module author wants to ship with queues preconfigured. This use case is solved by shipping DB cron configurations.
b) Someone wants to improve the processing speed (They can edit the existing configuration entities)

Both of them are fulfilled with the current proposed architecture.
What is not fulfilled:

c) I want to use a high performance queue runner and want it to take over all existing and future queues.

I think it's fine to have additional user configuration required for this use case, given that most queues won't be long enough to require external queue runners

2)

Just a random thought, maybe job worker might be okay? JobType, at least for me, seems to do nothing.

3) For me "data" tells me nothing. Payload is something which communicates: This is the actual important bits of data. Similar to payload on network packets.

4) For me at least there is no possibility of misreading; getMaxRetries is totally fine. Also: Grepping core reveals 10x more "max" than "maximum".

5) I personally would have picked: Queued, Processing, Success, Failure. I like "Success" and "Failure" because it represents the final state, rather than an action represented by a verb.
Note: This might be inspired by elm ;) http://blog.jenkster.com/2016/06/how-elm-slays-a-ui-antipattern.html

Here are a couple of random points ...

  1. +++ b/src/Entity/Queue.php
    @@ -0,0 +1,235 @@
    +  protected $pluginCollection;
    

    Nitpick: It feels like it would be nice to indicate what is inside this plugin collection using the name like: backendPluginCollection

  2. +++ b/src/Plugin/AdvancedQueue/Backend/Database.php
    @@ -0,0 +1,238 @@
    +    if (count($jobs) > 1) {
    +      // Make the inserts atomic, and improve performance on certain engines.
    +      $transaction = $this->connection->startTransaction();
    +    }
    

    Nice!!

  3. +++ b/src/Plugin/AdvancedQueue/Backend/Database.php
    @@ -0,0 +1,238 @@
    +  public function retryJob(Job $job, $delay = 0) {
    +    $job->setAvailableTime($this->time->getCurrentTime() + $delay);
    +    $job->setState(Job::STATE_QUEUED);
    

    Should you be able to retry successful jobs?

  4. +++ b/src/Plugin/AdvancedQueue/Backend/SupportsDeletingJobsInterface.php
    @@ -0,0 +1,18 @@
    +interface SupportsDeletingJobsInterface {
    
    +++ b/src/Plugin/AdvancedQueue/Backend/SupportsListingJobsInterface.php
    @@ -0,0 +1,13 @@
    +interface SupportsListingJobsInterface {
    
    +++ b/src/Plugin/AdvancedQueue/Backend/SupportsReleasingJobsInterface.php
    @@ -0,0 +1,18 @@
    +interface SupportsReleasingJobsInterface {
    +
    

    I really like this feature architecture

  5. +++ b/src/Runner.php
    @@ -0,0 +1,133 @@
    +          $queue_backend->retryJob($job, $worker->getRetryDelay());
    

    Just conceptually, I'm wondering whether we could allow workers to make this decision based upon the job itself? Maybe you could use some payload information to determine when to next retry something?

bojanz’s picture

Yay, smart reviews!

Looks like we all agree on max_attempts and payload. Closing those questions.

I think the jobHandler would be the best name for a class that handles/processes a job. Queue worker already has another meaning as you pointed out and jobType doesn't make it clear for me that this thing will handle a job.

Just a random thought, maybe job worker might be okay? JobType, at least for me, seems to do nothing.

My thinking behind JobType was that it would communicate that there's only one class/plugin that handles a single job.
Judging by your reactions, it doesn't do the trick, so we'll need to keep bikeshedding.
Not sure QueueWorker -> JobWorker would accomplish anything, it's still a "Worker". JobHandler might make sense, though I'm not sure it communicates the "1 class per job" thing.

I don't really like the ->createJob($job) call, because it doesn't really create the job, that was already created by Job::create. I think I'd prefer ->attachJob.

This is a good point. We are repeating the used verb. We could always do ->enqueueJob(Job $job) or pushJob(), though that opens the question of what to do with ->claimJob(), keep it as is or rename it to match (dequeueJob() or popJob()).

In production situations, it is not uncommon for infra to change outside Drupal between Drupal deployments (e.g. network reconfiguration). This can cause the main DB connection to go down while the queue server (or alternate DB for AdvancedQueue) connection can still be up, leading to hard to catch/recover issues.

So one need common to all these drivers would be an ability NOT to keep the main DB connection open between jobs, or at least recover from a DB connection loss.

I am assuming that people processing queues with Drush/Console are daemonizing the runner via Supervisor or a similar alternative.
In that case doesn't it make sense to simply exit when the DB connection stops, and rely on Supervisor to restart the runner?
Same with the long-running-process problem, where we can configure the Runner to exit every 5-10 min?
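
A sketch of such a time-limited runner loop, assuming the process is restarted by Supervisor (or similar) after it exits. sketch_run_with_limit() and its callables are hypothetical stand-ins for the backend, the worker and the time service; the fake clock exists only to make the example deterministic.

```php
<?php

/**
 * Processes jobs until the queue is empty or the time limit is reached.
 *
 * @return int
 *   The number of processed jobs.
 */
function sketch_run_with_limit(callable $claim, callable $process, callable $clock, $time_limit) {
  $deadline = $clock() + $time_limit;
  $processed = 0;
  // Check the clock first, so no job is claimed after the deadline.
  while ($clock() < $deadline && ($job = $claim()) !== NULL) {
    $process($job);
    $processed++;
  }
  return $processed;
}

// Fake clock: every call advances time by 100 seconds.
$now = 0;
$clock = function () use (&$now) {
  $now += 100;
  return $now;
};
$jobs = range(1, 10);
$claim = function () use (&$jobs) {
  return array_shift($jobs);
};
$done = [];
$process = function ($job) use (&$done) {
  $done[] = $job;
};
$count = sketch_run_with_limit($claim, $process, $clock, 300);
```

With this shape, a lost DB connection would not need special handling in the loop: an uncaught exception ends the process, and the supervisor starts a fresh one with a fresh connection.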

One issue I've also had with all these drivers is the lack of a clearly defined job class, which would enable type hinting and support extra features from the underlying driver. For instance, in the Beanstalkd driver, I added a Job class, and in the currently reviewed RabbitMQ refactor ( https://github.com/FGM/rabbitmq/pull/17 ), I added a DecoderAwareWorkerInterface to enable the worker plugins to support multiple encoding formats to enable better interoperability with jobs submitted from non-Drupal, or even non-PHP code.

Indeed. Would be happy to get your notes on the Job class in the current patch. The intention is to have it backend agnostic, and put the rest in $job->getPayload().

Another solution I've noticed but not implemented, and which makes sense, is to run the long-running process outside Drupal itself, as in https://github.com/ricbra/rabbitmq-cli-consumer : a Golang server waits on the RabbitMQ queue, and triggers the PHP CLI operation on messages, replicating the short-running experience for which PHP is actually optimized, and avoiding the long-running DB connection.

No reason not to add a drush/console command that supports that.

What is not fulfilled:
c) I want to use a high performance queue runner and want it to take over all existing and future queues.
I think it's fine to have additional user configuration required for this use case, given that most queues won't be long enough to require external queue runners

The assumption here is that we'd want to silently change the backend of existing queues. That is potentially dangerous, since there is no guarantee that the queues will be empty. Plus, all runners would need to be restarted.

For me it makes more sense to create a new queue with the new backend, and then reconfigure the modules to tell them to use the new queue. Each module can have a setting for the queue they're using.

5) I personally would have picked: Queued, Processing, Success, Failure. I like "Success" and "Failure" because it represents the final state, rather than an action represented by a verb.

Great idea, let's do that.

Should you be able to retry successful jobs?

No. I'll add a check + exception.
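
The check could be as small as the following sketch. sketch_retry() is a hypothetical helper that works on a plain array instead of the patch's Job object, and the state strings are illustrative.

```php
<?php

// Hypothetical guard: only jobs in the failed state may be requeued.
function sketch_retry(array $job, $now, $delay = 0) {
  if ($job['state'] !== 'failed') {
    throw new InvalidArgumentException('Only failed jobs can be retried.');
  }
  $job['state'] = 'queued';
  $job['available'] = $now + $delay;
  return $job;
}

$retried = sketch_retry(['state' => 'failed', 'available' => 0], 1000, 60);

$rejected = FALSE;
try {
  sketch_retry(['state' => 'success', 'available' => 0], 1000);
}
catch (InvalidArgumentException $e) {
  $rejected = TRUE;
}
```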

I really like this feature architecture

Same approach I took for Commerce payment gateways :)

Just conceptually, I'm wondering whether we could allow workers to make this decision based upon the job itself? Maybe you could use some payload information to determine when to next retry something?

Right now we take $max_retries and $retry_delay from the worker annotation.
We could allow the worker to override that per-job via JobResult::failure($message = '', $max_retries = NULL, $retry_delay = NULL).
How does that sound?
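
A sketch of how that override could resolve, assuming NULL means "use the worker's default". OverridableResult and sketch_retry_settings() are hypothetical; only the failure() signature follows the proposal above.

```php
<?php

// Hypothetical result object modelled on the proposed failure() signature.
final class OverridableResult {
  public $message;
  public $maxRetries;
  public $retryDelay;

  private function __construct($message, $max_retries, $retry_delay) {
    $this->message = $message;
    $this->maxRetries = $max_retries;
    $this->retryDelay = $retry_delay;
  }

  public static function failure($message = '', $max_retries = NULL, $retry_delay = NULL) {
    return new self($message, $max_retries, $retry_delay);
  }
}

/**
 * Resolves the effective retry settings for one failed job.
 *
 * Values set on the result win; NULL falls back to the worker defaults.
 */
function sketch_retry_settings(OverridableResult $result, array $worker_defaults) {
  return [
    'max_retries' => $result->maxRetries !== NULL ? $result->maxRetries : $worker_defaults['max_retries'],
    'retry_delay' => $result->retryDelay !== NULL ? $result->retryDelay : $worker_defaults['retry_delay'],
  ];
}

$defaults = ['max_retries' => 3, 'retry_delay' => 60];
// The worker keeps the default retry count but asks for a longer delay.
$slow = sketch_retry_settings(OverridableResult::failure('Rate limited.', NULL, 600), $defaults);
// An explicit 0 is respected and disables retries for this job.
$never = sketch_retry_settings(OverridableResult::failure('Bad payload.', 0), $defaults);
```

Comparing against NULL rather than using empty() matters here, because 0 is a meaningful override for both values.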

dawehner’s picture

Not sure QueueWorker -> JobWorker would accomplish anything, it's still a "Worker". JobHandler might make sense, though I'm not sure it communicates the "1 class per job" thing.

What about going with JobTypeWorker, which includes both bits we care about?

Right now we take $max_retries and $retry_delay from the worker annotation.
We could allow the worker to override that per-job via JobResult::failure($message = '', $max_retries = NULL, $retry_delay = NULL).

+1 I like this idea

bojanz’s picture

FileSize
105.51 KB

Here's an updated patch with the following commits:

https://github.com/bojanz/advancedqueue/commit/8e031f23d20b73a48ff7e977d...
https://github.com/bojanz/advancedqueue/commit/48a1c550622e279efddbe5d24...
https://github.com/bojanz/advancedqueue/commit/9495033fc0554f5d3c4d4bb9c...

TL;DR:
- Rename createJob() to enqueueJob(), STATE_FAILED to STATE_FAILURE.
- Add a processing time setting to queues.
- Clarify that only failed jobs can be retried.
- Add BackendInterface::cleanupQueue() as a replacement for the core garbageCollection() method.
- Rename Runner to Processor, expand the implementation and tests.
- phpcs fixes

Plan is to do a few more, then do the initial commit.

dawehner’s picture

+1 for merging it at that stage.

+++ b/src/Command/QueueRunCommand.php
@@ -0,0 +1,43 @@
+    $console_runner = \Drupal::service('advancedqueue.runner');

🔧 This is processor now :)

I have one question: Is there a reason why the indexes are so different to the ones defined in \Drupal\Core\Queue\DatabaseQueue::schemaDefinition?

EclipseGc’s picture

  1. +++ b/advancedqueue.drush.inc
    @@ -0,0 +1,29 @@
    +    // 'aliases' => ['mmas'],
    

    If we're not going to use an alias we should probably just remove it.

  2. +++ b/advancedqueue.install
    @@ -1,4 +1,80 @@
    +      'payload' => [
    +        'type' => 'blob',
    +        'not null' => FALSE,
    +        'size' => 'big',
    +        'serialize' => TRUE,
    +        'description' => 'The job payload.',
    +      ],
    

    I believe there's a known security issue with serializing blobs. If we ever want to expose this across a restful endpoint, this bit could be problematic. Might be better to manually json_encode/decode the data stored in the blob rather than letting drupal do the normal php serialization.

  3. +++ b/advancedqueue.module
    @@ -2,20 +2,22 @@
    +  $queues = $queue_storage->loadMultiple();
    +  foreach ($queues as $queue) {
    +    if ($queue->getRunner() == QueueInterface::RUNNER_CRON) {
    

    Would it be possible to do something like:

    $queue_storage->loadByProperties(['runner' => QueueInterface::RUNNER_CRON]);

    ??

  4. +++ b/advancedqueue.permissions.yml
    @@ -0,0 +1,3 @@
    diff --git a/advancedqueue.plugin_type.yml b/advancedqueue.plugin_type.yml
    

    wat is this?

  5. +++ b/config/schema/advancedqueue.schema.yml
    @@ -0,0 +1,34 @@
    diff --git a/console.services.yml b/console.services.yml
    

    is this a drupal console thing?

  6. +++ b/src/BackendManager.php
    @@ -0,0 +1,58 @@
    +  public function processDefinition(&$definition, $plugin_id) {
    +    parent::processDefinition($definition, $plugin_id);
    +
    +    foreach (['id', 'label'] as $required_property) {
    +      if (empty($definition[$required_property])) {
    +        throw new PluginException(sprintf('The backend "%s" must define the "%s" property.', $plugin_id, $required_property));
    +      }
    +    }
    +  }
    

    So, this is fine, but little known/used fact. D8 supports object returns from annotation classes (Layout Discovery in core uses this) which means you could avoid these defaults and processing them by overriding your get() method on the annotation to return an object instead. Then you'd get a legit failure from trying to instantiate that object with blank values if you like. :-D

    Feel free to ignore the suggestion too.

  7. +++ b/src/Form/QueueForm.php
    @@ -0,0 +1,150 @@
    +        Queue::PROCESSOR_CRON => $this->t('Cron'),
    +        Queue::PROCESSOR_DAEMON => $this->t('Daemon (Drush / Drupal Console)'),
    

    This looks like an error, shouldn't it be QueueInterface?

  8. +++ b/src/Job.php
    @@ -0,0 +1,390 @@
    +  public function __construct(array $definition) {
    +    foreach (['worker_id', 'payload', 'state'] as $required_property) {
    +      if (empty($definition[$required_property])) {
    +        throw new \InvalidArgumentException(sprintf('Missing property "%s"', $required_property));
    +      }
    +    }
    +    $this->assertState($definition['state']);
    +
    +    $this->id = !empty($definition['id']) ? $definition['id'] : '';
    +    $this->queueId = !empty($definition['queue_id']) ? $definition['queue_id'] : '';
    +    $this->workerId = $definition['worker_id'];
    +    $this->payload = $definition['payload'];
    +    $this->state = $definition['state'];
    +    $this->message = !empty($definition['message']) ? $definition['message'] : NULL;
    +    $this->available = !empty($definition['available']) ? (int) $definition['available'] : 0;
    +    $this->processed = !empty($definition['processed']) ? (int) $definition['processed'] : 0;
    +    $this->expires = !empty($definition['expires']) ? (int) $definition['expires'] : 0;
    +  }
    

    I'd prefer to see these all passed independently to this data object. Personal preference, do what you like, but given the nature of queuing, when and how it happens, a little reflection would go a long way and not radically change how you do this, but it'd be cleaner and you could set defaults in the function signature. (and type hints if you adopt a php7 dependency)

  9. +++ b/src/Job.php
    @@ -0,0 +1,390 @@
    +  /**
    +   * Sets the queue ID.
    +   *
    +   * @param string $queue_id
    +   *   The queue ID.
    +   *
    +   * @return $this
    +   */
    +  public function setQueueId($queue_id) {
    +    $this->queueId = $queue_id;
    +    return $this;
    +  }
    ...
    +  /**
    +   * Sets the worker ID.
    +   *
    +   * @param string $worker_id
    +   *   The worker ID.
    +   *
    +   * @return $this
    +   */
    +  public function setWorkerId($worker_id) {
    +    $this->workerId = $worker_id;
    +    return $this;
    +  }
    ...
    +  /**
    +   * Sets the job payload.
    +   *
    +   * @param array $payload
    +   *   The job payload.
    +   *
    +   * @return $this
    +   */
    +  public function setPayload(array $payload) {
    +    $this->payload = $payload;
    +    return $this;
    +  }
    

    So, at first I thought this was a dumb data object, but you have setters on it, which given your static factory, I guess I see the need for. I was sort of expecting the object to be immutable other than maybe state. Guess I'll be looking to the rest of the code to decipher the "why" here.

  10. +++ b/src/JobResult.php
    @@ -0,0 +1,83 @@
    +  /**
    +   * Constructs a new JobResult object.
    +   *
    +   * @param string $state
    +   *   The state. Job::STATE_SUCCESS or JOB::STATE_FAILURE.
    +   * @param string $message
    +   *   The message. Optional.
    +   */
    +  public function __construct($state, $message = '') {
    +    $this->state = $state;
    +    $this->message = $message;
    +  }
    +
    +  /**
    +   * Constructs a success result.
    +   *
    +   * @param string $message
    +   *   The message. Optional.
    +   *
    +   * @return static
    +   */
    +  public static function success($message = '') {
    +    return new static(Job::STATE_SUCCESS, $message);
    +  }
    +
    +  /**
    +   * Constructs a failure result.
    +   *
    +   * @param string $message
    +   *   The message. Optional.
    +   *
    +   * @return static
    +   */
    +  public static function failed($message = '') {
    +    return new static(Job::STATE_FAILURE, $message);
    +  }
    

    My initial expectation was "why not just pass a Job to this class and let it do stuff", but then I saw the static factory methods and now I'm really confused as to this thing's purpose.

  11. +++ b/src/Plugin/AdvancedQueue/Backend/BackendBase.php
    @@ -0,0 +1,141 @@
    +  /**
    +   * {@inheritdoc}
    +   */
    +  public function calculateDependencies() {
    +    return [];
    +  }
    +
    +  /**
    +   * {@inheritdoc}
    +   */
    +  public function getConfiguration() {
    +    return $this->configuration;
    +  }
    +
    +  /**
    +   * {@inheritdoc}
    +   */
    +  public function setConfiguration(array $configuration) {
    +    $this->configuration = NestedArray::mergeDeep($this->defaultConfiguration(), $configuration);
    +  }
    +
    +  /**
    +   * {@inheritdoc}
    +   */
    +  public function defaultConfiguration() {
    +    return [
    +      'lease_time' => 300,
    +    ];
    +  }
    

    Might consider extending ConfigurablePlugin rather than PluginBase. I see your interface already extends ConfigurablePluginInterface. It'd allow you to remove at least 2 of these method implementations.

  12. +++ b/src/Plugin/AdvancedQueue/Backend/Database.php
    @@ -0,0 +1,258 @@
    +  public function deleteQueue() {
    +    // Delete all jobs in the current queue.
    +    $this->connection->delete('advancedqueue')
    +      ->condition('queue_id', $this->queueId)
    +      ->execute();
    +  }
    

    Shouldn't we return some sort of boolean or something so we know if it was successful? That question goes for all these methods.

  13. +++ b/src/Plugin/AdvancedQueue/Backend/Database.php
    @@ -0,0 +1,258 @@
    +  public function enqueueJob(Job $job, $delay = 0) {
    +    $this->enqueueJobs([$job], $delay);
    +  }
    +
    +  /**
    +   * {@inheritdoc}
    +   */
    +  public function enqueueJobs(array $jobs, $delay = 0) {
    +    if (count($jobs) > 1) {
    +      // Make the inserts atomic, and improve performance on certain engines.
    +      $transaction = $this->connection->startTransaction();
    +    }
    +
    +    /** @var \Drupal\advancedqueue\Job $job */
    +    foreach ($jobs as $job) {
    +      $job->setQueueId($this->queueId);
    +      $job->setState(Job::STATE_QUEUED);
    +      $job->setAvailableTime($this->time->getCurrentTime() + $delay);
    +
    +      $fields = $job->toArray();
    +      unset($fields['id']);
    +      $fields['payload'] = serialize($fields['payload']);
    +      // InsertQuery supports inserting multiple rows at once, which is faster,
    +      // but that doesn't give us the inserted job IDs.
    +      $query = $this->connection->insert('advancedqueue')->fields($fields);
    +      $job_id = $query->execute();
    +
    +      $job->setId($job_id);
    +    }
    +  }
    

    These methods are all super silent... are we just depending on thrown exceptions?

  14. +++ b/src/Plugin/AdvancedQueue/Backend/Database.php
    @@ -0,0 +1,258 @@
    +        'payload' => serialize($job->getPayload()),
    

    This is the spot I was suggesting we should do a json_encode()... also fwiw, I think Drupal does the serialization for us these days when the schema says to do it, doesn't it?

  15. +++ b/src/Plugin/AdvancedQueue/Backend/SupportsListingJobsInterface.php
    @@ -0,0 +1,13 @@
    +interface SupportsListingJobsInterface {
    +
    +}
    

    Corresponding method(s)?

  16. +++ b/src/Plugin/AdvancedQueue/Worker/WorkerBase.php
    @@ -0,0 +1,33 @@
    +  /**
    +   * {@inheritdoc}
    +   */
    +  public function getMaxRetries() {
    +    return $this->pluginDefinition['max_retries'];
    +  }
    +
    +  /**
    +   * {@inheritdoc}
    +   */
    +  public function getRetryDelay() {
    +    return $this->pluginDefinition['retry_delay'];
    +  }
    

    Are these really defined at the plugin definition level? or are they configurable?

  17. +++ b/src/Processor.php
    @@ -0,0 +1,151 @@
    +    if ($processing_time == 0 && PHP_SAPI != 'cli') {
    +      $processing_time = 90;
    +    }
    

    A comment would be nice here.

  18. +++ b/src/Processor.php
    @@ -0,0 +1,151 @@
    +    // Update the job with the result.
    +    $job->setState($result->getState());
    +    $job->setMessage($result->getMessage());
    

    Ok, I see this isn't an immutable data object at this point. I guess I'm wondering whether this sort of information belongs on the JobResult object instead. I know I'm being a bit of a pedant here, so feel free to ignore me. That being said, there's something appealing in my head about the Job being immutable and the JobResult being the object that a.) contains a job, and b.) contains the results of that job and any pertinent information.

  19. +++ b/src/Processor.php
    @@ -0,0 +1,151 @@
    +    if ($job->getState() == Job::STATE_SUCCESS) {
    ...
    +    elseif ($result->getState() == Job::STATE_FAILURE) {
    +      // The job will be retried if the worker allows it, as long as
    

    Ok, so we're checking the job in one case, and the JobResult in the other... wat?

  20. +++ b/src/Processor.php
    @@ -0,0 +1,151 @@
    +      // The job will be retried if the worker allows it, as long as
    +      // the reason for the failure is not an exception.
    

    && FALSE?

    Also "reason for failure" seems like the sort of information we could track at the JobResult level since we're manually creating a JobResult on any exception, so we'd have that information at that time.

  21. +++ b/src/Processor.php
    @@ -0,0 +1,151 @@
    +  protected function getWorker($worker_id) {
    +    if (!isset($this->workers[$worker_id])) {
    +      $this->workers[$worker_id] = $this->workerManager->createInstance($worker_id);
    +    }
    +
    +    return $this->workers[$worker_id];
    

    This looks alarmingly like a PluginCollection pattern. I've tried to fly w/o a collection about a billion times and it's almost always ended up biting me. FWIW.

  22. +++ b/src/Processor.php
    @@ -0,0 +1,151 @@
    +    watchdog_exception('cron', $exception);
    

    Do you mean 'cron' here? It just seems out of place.

  23. +++ b/src/ProcessorInterface.php
    @@ -0,0 +1,33 @@
    +  /**
    +   * Processes the given queue.
    +   *
    +   * Jobs will be claimed and processed one by one until the configured
    +   * processing time ($queue->getProcessingTime()) passes.
    +   *
    +   * @param \Drupal\advancedqueue\Entity\QueueInterface $queue
    +   *   The queue.
    +   */
    +  public function processQueue(QueueInterface $queue);
    +
    +  /**
    +   * Processes the given job.
    +   *
    +   * @param \Drupal\advancedqueue\Job $job
    +   *   The job.
    +   * @param \Drupal\advancedqueue\Entity\QueueInterface $queue
    +   *   The parent queue.
    +   */
    +  public function processJob(Job $job, QueueInterface $queue);
    

    I've noticed a lot of methods don't have returns. That seems weird to me.

  24. +++ b/src/QueueAccessControlHandler.php
    @@ -0,0 +1,32 @@
    +    $admin_permission = $entity->getEntityType()->getAdminPermission();
    

    Ok, so no nuance on the permissions? Just the admin permission for the whole entity type?

  25. +++ /dev/null
    @@ -1,180 +0,0 @@
    diff --git a/src/Runner/ConsoleQueueRunner.php b/src/Runner/ConsoleQueueRunner.php
    
    +++ b/src/Runner/ConsoleQueueRunner.php
    @@ -0,0 +1,34 @@
    diff --git a/src/Runner/DrushQueueRunner.php b/src/Runner/DrushQueueRunner.php
    

    YAY! Everyone can run queues. Nice!
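To make items 18 to 20 a bit more concrete, here is a minimal sketch of the shape I had in mind: an immutable Job, and a JobResult that carries the job, the outcome, and the reason for a failure. Everything here is hypothetical (class layout, the success()/failure() factories, isRetryable()), and none of it is taken from the patch. The retry rule simply mirrors the code comment quoted in item 20.

```php
<?php

// Hypothetical sketch only; these are not the classes from the patch.

/**
 * An immutable job: no setters, so processing cannot alter it.
 */
final class Job {

  private $type;
  private $payload;

  public function __construct(string $type, array $payload) {
    $this->type = $type;
    $this->payload = $payload;
  }

  public function getType(): string {
    return $this->type;
  }

  public function getPayload(): array {
    return $this->payload;
  }

}

/**
 * Holds the job plus everything learned from processing it.
 */
final class JobResult {

  const STATE_SUCCESS = 'success';
  const STATE_FAILURE = 'failure';

  private $job;
  private $state;
  private $message;
  private $exception;

  private function __construct(Job $job, string $state, string $message, ?\Throwable $exception = NULL) {
    $this->job = $job;
    $this->state = $state;
    $this->message = $message;
    $this->exception = $exception;
  }

  public static function success(Job $job, string $message = ''): self {
    return new self($job, self::STATE_SUCCESS, $message);
  }

  public static function failure(Job $job, string $message, ?\Throwable $exception = NULL): self {
    return new self($job, self::STATE_FAILURE, $message, $exception);
  }

  public function getJob(): Job {
    return $this->job;
  }

  public function getState(): string {
    return $this->state;
  }

  public function getMessage(): string {
    return $this->message;
  }

  /**
   * Assumed rule: a failure is retryable only if no exception caused it.
   */
  public function isRetryable(): bool {
    return $this->state === self::STATE_FAILURE && $this->exception === NULL;
  }

}

$job = new Job('send_email', ['to' => 'user@example.com']);
$failed = JobResult::failure($job, 'SMTP timeout');
$crashed = JobResult::failure($job, 'Unexpected error', new \RuntimeException('boom'));
```

With this shape the processor would only ever inspect the JobResult, which would also avoid the mixed `$job->getState()` / `$result->getState()` checks from item 19.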

I didn't review the test coverage. I agree with Daniel: if this is working, I didn't see anything that should prevent you from committing it and moving forward. If any of my comments resonated, it could be worth addressing them first, but only if you feel the same way I did. Otherwise, if you like the state of the code, it looks pretty good to me. I didn't apply the patch; I just reviewed it in a vacuum.
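For the json_encode() suggestion in item 14, a rough sketch of the round trip, assuming the payload is always a plain array of scalars and nested arrays. The helper names encode_payload() and decode_payload() are made up for illustration and do not exist in the patch.

```php
<?php

/**
 * Hypothetical helper: turns a job payload into a string for storage.
 */
function encode_payload(array $payload): string {
  // JSON_THROW_ON_ERROR (PHP 7.3+) raises a JsonException for values that
  // cannot be encoded, instead of silently returning FALSE.
  return json_encode($payload, JSON_THROW_ON_ERROR);
}

/**
 * Hypothetical helper: restores a stored payload as an associative array.
 */
function decode_payload(string $stored): array {
  return json_decode($stored, TRUE, 512, JSON_THROW_ON_ERROR);
}

$payload = ['order_id' => 42, 'tags' => ['a', 'b']];
$stored = encode_payload($payload);
$restored = decode_payload($stored);
```

Unlike serialize(), the stored value is readable by non-PHP consumers and cannot instantiate objects on read, at the cost of not supporting object payloads.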

Eclipse

  • bojanz committed 230df55 on 8.x-1.x
    Issue #2913344 by dawehner, bojanz, EclipseGc, borisson_: Start a well...
bojanz’s picture

Status: Needs review » Fixed

Commit includes:
https://github.com/bojanz/advancedqueue/commit/70e90136b62723926cb941d73...
https://github.com/bojanz/advancedqueue/commit/e96d35b00910af692abc08802...
https://github.com/bojanz/advancedqueue/commit/62c29a942861217e849840608...
https://github.com/bojanz/advancedqueue/commit/82eda53f83b88f9003bc4ed41...

(README, json payload, QueueWorker -> JobType, nits).

I've also amended a portion of the Drupal Console command work.

See you in followups!

Note: Forgot to include fgm in the commit message, sorry about that.

neclimdul’s picture

Dang you guys moved fast... I only found out about this a week ago and hadn't even had a chance to review the latest patches. Sorry.

bojanz’s picture

@neclimdul Reviews are still (and always) welcome, nothing is set in stone yet.

Status: Fixed » Closed (fixed)

Automatically closed - issue fixed for 2 weeks with no activity.