Dries asked for a queue module. This module allows you to queue an item and dequeue it. Two workers, even if they hit the database at the very same time, cannot dequeue the same item. Multiple queues are supported. Queue cleanup and priorities are not there yet -- there is always a next patch. We have a pluggable, documented and tested architecture here.

Comment | File | Size | Author
#134 | 391340-134.job_queue.patch | 1.17 KB | dww
#127 | 391340-127.job_queue.patch | 18.61 KB | dww
#125 | 391340-124.job_queue.patch | 17.75 KB | neclimdul
#122 | 391340-122.job_queue.patch | 19.03 KB | neclimdul
#116 | 391340-116.job_queue.patch | 19.41 KB | dww
#114 | 391340-114.job_queue.patch | 19.4 KB | dww
#113 | 391340-112.job_queue.patch | 17.94 KB | neclimdul
#106 | 391340-106.job_queue.patch | 17.36 KB | alex_b
#104 | 391340-104.job_queue.patch | 17.18 KB | dww
#102 | 391340-102.job_queue.patch | 17.18 KB | dww
#100 | 391340-99.job_queue.patch | 17.25 KB | dww
#96 | 391340-96.job_queue.patch | 17.04 KB | alex_b
#94 | 391340-94.job_queue.patch | 17.09 KB | dww
#90 | 391340-90.job_queue.patch | 16.27 KB | neclimdul
#88 | 391340-88.job_queue.patch | 16.32 KB | neclimdul
#84 | 391340-81.job_queue.patch | 15.53 KB | neclimdul
#79 | 391340-79.job_queue.patch | 18.65 KB | dww
#74 | 391340-73.job_queue.patch | 15.58 KB | neclimdul
#65 | 391340-65.job_queue.patch | 16.56 KB | dww
#64 | job_queue.patch | 16.58 KB | boombatower
#63 | job_queue.patch | 16.11 KB | boombatower
#62 | job_queue.patch | 15.53 KB | chx
#60 | job_queue.patch | 15.49 KB | chx
#58 | job_queue.patch | 15.71 KB | chx
#48 | job_queue.patch | 14.11 KB | alex_b
#45 | job_queue.patch | 13.88 KB | boombatower
#43 | job_queue.patch | 13.76 KB | boombatower
#42 | job_queue.patch | 12.88 KB | chx
#39 | job_queue.patch | 12.88 KB | chx
#34 | job_queue.patch | 9.77 KB | chx
#32 | job_queue.patch | 7.62 KB | chx
#31 | job_queue.patch | 7.61 KB | chx
#26 | job_queue.patch | 7.62 KB | chx
#20 | job_queue.patch | 7.9 KB | chx
#16 | job_queue.patch | 9.87 KB | chx
#8 | job_queue.patch | 11.1 KB | chx
#6 | job_queue.patch | 11.06 KB | chx
#2 | job_queue.patch | 11.99 KB | chx
#1 | job_queue.patch | 9.47 KB | chx

Comments

chx’s picture

FileSize
9.47 KB
chx’s picture

FileSize
11.99 KB

Oopsie, forgot to add queue.inc.

boombatower’s picture

If we are looking for performance does it make sense to use usleep()?

Otherwise it looks good.

Damien Tournoud’s picture

Interesting start. If I understand correctly, this is more an "architecture needs review" than a "code needs review".

For this to reach the code needs review state, we will need at least (1) to implement a producer of items to queue and a consumer of queued items and (2) to extend the test cases (that are only very partial at this time).

In a nutshell, I like the approach.

chx’s picture

We wanted an API... I can write more tests for sure but i have no clue how to test the "two workers at the same time can't dequeue the same item"

chx’s picture

FileSize
11.06 KB

Removed the innodb patch that got in here accidentally.

neclimdul’s picture

I'm a bit excited about the possibilities this patch could open up, especially the code it will take out of contrib. What really puts me behind this patch, though, is the sort of batch processing it opens up (not really the batch stuff we have now, though it could possibly use that too, I guess).

I'm not sure any of those implementations would do anything but muddy this API discussion, though, so I can't agree with Damien on that assessment. Unless maybe he has something in mind...

One possible implementation I see, that definitely falls in the muddying category, would be cron? I'm sure I'm not the only one that had a cron that wanted to do more than one task. Or wanted to write a module that allowed for more fine grained control of cron. Or wanted to write a module that allowed for more fine grained control of cron and could break out different parts of different cron hooks... or have different cron consumers to break out expensive tasks among more than one server? It could be a really powerful re-visioning of hook_cron that would open up a lot of neat contrib possibilities in my mind.

chx’s picture

FileSize
11.1 KB

lyricnz found an edge case, now we are protecting against double updates.

Everyone jumps on cron when seeing this patch, understandable, but that's a big bad kitten killing monster. Meow :)

andrewlevine’s picture

I'm excited about this patch too. I've been working on a similar type of thing at work ( http://cvs.drupal.org/viewvc.py/drupal/contributions/modules/datasync/da... ) that is nowhere near generalized enough for core but it brings up the question:

What exactly is the use case for this job queue that we are optimizing for? Why must it be a queue, why not a "scheduler"? Why in this patch is there only one queue implementation per site, rather than per queue?

I'm not suggesting that anything in this patch necessarily needs changing, just that we need to think about what the job queue will be used for before we can judge this patch.

Anyone interested in meeting up at Drupalcon DC to go over this?

neclimdul’s picture

Status: Needs review » Needs work

Install failed, "Specified key was too long." Is there any reason we can't use an int constant for the status like other places?

lyricnz’s picture

As discussed last night:
- please reroll without the whitespace changes at the top
- the phpdoc could use some work - extra full-stops in the middle of sentences, complete sentences etc. Happy to review in person! (no point in supplying changes here, when you're still working on it)
- make the use of $item consistent - sometimes it means the data that goes in the queue, sometimes it means the queue entry record (which contains the data)
- function score() could be more stringent, checking for exact matches for each term in the two lists. Right now [1,2,3] [1,1,1] will return 3 :) Also, in this (single threaded, non-pausing), should we check for ordering too?
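
One way to picture the scoring point, as a minimal sketch only: the test's score() is not quoted in this thread, so the "loose" version below is an assumption that merely happens to reproduce the "[1,2,3] [1,1,1] will return 3" behaviour. Both function names are hypothetical.

```php
<?php
// Hypothetical loose score: counts every (sent, received) pair that matches.
// This is an assumption about how the test scores, not code from the patch.
function sketch_score_loose(array $sent, array $received) {
  $score = 0;
  foreach ($sent as $a) {
    foreach ($received as $b) {
      if ($a === $b) {
        $score++;
      }
    }
  }
  return $score;
}

// Stricter check: both lists must hold exactly the same items, each the same
// number of times. Order is ignored on purpose.
function sketch_score_strict(array $sent, array $received) {
  // Arrays are passed by value, so sorting here does not touch the caller's data.
  sort($sent);
  sort($received);
  return $sent === $received;
}
```

The strict variant deliberately ignores ordering; checking order as well would mean comparing the two lists without sorting them first.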

neclimdul’s picture

Finish is also broken(and not tested). It writes a record without providing an update key so it creates an additional entry instead of updating the table.

chx’s picture

@lyricnz, just a quick remark on scoring: [1,1,1,1] will fail when scoring against itself, it will score 16. For the rest, I will change the data column back to item and change the dequeue docs to say it returns a queue entry. Change finish() to only take the item_id and, rather than writing back the data, delete the row. Change dequeue() to have a $wait and a $processing_timeout and accordingly change the timestamp column to a timeout one. Add a queue_cron which resets the items where $processing_timeout has passed (i.e. the process crashed). I think a number of retries column is also useful and then an ERROR state as well. But then what do we do with items that got into ERROR state?

About ordering: given that a) several worker machines can access the queue, and these can differ greatly in speed, and b) when a process crashes there is a timeout before its item can be reprocessed, there can be absolutely no guarantee on when an item is actually processed. So I think any sort of ordering is unnecessary and just creates an illusion. We can, of course, add priorities and document them as best effort, or add an option to work as LIFO or FIFO, etc. Not sure it is worth the bother given that the implementation is pluggable.
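
The timeout plan described in this comment might look roughly like the sketch below. It is an in-memory stand-in, not the patch: the class name, the 'expire' and 'retries' keys, and passing $now explicitly are all assumptions made so the example runs on its own, and it says nothing about how two concurrent workers are kept apart in the database.

```php
<?php
// In-memory stand-in for the queue table; not the patch's implementation.
class SketchLeaseQueue {
  protected $rows = array();
  protected $nextId = 1;

  public function queue($data) {
    $this->rows[$this->nextId++] = array('data' => $data, 'expire' => 0, 'retries' => 0);
  }

  // Hand out the first unreserved row and lease it until $now + $timeout.
  public function dequeue($processing_timeout, $now) {
    foreach ($this->rows as $item_id => $row) {
      if ($row['expire'] == 0) {
        $this->rows[$item_id]['expire'] = $now + $processing_timeout;
        return array('item_id' => $item_id, 'data' => $row['data']);
      }
    }
    return FALSE;
  }

  // Finishing only needs the id: the row is deleted, nothing is written back.
  public function finish($item_id) {
    unset($this->rows[$item_id]);
  }

  // What a cron hook could do: release leases whose timeout has passed.
  public function cron($now) {
    foreach ($this->rows as $item_id => $row) {
      if ($row['expire'] != 0 && $row['expire'] < $now) {
        $this->rows[$item_id]['expire'] = 0;
        $this->rows[$item_id]['retries']++;
      }
    }
  }
}
```

The retries counter is only incremented here; what to do once it gets too high is exactly the open ERROR-state question raised above.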

boombatower’s picture

This is extremely minor, but do we want to sleep for a full second?

chx’s picture

Yes. Why not? if there is nothing in the queue then yeah, otherwise you get a busy wait scenario.
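
To make the sleep() versus usleep() trade-off concrete, here is a minimal sketch of a polling loop with a tunable interval. The function name and parameters are hypothetical; the patch's own waiting logic is not quoted in this thread.

```php
<?php
// Poll for an item, sleeping between attempts so an empty queue does not
// turn into a busy wait. $poll is any callable that returns an item or NULL.
function sketch_wait_for_item(callable $poll, $max_attempts, $interval_us) {
  for ($attempt = 1; $attempt <= $max_attempts; $attempt++) {
    $item = $poll();
    if ($item !== NULL) {
      return $item;
    }
    if ($attempt < $max_attempts) {
      // usleep() takes microseconds; 1000000 matches a one-second sleep().
      usleep($interval_us);
    }
  }
  return NULL;
}
```

A shorter interval lowers latency when an item arrives, at the cost of more polling queries against an empty queue.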

chx’s picture

Status: Needs work » Needs review
FileSize
9.87 KB

Another one. status is so fixed it's gone :) comments fixed too.

Jaza’s picture

  • @Damien Tournoud: I agree with #4. Chx, we need a description of at least one implementation that you have planned for this in core. Even better, please write a separate patch (and link to a separate issue) that contains the code of such an implementation - then we can test this patch and the implementation patch together. This will mean that (a) people like me have a better idea of what you can do with the queue module (from looking at your patch, I couldn't say - maybe you have cron task queueing in mind? just a wild guess), and so that (b) we have something tangible with which to test the queue module.
  • We also need to think about dependency. Clearly, this module is designed for other modules to build upon. But it's also pluggable, so a module can be built to depend on "the queue module or any other module that implements the Queue API". How can this be defined in the .info dependency system? Possibly, the dependency system will need to be improved to support this paradigm.
  • With the current patch, the queue module will become the most mysterious module for newbies looking at the module admin page. "Name: Queue. Description: Default queue implementation." This is not an acceptable description - even intermediate/advanced users will look at this and say: "WTF is a default queue implementation?!?! Do I need this module?" Perhaps a better description would be: "Provides a simple queueing service for use by other modules." The words "for use by other modules" should make it clear to non-tech users that they don't need to worry about it, and that they should just leave it enabled and not touch it.

kbahey’s picture

Status: Needs review » Needs work

Ok, here are the comments:

1. Doxygen fix: Instead of "Queue an item.in a queue." use "Queue an item to a queue."

2. Doxygen fix: "How long the processing is expected to take in seconds, Defaults to 30." This does not say what will happen if the processing exceeds that number of seconds. Would kittens be tortured or what?

3. How do people use the DrupalQueue class if they want to queue something? This needs to be documented somewhere as an API call. I mean, if I want to run a certain function with certain arguments, I can't find an obvious way to do that.

4. How do people queue something to run at cron time (like job_queue does)? This needs to be documented too.

5. Is there an ability to execute a queued "thing" once only? That is, to prevent duplicates? For the existing job_queue module, we have this via the last parameter: job_queue_add('my_function', 'some description', array(), NULL, TRUE);. This is very useful when I want to queue something and, if it is already queued, not queue it again.

kbahey’s picture

One more thing: Is there a way to see a list of how many queued items are pending and what their descriptions are? Again, job_queue does that. Would be nice to have.

chx’s picture

Status: Needs work » Needs review
FileSize
7.9 KB

Jaza, Khalid, thanks. A few comments about the things I did not do. I am not providing a use case aside from the fact that Dries asked for this in his keynote, so it's needed. I presume some elaborate overhaul of the cron system can use this. I am not particularly interested in doing this on a core cleanliness level. Why should I? I am providing an API Dries asked for.

Duplicates? This whole module is about stopping one item being dequeued by two workers.

boombatower’s picture

#17: the dependency issue should be fine. If a module uses the queue API then they put it as a dependency in their info file...if a module implements a backend for it then the queue doesn't depend on it...as the queue only needs the default implementation. Any additional implementation are optional.

Obviously if you enable the optional backend and not the queue...nothing will happen...but I don't think it is our job to check for "dumb configurations".

boombatower’s picture

The code looks great to me...we may change stuff as we go, but I think this is a great base API. I still wonder if it makes sense to use usleep() instead of sleep().

neclimdul’s picture

I don't wonder about it. I feel pretty strongly that it should be configurable and tunable per site. I also don't think it's really important to reviewing this patch, as it's not something that really affects any reviews. It's just sidetracking the discussion.

neclimdul’s picture

Re-reading that it sounded kinda rude. Sorry it was a crappy flight and didn't mean to be that dismissive.

Dries’s picture

- I'm not a big fan of making this a module. We're not going to create modules for every data structure or framework. It should probably be part of system.module.

- I prefer push() and pop() over queue() and dequeue(). Or, more abstract addJob() and getJob().

- I agree that it would be good to have one example of the queue -- as a separate patch. For example, it is not clear how I schedule a recurring task. For example, how do I adjust watchdog module to prune the table every 10 minutes?

chx’s picture

FileSize
7.62 KB

Here is a slightly modified patch. Sleeping is gone. The new methods are add/reserve/delete. This is a module because we want to be able to use Amazon SQS, beanstalkd and whatnot. This is not a job scheduler, this is a queue. I could add a "when to run" field and a callback but then we can not use an alternative implementation. I still do not think a scheduler is appropriate here, honestly. Pruning the watchdog every ten minutes is simply a cronjob. What we could do is, for example, change aggregator_cron to add the items that need updating to the queue (fast) and then change the update process to run only for a specified (short) amount of time. That'd be the new cron framework...
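
The "queue fast in cron, then work for a short time" idea could be sketched as follows. This is an illustration under assumptions: SplQueue stands in for the real queue backend, and both function names are hypothetical rather than taken from the patch.

```php
<?php
// Cron side: cheap, only records what needs doing. SplQueue is a stand-in
// for the real queue backend.
function sketch_cron_enqueue(SplQueue $queue, array $feed_ids) {
  foreach ($feed_ids as $feed_id) {
    $queue->enqueue($feed_id);
  }
}

// Worker side: process items until the queue is empty or the time budget
// (in seconds) is used up. Returns the number of items handled.
function sketch_work_for(SplQueue $queue, callable $worker, $budget) {
  $start = microtime(TRUE);
  $done = 0;
  while (!$queue->isEmpty() && (microtime(TRUE) - $start) < $budget) {
    $worker($queue->dequeue());
    $done++;
  }
  return $done;
}
```

Whatever is left when the budget runs out simply stays queued for the next run, which is what keeps one heavy task from starving the others.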

alex_b’s picture

I could add a "when to run" field and a callback but then we can not use an alternative implementation.

Is this the only reason why we don't want to add cron worker functionality to queue module? You hint at something far more serious in #8 (kittens and such) - so I am not sure...

However, I feel this job queue patch is so close to a job scheduler that I can't resist to play out how we could add job scheduling functionality while not hampering flexibility:

In the first place, I can't think of a common case when it would make sense to replace all queue handling with an alternative module:

function _queue_get_queue($queue_name) {
  static $queues;
  if (!isset($queues[$queue_name])) {
    $class = variable_get('queue_module', 'queue') . 'Queue';
    $queues[$queue_name] = new $class($queue_name);
  }
  return $queues[$queue_name];
}

Wouldn't it be much more common that a queue user wanted to replace queue handling on a queue per queue basis? _queue_get_queue() could allow per-queue handler modules like this:

function _queue_get_queue($queue_name) {
  static $queues;
  if (!isset($queues[$queue_name])) {
    $class = variable_get('queue_module_' . $queue_name, 'queue') . 'Queue';
    $queues[$queue_name] = new $class($queue_name);
  }
  return $queues[$queue_name];
}

Once we can switch out the queue handler on a queue by queue basis, it wouldn't hurt anymore to add a light cron worker to queue.module - alternative queue modules would replace functionality only for specific queues, this would allow them to have some implicit knowledge of which callbacks occur and to handle them properly.

queue_add() would take optional $callback and $period parameters. queue_cron() would work off queued items with a callback value according to the set time periods. This would allow API users to still use their own workers.

function queue_add($queue_name, $item, $callback = NULL, $period = 0) {
  return _queue_get_queue($queue_name)->queue($item, $callback, $period);
}

Seen from an aggregator perspective, only job scheduling would make the queue module really useful: It would lay ground to solving the problem of cron timeouts between heavy tasks like aggregator_cron() and search_cron() because we could time jobs centrally in queue_cron(). Also moving jobs to external queues would become easy: just write a queue handler for your queue and handle specific tasks outside of drupal (e. g. fetching feeds, parsing).

chx’s picture

While people desperately want to solve cron hic et nunc, I have a different problem in mind: someone uploads crap that takes long to process. And then another million users do the same. Say, upload media. That requires a real queue (like beanstalkd or SQS) and an army of backend workers. Now, how do we use a queue as a job scheduler? We are not really interested in a hard scheduler, a best effort scheduler would do. So we want a specific amount of time to pass between two runs of a job (typical example: fetch something -- we do not want to pound the remote server). Now, every queue supports visibility timeouts, even mine supports a process_time. But that's something we need to specify when we ask for a job, it can't be inside the job itself. However, just how many unique time differences can there be, really? A dozen? So then the cron system would create drupal_cronjobs_300 and then we do know the TTL: 300 seconds.
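
The "one queue per interval" naming could be sketched like this. Only the drupal_cronjobs_300 name comes from the comment above; the two helper functions are hypothetical.

```php
<?php
// Build the queue name for jobs that should run roughly every $seconds.
function sketch_cron_queue_name($seconds) {
  return 'drupal_cronjobs_' . (int) $seconds;
}

// Recover the TTL from a queue name, or FALSE if the name does not match.
function sketch_cron_queue_ttl($queue_name) {
  if (preg_match('/^drupal_cronjobs_(\d+)$/', $queue_name, $matches)) {
    return (int) $matches[1];
  }
  return FALSE;
}
```

Because the interval lives in the queue name, a worker knows which timeout to request before it has seen any job, which is the constraint described above.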

alex_b’s picture

#28 - thanks for clarification. You clearly outline a valid use case for a pure queue. I'll follow up with a job scheduler patch on a separate issue...

alex_b’s picture

I created a follow up issue for a job scheduler based on queues here: #410656: Job scheduler

chx’s picture

FileSize
7.61 KB

I removed a stray $timestamp from add, otherwise no change (literally, i edited the very patch file)

chx’s picture

FileSize
7.62 KB

Added Alex's per queue name queue engines. Still achievable through editing the patch :)

alex_b’s picture

Status: Needs review » Needs work

I did another review and took a closer look at the code:

* Comment of dequeue() in DrupalQueue interface refers to nonexistent function queue() (should be add())
* queueQueue implements reserve(), the interface still declares dequeue() and queue_reserve() calls ->dequeue()
* queue_delete() calls nonexistent ->finish()
* CVS Id in files missing

chx’s picture

Status: Needs work » Needs review
FileSize
9.77 KB

The test got lost from the later patches, that caused all these interface/implementation problems. Now it's back, passes, the comments and IDs are fixed.

drumm’s picture

I think the queue should be part of system module instead of having yet another module. A queue should work transparently for users, they do not even need to know it is there, so no need for a module. The basic API should always be there for developers, so no need for a module.

I designed the job queue contributed module a bit differently. Queued items are a function name, (serialized) argument array, and human-readable name. I think it is important to have a human-readable name for any UI that is exposed. Dequeuing an item is calling the function with arguments. I resisted doing multiple queues and prioritization for simplicity, but priorities can be set per-function-name. There is an option to not add duplicates to the queue.

This should be used for
* Search indexing. There should never be an option for "Process X items per cron run," the computer should just do as many as it can without asking. The current date-based variable has proven flaky when importing data or doing other things outside the standard node editing.
* Aggregator parsing. The cron job should queue feed updates instead of doing them, taking care to never double-queue anything. This lets the queue manage server resources and not run out of execution time or memory if there is a cluster of feed updates at the same time.

chx’s picture

drumm, you want the job scheduler issue. Thanks.

drumm’s picture

chx informed me that my functionality request belongs in #410656: Job scheduler, where it has been added.

There is no UI, which is fine for me. I think users will ask to see what is in the queue, but that can be in devel or elsewhere. Regular users should have a queue that just works. If it just works, seeing the specifics is not necessary. If there were a UI, I would want to see human-readable names for everything.

And the module is just there for hook_cron() and should be able to be turned off if there is a contributed queue-handling module. It is a shame the user has to make a decision on the module admin page about something as internal as a queuing system, but this is how it is. Another API to clean up swappable system handling like this, logging, and caching is another issue.

alex_b’s picture

Status: Needs review » Needs work

* queue_delete() accepts a $entry while the drupalQueue interface and the queueQueue class' implementation declare a $item_id.
* How would one remove all queued items from a queue when e. g. a module that entered the items is uninstalled? Do we need a queue_delete_queue($queue_name) ?

chx’s picture

Status: Needs work » Needs review
FileSize
12.88 KB

Lots of progress! Aside from fixing the small issue w/ item removal, I did move the whole codebase under system -- into one file, called system.queue.inc. This way, we always have the queue functionality at hand but neither the queue interface nor the default implementation takes up memory in the typical case. Added create and remove queue functionality, too.

drumm, there is no facility to peek in the queue in Amazon SQS and I do not think that's a feasible request either. Of course, devel can run a query against the default implementation and that's about it.

Status: Needs review » Needs work

The last submitted patch failed testing.

chx’s picture

No time to reroll, surely an update function got committed.

chx’s picture

Status: Needs work » Needs review
FileSize
12.88 KB

Edited patch file really quick.

boombatower’s picture

FileSize
13.76 KB

Updated the documentation and removed return values from deleteItem, createQueue, and removeQueue.

boombatower’s picture

I have a usecase for this in #323477: Increase simpletest speed by running on a simplified profile.

I am fixing a number of problems and adding a few features while making it all run faster, but to do so I need to reuse test database prefixes. Since the tests can be run concurrently I need to be able to queue the prefixes that are free and reserve them when the next test attempts to run. The queue implementation deals with the concurrency issue so this would be a great use case.

Another idea would be to queue all tests and then both the web test runner and the script could run tests concurrently without requiring pcntl_fork().

boombatower’s picture

Status: Needs review » Reviewed & tested by the community
FileSize
13.88 KB

Moved test to system.test since queue is a part of system module.

Talked with chx in IRC and after looking it over it is ready for RTBC.

boombatower’s picture

Title: Job queue module » Job queue API

dmitrig01’s picture

Could we change the default queue? Currently, if someone does queues better and wants to do it for all queues, they can't.

alex_b’s picture

FileSize
14.11 KB

Reviewed and found some problems that I could fix:

  • Went through comments and made references to other functions consistent.
  • Added return value to update hook.
  • Item expiry was broken. Changed system_cron() query from
  db_update('queue')->fields(array('process_id' => 0, 'expire' => 0))->condition('expire', time(), '<')->condition('process_id', 0, '>');

to

  db_update('queue')->fields(array('process_id' => 0, 'expire' => 0))->condition('expire', time(), '<')->execute();

I found two issues that I didn't fix right away because I'm not sure whether there's a consensus for them:

1)
The comment on system_queue_reserve_item() says: "This queue entry needs to be passed to system_queue_delete_item() once processing is completed."

This would mean that letting items expire on purpose (e. g. for scheduling recurring jobs) is against the specification. Shouldn't we rephrase this line to:

"Pass this queue entry to system_queue_delete_item() once processing is completed if it shouldn't be processed again."

?

2)

Shouldn't system_queue_add_item() return $entry so that an API user could keep track of specific items on the queue? E. g. if system_queue_add_item() returns a $entry, an API user could remove a specific item from the queue no matter whether it was processed or not.

chx’s picture

"This queue entry needs to be passed to system_queue_delete_item() once processing is completed." -- and that's right. We can refine this when the job scheduler comes but with the scheduler , the processing is not complete... let's refine this there.

The return of add is not really meaningful.

In versions of SQS prior to 2008-01-01, each message received a system-assigned identifier that you needed in order to delete the message from the queue. SQS still returns this message ID to you in the SendMessage response, but you can no longer use the message ID to delete the message. Instead you need a receipt handle.
...
Each time you receive a message from a queue, you receive a receipt handle for that message.

The reason I am using Amazon SQS here is because they have faced scalability issues few others have. So you cannot jump directly from add to remove. That does not work.
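
The receipt-handle constraint can be illustrated with a small in-memory sketch. This is not the SQS API and not the patch; the class, its method names and the handle format are hypothetical, chosen only to show why add() has no identifier worth returning.

```php
<?php
// Sketch of why add() has nothing useful to return: deletion is keyed on a
// handle that only exists once an item has been reserved.
class SketchReceiptQueue {
  protected $items = array();
  protected $handles = array();
  protected $counter = 0;

  public function add($data) {
    $this->items[] = $data;
  }

  // Reserving hands back the data together with a fresh receipt handle.
  public function reserve() {
    if (empty($this->items)) {
      return FALSE;
    }
    $handle = 'receipt-' . (++$this->counter);
    $this->handles[$handle] = array_shift($this->items);
    return array('handle' => $handle, 'data' => $this->handles[$handle]);
  }

  // Returns TRUE only when the handle came from a reserve() call.
  public function delete($handle) {
    if (!array_key_exists($handle, $this->handles)) {
      return FALSE;
    }
    unset($this->handles[$handle]);
    return TRUE;
  }
}
```

An item that was added but never reserved has no handle at all, so there is nothing a caller of add() could hold on to for a later delete.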

chx’s picture

@Dmitri, we can change that in a followup. Unless, of course, handlers get in and then we do not need to do this dance...

alex_b’s picture

Just chatted with chx on IM. I agree with #49

1) is really minor. 2) system_queue_add_item() can't return a reliable identifier if we want to have ideal scalability.

alex_b’s picture

This is RTBC from my point of view.

chx’s picture

What can I do to further this patch?

boombatower’s picture

/me wonders same

Dries’s picture

Status: Reviewed & tested by the community » Needs work

This is starting to look good, but ...

- In system_update_7022(), can't we simply call drupal_install_schema('queue')? I guess it might be a problem if our declaration of the queue schema changes.

- system_cron() executes a query to reset the queue. Shouldn't that be an API call instead of a query? What if I have a queue implementation that wants to do something similar?

- The documentation of system_queue_reserve_item() does not explain why you would want to reserve an item. I think it needs to be documented better. In fact, I have a hard time grokking the design. Ideally, there would be some documentation that explains the bigger picture, the difference between 'add' and 'reserve', etc.

- I don't understand why we need createQueue() if there is a constructor. (If we can remove createQueue(), I think we should rename removeQueue() to emptyQueue() or something.)

- testQueue() needs code comments. It is very cramped and undocumented.

- I think having to call module_invoke('system', 'queue_add_item', $queue1, $items[0]) is somewhat messy. I'd rather see us call system_queue_add_item() directly.

- I don't think the notion of $process_time is very portable. We can't really make assumptions about the performance of the underlying system. It is unclear how we can choose meaningful values for this.

chx’s picture

Some non-patch answers: no, we do not use that in the upgrade functions. Ask Barry why. I just remember we don't.

We need createQueue() because that's an install time operation, the constructor is run every time you touch the queue.

You might not like module_invoke and I am ready to admit it's not ideal but it allows us not to have queue in the memory all the time and it's really not needed, the registry loads it this way.

Every queue implementation uses the notion of $process_time. We must have it, in case a process dies or hangs or something while processing then another process needs to be able to take over.

I will fix the rest, they are very easy.

yched’s picture

Update functions don't refer to the schema definitions because you can't be sure of what the schema will be by the time the update is run. If a later patch alters the schema, the update function will put the db in a state that's different from the one intended at the time it was written.
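
A toy illustration of that drift, with no Drupal calls involved: all three function names and the 'priority' column are hypothetical, and the arrays only mimic the shape of a table definition.

```php
<?php
// Stand-in for hook_schema(): always describes the *latest* table layout.
// Imagine a later patch added the 'priority' column.
function sketch_schema() {
  return array('queue' => array('fields' => array('item_id', 'data', 'expire', 'priority')));
}

// Update written against a frozen copy of the table as it was at the time.
function sketch_update_frozen() {
  return array('fields' => array('item_id', 'data', 'expire'));
}

// Update that looks the table up at run time: its result drifts whenever
// sketch_schema() changes, so a later "add priority" update would collide.
function sketch_update_live() {
  $schema = sketch_schema();
  return $schema['queue'];
}
```

The frozen copy keeps creating the table exactly as it was when the update was written, so any later update that adds a column still finds the state it expects.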

chx’s picture

Status: Needs work » Needs review
FileSize
15.71 KB

- system_cron() executes a query to reset the queue. Indeed it does. It's a system module thing, it implements system_cron. If you write fancy_queue.module then fancy_queue_cron() will do that for you.

With that, we only needed better documentation, written. Also, thx to David Strauss, reduced the query count to 2 from 3.

Berdir’s picture

I'm not sure about this, so I'm not setting this to needs work..

+    $statement->range(0, 1);

According to Crell, it's not necessary to use db_select() for range queries: http://drupal.org/node/394484#comment-1472300

- node_page_view() has a db_select() in it that I don't think needs to be. db_query_range() still works the same as before, and there's no tagging, so using a static query will be faster.

chx’s picture

FileSize
15.49 KB

You are right, in an older version it was in a loop so the statement was cheaper. But, it's a static query now.

Status: Needs review » Needs work

The last submitted patch failed testing.

chx’s picture

Status: Needs work » Needs review
FileSize
15.53 KB

Sniff.

boombatower’s picture

FileSize
16.11 KB

Made a few changes.

Before a queue can be used ti needs to be created by -> Before a queue can be used it needs to be created by

These functions needs -> These functions need

How long the processing is expected to take in seconds, Defaults to -> How long the processing is expected to take in seconds, defaults to (twice)

Some comment formatting issues.

boombatower’s picture

Status: Needs review » Reviewed & tested by the community
FileSize
16.58 KB

Added " for processing." to "Reserve an item in a queue for processing." on system_queue_reserve_item() per #55

Added additional documentation to createQueue() and removeQueue() and copied to related functions.

Cleaned up test case and comments.

I believe that deals with everything in #55.

dww’s picture

Status: Reviewed & tested by the community » Needs review
FileSize
16.56 KB

A) I fixed a bunch of typos in comments:
- s/the query will will be a no-op/the query will be a no-op/
- s/auto-incrament/auto-increment/
- s/While the queue system makes a best effort to preserve order in messages, but due to the pluggable nature of the queue, there is no guarantee that items will be delivered on reserve in the exact order they were sent./While the queue system makes a best effort to preserve order in messages, due to the pluggable nature of the queue, there is no guarantee that items will be delivered on reserve in the order they were sent./
- s/seconds, Defaults/seconds, defaults/ (again) ;)

B) In the "Queue operations" @defgroup, we get this gem: "The system makes sure only one process can process an item." -- ugh: overloaded use of "process". :( A little later when talking about processing items, we get "if the process dies..." Can we agree on not using the term "process" when referring to a thread that's consuming items from the queue? "Process" isn't even necessarily accurate from an OS point of view (since we might be consuming items via multiple threads in the same actual OS process), and it's certainly confusing as hell when talking about "processing" a queue. "Consumer" might be the best terminology for the thread/function/code/whatever that's processing items. So, how about:

"The system makes sure that only one consumer processes an item." ?

and

"If the consumer dies for whatever reason, the item becomes available again and another consumer can receive it when calling system_queue_reserve_item()."

C) yched in #57 explains exactly why you can't just rely on the schema definition in the update function. However, is it really worth duplicating the 'description' fields in that table definition array we pass to db_create_table()? Those are totally ignored by db_create_table(), and anything looking up the descriptions of the schema is going to look in the hook_schema() implementation, not the DB updates, right? I guess it doesn't hurt, but in my contribs, I never document the schema in the update functions, only the "real" schema in the hook_schema definition itself.

Otherwise, this looks great. As a long-time computer science researcher in job queue systems and distributed computing, I have no other objections. ;) Setting to needs review for the s/process/consumer/ terminology change from (B) and the possibility of ripping out the useless descriptions in the table definition array in system_update_7022() from (C).

Cheers,
-Derek

Dries’s picture

Thanks for the documentation updates. Much better!

- Now that I fully understand system_queue_reserve_item(), I wonder if it is better to rename it to system_queue_lock_item().

- "reserve" is an odd word that I don't usually encounter in a CS context. It also suggests that you refrain from using an item, and that you retain it for future use. The reality is that you want to use the object and that you want to take a lock on it so no other thread can use it or dispose of it.

- I still don't understand why we need createQueue() when we have a constructor. If it initializes the queue, I think initialize() would be better than createQueue(). createQueue() suggests that we return a queue object, rather than initialize the current one.

(- Given that this is new functionality, we might want to do this more OO-y. Maybe use a simple factory class rather than wrapper functions? I think this would clean up the API a bit -- I can't help but think that createQueue() belongs in the factory class.)

dww’s picture

- "Lock" is also ok. It's shorter to type. However, I don't think it's fundamentally better than "reserve", and it carries its own connotations and possible (false) assumptions. E.g. if there's a "lock" operation in the API, where's the "unlock" operation? Oh, you don't need it, you just delete the item after you lock it. Slightly weird.

"Reserve" is not at all unheard of. "Resource reservations" are a major part of job scheduling (for example, after wading through one page of Google results for "resource reservation" talking about the RSVP networking protocol, I found Resource reservation prevents parallel job starvation). I won't push either way, but I don't think "lock" is a no-brainer improvement over "reserve" for what this is trying to convey...

Another possible term to throw into the mix, also heavily used in job scheduling: "claim". You can "claim" a task that you're going to consume, and you can "claim" a resource to use when you consume a task. Maybe the interface should define claimItem() and system can have a wrapper called system_queue_claim_item()?

- A job queue is a place to persistently store tasks you're trying to schedule. The constructor is invoked every time you instantiate an object, regardless of what operation(s) you want to perform on a job queue (push, pop, clear, whatever). In the default implementation in system.module, there's a single DB table to store all tasks, regardless of the queue. That might be really inefficient for some use-cases, and someone will want to partition the tasks into separate DB tables for each queue. In their implementation, createQueue() would have to create the new DB table for the new queue before you try to use it. Maybe you want to store your tasks directly on disk, and you want a new subdir for each queue -- createQueue() is where you'd call mkdir(). The only way the constructor could be considered the same as "createQueue()" is if you imagine a "persistent" job queue storage mechanism that exists only in RAM. ;) Does that clear up the difference? I'm not sure what more you want the documentation to say about this, but we can obviously add something if needed.
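
To make the constructor vs. createQueue() split concrete, here is a minimal sketch of the "one subdirectory per queue" idea mentioned above. It is only an illustration: DirectoryQueueSketch, storageExists() and the temp-directory layout are made-up names and assumptions, not anything from the patch.

```php
<?php
// Hypothetical directory-backed queue, only to illustrate the split between
// the constructor and createQueue(). Not part of the patch.
class DirectoryQueueSketch {
  private $dir;

  // Runs on every instantiation: cheap, and it does not touch storage.
  public function __construct($name, $base_dir) {
    $this->dir = $base_dir . '/queue_' . $name;
  }

  // Runs once, before first use: sets up the persistent storage.
  public function createQueue() {
    if (!is_dir($this->dir)) {
      mkdir($this->dir, 0700, TRUE);
    }
  }

  public function addItem($data) {
    // uniqid() with extra entropy keeps file names distinct in this sketch.
    $file = $this->dir . '/' . uniqid('', TRUE) . '.item';
    return file_put_contents($file, serialize($data)) !== FALSE;
  }

  public function numberOfItems() {
    $files = glob($this->dir . '/*.item');
    return $files === FALSE ? 0 : count($files);
  }

  public function storageExists() {
    return is_dir($this->dir);
  }
}

$base = sys_get_temp_dir() . '/queue_sketch_' . uniqid();
$queue = new DirectoryQueueSketch('mail', $base);
$exists_after_construct = $queue->storageExists(); // Nothing created yet.
$queue->createQueue();
$queue->addItem(['to' => 'someone@example.com']);
// A second object for the same queue sees the stored item without
// creating anything again.
$again = new DirectoryQueueSketch('mail', $base);
$count = $again->numberOfItems();
```

The point of the sketch is only that instantiating the object twice is harmless, while the mkdir() belongs in a step that is called once per queue.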

Dries’s picture

I agree that lock is not perfect. claim might actually be best.

However, I've been thinking about how I would use this API in practice, and I'm not entirely sure it is 100% solid. If thread A "claims" a job, and thread A dies "for whatever reason" (to quote the documentation), how would thread B know that thread A is dead vs still working on the job? I don't see how one would use the proposed API to properly and cleanly deal with such a scenario.

dww’s picture

Right. This isn't a real, full-fledged job scheduling system -- it's basically 50 lines of PHP code. ;)

The way to do that in the current API is via the $process_time argument to reserveItem(). If you reserve it for N seconds, and you're not done before that, the system thinks you're dead. Thread B assumes you're still working on it until $process_time expires, or you delete the task, whichever comes first. If $process_time expires and the item isn't deleted, the system assumes you're dead and gives it to someone else. Far from bullet-proof, and certainly possible to get into a situation where a job is given out to multiple consumers at once.
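
A rough sketch of those semantics, assuming an in-memory array instead of the {queue} table: LeaseQueueSketch and the explicit $now argument are hypothetical and exist only so the example is self-contained and deterministic.

```php
<?php
// Hypothetical in-memory sketch of claim-with-expiry semantics.
// Not the patch's implementation; $now is passed in instead of reading the clock.
class LeaseQueueSketch {
  // item_id => ['data' => mixed, 'expire' => int]; expire 0 means never claimed.
  private $entries = [];
  private $nextId = 1;

  public function addItem($data) {
    $this->entries[$this->nextId++] = ['data' => $data, 'expire' => 0];
  }

  // Returns an entry (item_id + data), or NULL if nothing is claimable at $now.
  public function claimItem($process_time, $now) {
    foreach ($this->entries as $item_id => $entry) {
      // Either never claimed, or the previous claim ran out without a delete,
      // in which case the previous consumer is presumed dead.
      if ($entry['expire'] <= $now) {
        $this->entries[$item_id]['expire'] = $now + $process_time;
        return ['item_id' => $item_id, 'data' => $entry['data']];
      }
    }
    return NULL;
  }

  // Only an explicit delete removes the item for good.
  public function deleteItem(array $entry) {
    unset($this->entries[$entry['item_id']]);
  }
}

$queue = new LeaseQueueSketch();
$queue->addItem('send mail');
$a = $queue->claimItem(30, 1000);       // Consumer A holds the item until t=1030.
$b_early = $queue->claimItem(30, 1010); // Still reserved: B gets nothing.
$b_late = $queue->claimItem(30, 1031);  // Expired and never deleted: B gets it.
```

If A was merely slow rather than dead, A and B now both hold the same item, which is exactly the "given out to multiple consumers at once" case.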

A real system might call this a "lease", and would give you a way to "renew" the lease if you're taking longer to complete something than you first thought you would. A really real system would have 2-way leases, and the resource would have to keep renewing its lease on the task, and the task would have to keep renewing the lease on the resource. If either lease expires, the other tries to kill off the other party, and returns the task to the queue. There are all sorts of ways to make this more bullet-proof for detecting failure, and transactional in terms of guaranteed semantics for the job queue.

But, how much do you want to bloat core for this? Complexity is supposed to be a disease, right? Is sending 2 emails to notify someone about the same comment in an extremely rare edge case (or running the same simpletest twice, etc) a more serious problem for the Drupal project than the added complexity of a 5000 line transactional job queue with 2-way leases and hooks to allow plugging in fancy resource scheduling algorithms? Just how bad is it if we can't guarantee that the tasks will definitely have run-only-once semantics? What's the greater evil: running the same job twice or not at all? The current system/API errs on the side of running it twice (or more) and only removing it from the queue when a consumer acknowledges that it completed the task and explicitly removes it from the queue. We can't reasonably make assumptions about if the tasks are idempotent or not -- even if thread A really did die, we don't know what steps in the task it might have already completed, which ones it needs to redo, undo, etc.

If you want to talk about how complicated this crap gets, check out my former day job (http://condorproject.org). I'll see you 1/2 million lines of C++ later. ;)

Meanwhile, I think the existing API is just fine. You claim a task for N seconds and if you haven't deleted it by then, all bets are off. At most, I'd consider adding a "renew" operation, but even that seems like potential bloat for little concrete value in the ways this API is likely to be used. When you claim a task, you just have to make a trade-off for the $process_time to use -- smaller values provide lower latency for idempotent tasks, higher values are better for non-idempotent tasks where higher latency in case of failure is a lesser evil than running more concurrently.

We can document the limitations of the semantics if we want to be more explicit about exactly what we are and aren't guaranteeing. Frankly, anyone with very specific, mission-critical transactional needs for their queued tasks should be implementing those semantics themselves with their own transactions of some sort. Even really fancy scheduling systems tend to punt on this point... It's really hard, at the generic job queue level, to make semantic guarantees about the internals of the tasks themselves.

chx’s picture

Dries, as the documentation says, createQueue() is called before the first usage. The constructor runs on every queue usage. About "reserve": see beanstalkd, that's where the word is from.

andrewlevine’s picture

Speaking as someone who is working on a Job API/queue module for Drupal 6, the Queue interface and systemQueue implementation seem pretty solid and I couldn't find any reason why I wouldn't be able to integrate it in a Drupal 7 version. Great job guys!

One thing I wonder if we could clean up is having two layers of indirection before actually calling the queue methods (module_invoke() => system_queue_*() => $queue->*()). I understand we need module_invoke to use the registry and we need a function (not a class method) to use module_invoke, but certainly there must be a better way...I'll think on it.

Dries’s picture

dww, your explanation in #69 is great -- it is what I was thinking as well. However, it felt a bit brittle (and still does) because $process_time is exposed to the PHP application, not by the queue implementation. Different queue implementations can behave quite differently, and for some queue implementations $process_time could be much longer than for others. Maybe $process_time should be an 'internal' variable to the queue implementation?

Some of your explanation could be merged into the patch, if we'd like. Possibly, $process_time could be renamed to $lease_time, which is a bit more self-explanatory.

I think I'll be committing this patch later today as it is quite solid already. However, it is a good conversation to have, and if it results in a few small last-minute improvements, everyone wins.

neclimdul’s picture

Per a discussion with chx on IRC, move system_queue_foo_bar() to Queue::fooBar(). Removes the need for module_invoke as alevine mentioned and is nice and OO.

Also changed the no-op comment in system.module to "Reset expired items in the default queue implementation table. If that's not used will simply be a no-op." I think everyone will know a no-op doesn't cause any problems and it's simpler grammatically.

neclimdul’s picture

FileSize
15.58 KB
neclimdul’s picture

PS - I actually like the claim wording but at the last second changed my mind about putting that in the above patch. I would be happy to re-roll with that wording if it was requested.

dww’s picture

@Dries: Sure, I'm glad we're talking about it -- getting the terminology and the API "right" is important.

Re: "$process_time is exposed to the PHP application, not by the queue implementation."

Only the consumer knows how long to expect it to take for them to complete a task. They presumably know (based on what queue it went in, since we have no other metadata associated with tasks) what kind of task it is and what they expect to do with it, and only the consumer at the time of requesting a claim/lease/reservation knows the current state of its available resources. You really need to know both to have an accurate guess for the duration to request. If you know you have on average a 30 second startup cost to an external service before you can process each task, as a consumer, it's up to you to add that to your estimate for how long you need your lease to last on your tasks. If you only have to do the startup cost once and then can maintain a connection open or something, but you need a task to establish the initial connection, only you can know about this and add 30 seconds for your initial request. Therefore, different consumer threads for the *same* queue (not to mention the same queue implementation) might need different $process_time values depending on their current state. The queue itself can't possibly know this: neither as the queue implementation nor as an argument to createQueue().

If the *queue* is implemented such a way that it might take a while to get a task back to give to a consumer, it's up to the queue to only start the "lease timer" once it has the task and is about to hand it back to the consumer -- the consumer certainly can't know about those delays, and that *should* be internal. But, I think the API is broken if the consumer can't specify a reservation time/lease duration/claim lifetime/whatever we call it at the time it's requesting a claim/lease/reservation. If you make this "internal", you break the API (from my POV).
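
As a tiny illustration of why the duration has to come from the consumer, here is a hypothetical helper; the function name and the 30/10 second figures are assumptions for the example, not values from the patch.

```php
<?php
// Hypothetical consumer-side helper. Only the consumer knows whether it
// still has to pay the startup cost for the external service.
function sketch_process_time($connection_open, $startup_cost = 30, $work_estimate = 10) {
  return $connection_open ? $work_estimate : $startup_cost + $work_estimate;
}

$first_claim = sketch_process_time(FALSE); // 40 seconds: connect, then work.
$later_claim = sketch_process_time(TRUE);  // 10 seconds: connection is reused.
```

Two consumers of the same queue, or the same consumer at different moments, would pass different values, which the queue implementation has no way to guess.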

So, a few concrete questions to be answered before this goes in:

B) Should we in fact ditch "process" in documentation referring to something working on a task, and adopt "consumer" (or some other term)? (same as the list from #65)

C) Should we leave or remove the 'description' elements in the table definition array in the schema update function?

D) Are we going with "reserve" or "claim"? (I think we all agree "lock" isn't what we're after).

E) Should we rename $process_time to $lease_time, $lease_duration, $claim_duration, $claim_lifetime, or leave it alone?

F) Should we add a lot more of the explanation about queue semantics from #69 to the @param doc (translated out of CS-jargon into something mere mortals can understand) for whatever we call (E)?

G) Do you want the difference between the constructor and createQueue() explained better in the PHPDoc for createQueue()?

Cheers,
-Derek

p.s. Yay for killing the system wrapper functions. ;)

Dries’s picture

- I like consumer better than process.

- I like lease time better than process time.

- Both reserve and claim work, although I have a slight preference for claim -- it is more natural to me.

dww’s picture

Eeek, the only bummer about s/process/consumer/ is that the schema uses "process", too: {queue}.process_id and {queue_process_id} itself. I guess we should fix those to be "consumer_id", too. I'm re-rolling based on this and #77...

dww’s picture

FileSize
18.65 KB

- Rerolled to incorporate the terminology changes from #77 (B, D, and E)
- Added more PHPDoc about the implications of the $lease_time parameter on queue semantics (F)
- Added more PHPDoc about createQueue() vs. constructor (G)

Didn't touch C, since I haven't heard anything more about that one.

At Crell's request, I also renamed the interface to "DrupalQueueInterface" to be more consistent with other OO-y parts of core, especially DBTNG.

Crell apparently hates the "Queue" class itself, but I'll leave that for his comments (and someone else to re-roll, probably).

Crell’s picture

Status: Needs review » Needs work

dww asked me to look into the docblocks for the OO parts of this patch. When I did so, however, I had to run screaming. The Queue class is wrong in innumerable ways. I shall enumerate only a few. :-)

1) A factory should never delegate commands. A factory creates, manages, and returns concrete objects that conform to an interface. That's it. Any other operations should be called directly on the concrete object that is returned. What this is doing now is nothing more than a procedural system that uses :: in place of _. That's a bad bad reason to use OO.

2) Interfaces should be named FooInterface, to make them more self-documenting and consistent with the other classes/interfaces we have in core. Docblocks should then be on the interface, and NOT on those classes that implement that interface. (That way we document all implementations in one central place, and don't need to update multiple times if things change.)

3) Dynamically named classes should not use a lowercase prefix. That makes them look not like classes. What the DB layer did is to use "BaseName_". $keyword, where $keyword is allowed to be lowercase. So QueryInsert_mysql. I admit I don't really like that approach, but it's the only one I know of that doesn't involve case munging, which is even worse/slower. Unless we can come up with a better way of handling that we should follow the DB layer's approach here for consistency.

4) The DB queries need to be broken out to multiple lines, per coding standards.

5) This line makes me scream in terror:

$class = variable_get('queue_module_'. $queue_name, 'system') . 'Queue';

By my read, that means every module can provide only one implementation based on the module name. That's an anti-pattern in Drupal that is horrifically bad, and needs to be exterminated where it exists now, not perpetuated.
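
One possible reading of points 1 and 5, as a minimal sketch: the factory only creates and returns objects, and the queue-name-to-class mapping is an explicit registry rather than something derived from a module name. All names here (QueueSketchInterface, MemoryQueueSketch, QueueFactorySketch) are made up for illustration.

```php
<?php
// Hypothetical sketch of a factory that only creates and returns objects.
interface QueueSketchInterface {
  public function addItem($data);
  public function numberOfItems();
}

class MemoryQueueSketch implements QueueSketchInterface {
  private $items = [];

  public function addItem($data) {
    $this->items[] = $data;
    return TRUE;
  }

  public function numberOfItems() {
    return count($this->items);
  }
}

class QueueFactorySketch {
  // Queue name => class name. Any class can be registered for any queue.
  private static $classes = [];
  private static $queues = [];

  public static function register($name, $class) {
    self::$classes[$name] = $class;
  }

  public static function get($name) {
    if (!isset(self::$queues[$name])) {
      $class = isset(self::$classes[$name]) ? self::$classes[$name] : 'MemoryQueueSketch';
      $queue = new $class();
      if (!$queue instanceof QueueSketchInterface) {
        throw new InvalidArgumentException("$class does not implement QueueSketchInterface");
      }
      self::$queues[$name] = $queue;
    }
    return self::$queues[$name];
  }
}

// Callers invoke methods on the returned object; the factory has no addItem().
QueueFactorySketch::get('mail')->addItem('hello');
$count = QueueFactorySketch::get('mail')->numberOfItems();
```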

Sorry, dww, you asked me to comment. :-)

Damien Tournoud’s picture

What this is doing now is nothing more than a procedural system that uses :: in place of _. That's a bad bad reason to use OO.

On the contrary, using classes as a namespace is a perfectly legitimate way of using classes in PHP. It gives you nice autoloading, without having to resort to ugly things like the fields "autoloader" (see #445044: Try to remove the field autoload "feature").

dww’s picture

@Crell #80.2: is already fixed in #79.

Crell’s picture

There are plenty of reasons for Fields to be full-on objects to begin with that have nothing to do with autoloading, but that's a separate issue.

neclimdul’s picture

FileSize
15.53 KB

Ok, re-rolled based on Crell's feedback. Obviously keeping a good OO approach is good and this more closely mirrors handlers giving us another place that can take advantage of that should it go in.

I only really addressed 1 and 4 and fixed a bug in deleteItem. It turns out it wasn't executing so no items were deleted. We didn't notice this because claimItem on a queue with all its items claimed will return false just like an empty queue would. Hence, the empty queue test thought it was succeeding. I did not fix the test, that'll need to be looked at still.

I also did not address the docblock statement in 2 or the object name issue brought up in 3.

I'm not sure 5 is true because the queue name is arbitrary so not tied to any module.

dww’s picture

@neclimdul: Cool, thanks. I repeat: Crell's #80.2 is already fixed (assuming you started from my patch in #79). Once we delete all the duplicate methods on the Queue class and let callers just invoke the methods on the object it returns, there's no duplicate PHPDoc -- the main comments are on the DrupalQueueInterface interface, and the systemQueue class (assuming it's still called that) doesn't have any comments.

chx’s picture

Status: Needs work » Needs review
neclimdul’s picture

Status: Needs review » Needs work

I think this actually still needs work. The test for making sure the deleteItem functionality works doesn't actually work.

neclimdul’s picture

Status: Needs work » Needs review
FileSize
16.32 KB

Alright, based on some discussion with chx on IRC, I've added a numberOfItems method to the API. I've documented the inaccuracy of this function for most purposes in the documentation.

Given a quiet system it gives us the ability to test deleteItem though. I've incorporated this into the tests so things should be good now.

With the working test I actually found yet another bug in deleteItem that was overlooked where it was expecting the item_id and we were passing it the item. Since we pass items everywhere else and most of the rest of the documentation mentions passing in an item, I've changed the interface and implementation to do this.

dww’s picture

Status: Needs review » Needs work

Looking over all this again with a fresh(er) pair of eyes, a few new things caught my attention:

H) What's DrupalQueue::get($queue_name)->add() doing at the end of the @defgroup queue Queue operations doc block?

I) There's a stray "//" without an actual comment at the start of DrupalQueue::get()

J) Is it my imagination, or do we never collect garbage from the {queue_consumer_id} table? Seems like we add another row in there every time you ever instantiate a systemQueue object, and the table grows without bounds. That seems potentially scary. I mean, I know we've got {node} and {comment} and other nice tables that grow forever, but we actually still care about node/8, etc. ;) We're never going to care about the consumer_id after the specific systemQueue object is destroyed and there are no tasks associated with that id. We're just using that table to ensure unique consumer_id's for each systemQueue object, but we don't actually care about the history of all consumer_ids we ever used.

K) $start = time(); -- in D7, don't we want REQUEST_TIME for this?

L) Inside systemQueue::claimItem(): // If there are affected rows, this update succeeded. -- if not? We just return NULL, and the caller is probably going to assume there are no more tasks to claim. We don't actually document that claimItem() can return NULL in the PHPDoc for the DrupalQueueInterface, and what the caller should expect in this situation. I haven't carefully pondered systemQueue::claimItem() in depth, but it seems like we could easily return NULL during various race conditions (e.g. two threads happen to get the same claimId, both try to mark it with their own consumer_id, and only 1 succeeds). There are still plenty of tasks in the queue, but due to the race, we end up returning NULL. It's not a bug per se, but this should be better documented, both in the systemQueue implementation and in the interface itself. If you hit one of these race conditions, someone has to be responsible to retry. Do we want that to be the caller's problem, or should claimItem() itself loop and keep trying to claim tasks in these cases?
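
To spell out the interleaving I have in mind, here is a sketch with an in-memory array standing in for the {queue} table. The helper names and consumer ids are hypothetical; the two steps are meant to mirror a SELECT followed by a conditional UPDATE.

```php
<?php
// Hypothetical stand-in for the {queue} table: item_id => consumer_id,
// where 0 means unclaimed.
$table = [1 => 0, 2 => 0];

// Step 1: pick the oldest unclaimed item (the SELECT).
function sketch_select_candidate(array $table) {
  foreach ($table as $item_id => $consumer_id) {
    if ($consumer_id === 0) {
      return $item_id;
    }
  }
  return NULL;
}

// Step 2: mark it, but only if it is still unclaimed (the UPDATE).
// Returns the number of affected rows.
function sketch_mark(array &$table, $item_id, $consumer_id) {
  if ($table[$item_id] === 0) {
    $table[$item_id] = $consumer_id;
    return 1;
  }
  return 0;
}

// Both consumers run step 1 before either runs step 2.
$candidate_a = sketch_select_candidate($table);
$candidate_b = sketch_select_candidate($table);
$rows_a = sketch_mark($table, $candidate_a, 101);
$rows_b = sketch_mark($table, $candidate_b, 102);

// Consumer B lost: zero affected rows, so a single-attempt claimItem()
// hands back NULL even though item 2 is still unclaimed.
$b_result = $rows_b ? $candidate_b : NULL;
$still_unclaimed = sketch_select_candidate($table);
```

No item is handed out twice here, so the locking itself holds up; the only issue is that NULL does not mean "empty" in this case.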

neclimdul’s picture

FileSize
16.27 KB

Hard questions there in J and L. Not sure how to answer them late at night but here's a quick fix for H, I, and K.

chx’s picture

Status: Needs work » Needs review

L is not a valid problem. There is no race. You can't get the same autoincrement twice. The whole point of this dance is to make sure there is no race condition. David Strauss checked it and he said too there is no race. Garbage collection IMO should be solved in #356074: Provide a sequences API

chx’s picture

Oh now I re-read and it's not the same problem you mention in L -- what I said is that two consumers can't get the same task. Now, if you check the original patch that was looping. That was removed because when there are no items it holds up the caller needlessly so now the caller should loop. Given that most of the time you will have a time slice you want to spend with processing items -- ie. not just one item -- you will loop anyways.

dww’s picture

Status: Needs review » Needs work

@neclimdul: Thanks for H, I and K. ;)

@chx: Right, L is a different race condition, but you re-read and understand. So, this needs work to document in the interface that claimItem() can return NULL and that it doesn't necessarily mean the queue is empty. I'm totally fine with those semantics, but we *need* to tell users of this API about it.

Re: J (garbage collection of consumer_id) -- if the answer is: "we should use #356074: Provide a sequences API for that" then IMHO, we should postpone this issue until that one lands. It seems lame to have multiple schema updates to add then remove the same table, etc. I'm not outright going to postpone this myself, but I really think the focus should be on getting #356074 in, then come back here and make this use db_next_id(). To that end, see #356074-21: Provide a sequences API. ;)

dww’s picture

Status: Needs work » Needs review
FileSize
17.09 KB

- Removed the following bogus text from the PHPDoc above the systemQueue implementation:

+ * Do not use this class directly, use the DrupalQueue::addItem(),
+ * DrupalQueue::claimItem(), DrupalQueue::deleteItem() and
+ * DrupalQueue::removeQueue() functions.

- Fixed the PHP doc for the @defgroup queue comment to refer to "DrupalQueueInterface::createQueue()" etc, instead of "DrupalQueue::createQueue()" (which doesn't exist). Not sure what the convention is for stuff like this, but "DrupalQueue::*" is definitely wrong.

- Fixed the PHPDoc for claimItem() to describe what a NULL return means (L).

However, it makes me wonder -- if the caller is supposed to loop, and claimItem() returning NULL shouldn't terminate the loop, what do we recommend?

  $queue = DrupalQueue::get('my_queue');
  while ($queue->numberOfItems()) {
    $item = $queue->claimItem();
    if (!empty($item)) {
      // do stuff ...
    }
  }

is that what we expect? numberOfItems() isn't cheap. :( I guess something like this?

  $queue = DrupalQueue::get('my_queue');
  while (TRUE) {
    $item = $queue->claimItem();
    if (!empty($item)) {
      // do stuff ...
    }
    elseif (!$queue->numberOfItems()) {
      break;
    }
  }

??? Definitely starts to seem clumsy at every call site. :(

This would obviously be a lot more natural:

  $queue = DrupalQueue::get('my_queue');
  while (($item = $queue->claimItem())) {
    // do stuff ...
  }

But, that requires changing claimItem() so that it only returns NULL if it's out of tasks, not if it hits the race condition where it couldn't claim the task it was trying to claim...

neclimdul’s picture

#356074 seems like something reasonable to wait for.

Thanks for the other fixes dww. Sorry, it seems that shifting the interface around messed up all of our docs and I wasn't searching for the right strings to catch it. Along those lines here are some more observations I made.

M) In the return documentation of claimItem we state that addItem and deleteItem take a queue entry object it returns. This is not what these methods are documented as taking or what the tests are sending. Should we go back to the entry or keep with the item? I don't think it makes any difference with our current implementation.

N) Also, maybe we should document these two objects separately with what minimum properties they should contain and when and where you see them? I only know what properties are expected by checking out the claimItem() queries from our implementation.

Going to be busy with domestic stuff this weekend so if someone wants to take on implementing any answers to these questions I won't complain. :)

alex_b’s picture

FileSize
17.04 KB

Reviewed, fixed two minor issues:

  • Expiry function in system_cron() and claimItem() were still using time() - replaced with REQUEST_TIME
  • SystemQueue instead of systemQueue

(Did not address #95's M or N)

Shouldn't variable names be lowerCamelCase? I'm looking for guidance and I only find this: #260220: OOP standards. This patch doesn't change variable_names as some of them coincide with DB column names and I'm therefore not sure what the best way to deal with them is (change db column_names to columnNames? Translate them every time when loading?). I opened an issue for making variable names consistent #451934: Naming convention for class variables not consistent.

In regards to #94, caller loop implementations: I'm wondering whether we get a lot of situations where we want to abandon a queue when it is empty. Maybe I'm missing something here, but the use cases that are coming to my mind are queues that may be empty temporarily but should never be abandoned or removed when empty (e. g. drupal cron tasks, aggregation of feed items).

In these use cases a queue would be polled as long as a module is active, e. g. on hook_cron():

function mymodule_cron() {
  $queue = DrupalQueue::get('my_queue');
  while (($item = $queue->claimItem()) && !mymodule_timeout()) {
    // do stuff ...
  }
}
chx’s picture

Alex is right here, you're definitely not going to use numberOfItems() because that's not accurate anyways, you simply wait until a job comes -- you might check more than one queue meanwhile etc etc. Note that this is the only way to properly use the queue no matter how we create our queue API 'cos you place a call to the queue API and between the time it returns and you check whether you got something, how do you know whether something was not added? :)

IMO as long as we are aware we need to garbage clean up, we are fine. We do not clean up actions_aid either and this def belongs to the sequences API patch. We use a pattern here, if that's flawed, fix elsewhere. Let's focus on the queue here.

dww’s picture

@alex_b: I think you misunderstand my concern. I'm not talking about destroying the queue if claimItem() returns NULL. I'm just saying that while you're trying to drain the queue, do you really want a given consumer to prematurely give up, even if it still has resources, just because of a race condition on trying to mark a given item for that consumer_id? In your sample code, the addition of && !mymodule_timeout() doesn't at all change the fact that if you hit this race, claimItem() is still going to return NULL and your loop will terminate, even if you haven't timed out yet. Yes, it's probably rare, but wearing the "maximize throughput hat" (which I wore for years before I started playing with Drupal) I don't want to see a stinking race condition prevent a consumer from fully utilizing its resources, leaving jobs needlessly sitting idle in the queue until the next cron run.

@chx: Yes, obviously someone could call claimItem() on an empty queue, think there's no work to do, and a fraction of a second later, someone adds an item. There's nothing we can do about that. But, if the queue has 2000 items, and a consumer has resources to plow through all of those and drain it, why give up after 15 items if we hit the race condition and wait until the next cron run to make more progress?

Really, what do we hurt by having claimItem() iterate if we hit the race condition? That would completely satisfy my concerns. Is there any downside to doing so? If you say "bloat", I say "why bloat all the call sites to handle this edge case when we can just handle it internally in the API?".

Dries’s picture

First of all, great team work. A ton of solid progress has been made -- glad I continued the conversation despite some pressure to commit this. Some additional follow-up comments:

1. I think 'queue_name' can be 'name'. That would be more consistent.

2. Shouldn't numberOfItems() sport a WHERE name = '$this->name' clause? (Obviously this would not be safe but you get the point.) After all, we can create multiple queues sharing the same database table. Given that our tests use 2 queues, I'm surprised we didn't catch that.

3.

+    $entry = FALSE;
+    $entry = db_query_range('SELECT item, item_id, queue_name FROM {queue} q WHERE consumer_id = 0 AND queue_name = :queue_name ORDER BY created ASC', array(':queue_name' => $this->queue_name), 0, 1)->fetchObject();

$entry = FALSE can be removed.

4. I think it would be good to document in the schema definition that item is serialized. When I reviewed the schema definition, I wondered what format the item-field should be in.
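
To illustrate point 2 without touching the database, a small sketch with an array standing in for the shared table. The function names and sample rows are hypothetical.

```php
<?php
// Hypothetical rows of a shared {queue} table; 'name' is the queue name.
$rows = [
  ['name' => 'mail', 'data' => 'a'],
  ['name' => 'mail', 'data' => 'b'],
  ['name' => 'feeds', 'data' => 'c'],
];

// Without a name condition, every queue reports the size of the whole table.
function sketch_count_all(array $rows) {
  return count($rows);
}

// With the condition, only rows belonging to the requested queue are counted.
function sketch_count_for(array $rows, $name) {
  return count(array_filter($rows, function ($row) use ($name) {
    return $row['name'] === $name;
  }));
}

$unfiltered = sketch_count_all($rows);     // 3, wrong for either queue.
$mail = sketch_count_for($rows, 'mail');   // 2
$empty = sketch_count_for($rows, 'other'); // 0
```

A test that checks one queue while the other holds a different number of items would tell the two versions apart.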

dww’s picture

FileSize
17.25 KB

Re: L) This iterates if we happen to hit the race condition. Callers never have to worry about that case -- if claimItem() returns NULL, the queue is at least temporarily empty... Fixed the PHP doc in the interface to match.
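
Roughly, the iteration looks like the sketch below. This is not the code from the patch: RetryingQueueSketch, the array storage, and the $lose_first knob (which simulates a rival consumer winning the first N attempts) are assumptions made so the behavior can be run in isolation.

```php
<?php
// Hypothetical sketch: claimItem() retries internally when it loses a race.
class RetryingQueueSketch {
  private $table;      // item_id => consumer_id (0 = unclaimed)
  private $consumerId;
  private $loseFirst;  // How many attempts a simulated rival wins.

  public function __construct(array $table, $consumer_id, $lose_first = 0) {
    $this->table = $table;
    $this->consumerId = $consumer_id;
    $this->loseFirst = $lose_first;
  }

  public function claimItem() {
    while (TRUE) {
      $candidate = array_search(0, $this->table, TRUE);
      if ($candidate === FALSE) {
        // Nothing unclaimed is left: the queue is at least temporarily empty.
        return NULL;
      }
      if ($this->loseFirst > 0) {
        // Simulated rival consumer marks this candidate first.
        $this->loseFirst--;
        $this->table[$candidate] = 999;
        continue;
      }
      $this->table[$candidate] = $this->consumerId;
      return $candidate;
    }
  }
}

$queue = new RetryingQueueSketch([1 => 0, 2 => 0, 3 => 0], 7, 1);
$first = $queue->claimItem();  // Loses item 1 to the rival, retries, gets item 2.
$second = $queue->claimItem(); // Item 3.
$third = $queue->claimItem();  // NULL: nothing unclaimed remains.
```

Every pass through the loop either returns or sees one more row claimed by someone, so the loop cannot spin forever on a finite queue.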

Re: J) I withdraw my concern about garbage collection in here. If/when we have a better sequences API, we can convert to that. We don't need multiple schema updates, since D6 has none of this, and we don't support HEAD to HEAD schema migrations.

Looking more closely at (M) and (N), this patch is still something of a mess, although "In the return documentation of claimItem we state that addItem and deleteItem take a queue entry object it returns" is not actually true. deleteItem() really takes an "entry" not an "item", since it needs the item_id (which only lives in the "entry"). That's what it says in the docs for claimItem(), and it explains that the "entry" you get back includes the $item you pass to addItem() -- nowhere do the docs state that addItem() takes an "entry". The tests are all written correctly -- we pass items to addItem(), fetch entries from claimItem(), and pass entries, not items, to deleteItem(). It's just weird that claimItem() returns an entry, not an item, and that deleteItem() takes an entry, not an item. It's extra bad in deleteItem() since the argument is called $item, even though it's really an entry. However, we need the entry, not the item, so we have the item_id...

I'm not 100% positive what to do about this. At the very least, the argument to deleteItem() should be an $entry. Better yet (?), perhaps we should rename claimItem() and deleteItem() to claimEntry() and deleteEntry()? Is the cure worse than the disease in this case? ;) Should we instead define a QueueItem class that has "item_id" and "data" fields? You pass one of these to addItem() with an empty item_id. Everything in the API manipulates these QueueItem objects, instead of this weird mix of items and entries we currently have. Is that addItem() bloat for the sake of clarity? I'd like to get more opinions before I reroll for this....
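
For discussion, the QueueItem idea could look something like this sketch. Neither class exists in the patch; QueueItemSketch, ItemQueueSketch and the in-memory storage are placeholders.

```php
<?php
// Hypothetical QueueItem value object with "item_id" and "data" fields.
class QueueItemSketch {
  public $item_id;  // NULL until the queue has stored the item.
  public $data;

  public function __construct($data, $item_id = NULL) {
    $this->data = $data;
    $this->item_id = $item_id;
  }
}

class ItemQueueSketch {
  private $stored = [];
  private $claimed = [];
  private $nextId = 1;

  public function addItem(QueueItemSketch $item) {
    $item->item_id = $this->nextId++;
    $this->stored[$item->item_id] = $item;
  }

  public function claimItem() {
    foreach ($this->stored as $item_id => $item) {
      if (empty($this->claimed[$item_id])) {
        $this->claimed[$item_id] = TRUE;
        return $item;
      }
    }
    return NULL;
  }

  public function deleteItem(QueueItemSketch $item) {
    unset($this->stored[$item->item_id], $this->claimed[$item->item_id]);
  }

  public function numberOfItems() {
    return count($this->stored);
  }
}

$queue = new ItemQueueSketch();
$queue->addItem(new QueueItemSketch(['nid' => 8]));
$item = $queue->claimItem();  // Same type that went in, now with an item_id.
$queue->deleteItem($item);    // No separate "entry" concept needed.
$remaining = $queue->numberOfItems();
```

The cost is visible in the addItem() call: callers have to wrap their data in an object first.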

O) (Related to M + N) Why does the $entry returned by claimItem() include the queue_name? Isn't that implicit from the caller when they instantiated the queue object?

dww’s picture

Status: Needs review » Needs work

Whoops, just saw Dries #99. A few answers:

99.1) Ok, sure.

99.2) Yes, it should. ;) The test doesn't catch it since it never tries to ensure that one queue is empty while the other is full. The test also needs work for that.

99.3) It does in my patch #100 because we're iterating. Probably it was left in when this code used to iterate.

99.4) Yeah, probably. However, "item" is kinda weird for the column name, anyway. I think "data" might be more appropriate. Our terminology is all inconsistent. See M/N...

dww’s picture

Status: Needs work » Needs review
FileSize
17.18 KB

Fixes 99.1 and 99.2 (and the test).

99.3 is a non-issue due to L fixed in patch #100.

99.4 -- it already says 'serialize' => TRUE, in the schema definition -- what more do you want? ;)

Back to CNR so the test bot can have its way with this, though I still think it needs work based on M/N and my concerns/suggestions in #100...

Status: Needs review » Needs work

The last submitted patch failed testing.

dww’s picture

Status: Needs work » Needs review
FileSize
17.18 KB

whoops. ;)

chx’s picture

Derek, Alex's loop is wrong, it's simply

while (time() - $start_time < $allotted_time) {
  if ($item = $queue->claimItem()) {
    // ... process $item, then delete it from the queue ...
  }
}

As I said, the loop might do more than just queueing for one item, and you will loop until your time is up. Does this address your concerns?

PS: yeah, I made a mistake when I said he is right :) he is half-right ;)

alex_b’s picture

FileSize
17.36 KB

#105 - sorry for the confusion with !mymodule_timeout() in #96 - I only included it to indicate that the loop wouldn't break the cron timeout, it does not address the race condition of course. I did not understand dww's concern for a race condition when I posted #96

Reading the patch in #100 I understand this concern now: Considering a worker A and a worker B, worker B's call to claimItem may return NULL, when claimItem() for A and B select the same item, but A reserves it first with db_update(). B would assume the queue does not contain unreserved items at this point, but in fact this may not be true.

The loop introduced in #100 addresses this issue. I think it merits a brief comment to explain its purpose - this patch adds such a comment and removes a stray $start variable.
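
To make the race described above concrete, here is a rough, single-process sketch. It is not the code from the patch: the array stands in for the {queue} table, and first_unclaimed(), reserve() and claim() are hypothetical helpers standing in for the SELECT, the conditional db_update(), and a claimItem() that retries when the update affects no rows.

```php
<?php
// Hypothetical in-memory stand-in for the {queue} table:
// item_id => consumer_id, where 0 means "not reserved".
$table = array(1 => 0, 2 => 0);

// Stand-in for: SELECT item_id ... WHERE consumer_id = 0 LIMIT 1.
function first_unclaimed(array $table) {
  foreach ($table as $item_id => $consumer_id) {
    if ($consumer_id === 0) {
      return $item_id;
    }
  }
  return FALSE;
}

// Stand-in for a conditional UPDATE ... WHERE item_id = :id AND consumer_id = 0.
// Returns the number of affected rows: 0 means another worker got there first.
function reserve(array &$table, $item_id, $consumer_id) {
  if (isset($table[$item_id]) && $table[$item_id] === 0) {
    $table[$item_id] = $consumer_id;
    return 1;
  }
  return 0;
}

// Claim with retry: only report "empty" when the SELECT finds nothing,
// not merely because one reservation attempt was lost to another worker.
function claim(array &$table, $consumer_id) {
  while (($item_id = first_unclaimed($table)) !== FALSE) {
    if (reserve($table, $item_id, $consumer_id)) {
      return $item_id;
    }
  }
  return FALSE;
}

// Simulate the interleaving: workers A (1) and B (2) select the same item.
$a_candidate = first_unclaimed($table);
$b_candidate = first_unclaimed($table);
reserve($table, $a_candidate, 1);
// B's single attempt fails even though item 2 is still unreserved.
$b_lost = (reserve($table, $b_candidate, 2) === 0);
// With the retry loop, B moves on and picks up the remaining item.
$b_item = claim($table, 2);
```

Without the retry, worker B would have to treat its failed reservation as "queue empty", which is exactly the false negative described above.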

dww’s picture

Glad we all understand (L) now. ;) @alex_b: Sure, that comment is fine, thanks.

P) Do we really want to use REQUEST_TIME in hook_cron()? That seems like the big exception to the point of that variable. It seems likely that hook_cron() is going to be invoked for longer than a second, and if you're at the back of the line, REQUEST_TIME != time(). I'm asking both specifically for what we're doing in system_cron(), but also more generally for functions likely to be invoked by cron like claimItem().

To review:

  • M and N (see #95 and #100)
  • O (#100)
  • P (#107)
Dries’s picture

I'm delegating this patch to chx. Given that I'm happy with the approach, I'm asking chx to champion it. I will commit this patch (after a quick review) when chx thinks this patch is ready. Thanks!

andrewlevine’s picture

@dww/@alex_b

Couldn't we avoid the (L) problem and remove the iteration if we UPDATE first then SELECT? This way we'd let the database deal with race conditions instead of us. Admittedly I don't know much about PDO or how the new UpdateQuery class works, but assuming we can retrieve the updated IDs, couldn't we do something like this:

//pseudo-php obviously
$entry = UPDATE queue WHERE consumer_id=0 AND name=$name LIMIT 1
if ($entry)
  $row = SELECT item FROM queue WHERE item_id=$entry->item_id
  return unserialize($row->item)
return NULL

It may be impossible to capture that updated item_id in a database neutral way without doing something prohibitively complicated but I thought I'd throw the idea out there...

dww’s picture

@andrewlevine: Pretty sure chx and davidstrauss considered possible options like that and decided what's currently implemented was the safest/fastest/most portable solution currently possible in Drupal.

chx’s picture

Status: Needs review » Needs work

UPDATE LIMIT is MySQL only. NowPublic's queue system uses that but Drupal core can't.

Re. time(), you are right -- if you operate with a short enough process time then you might get into a bad, bad situation. At time 2s you claim the item for 4 seconds, but the request actually started at time 0s, so it expires at time 4s instead of 6s, where it should. Cron jobs are not a problem, because it does not actually matter when you run the reset command -- i.e. if cron starts at 5s and resets only at 7s, I see no problem with not resetting jobs that expired at 6s. You could, for consistency, use time() there too.
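
The arithmetic in that example can be spelled out as a small sketch. lease_expiry() is a hypothetical helper, not something from the patch, and the three numbers are simply the ones used above: the request starts at 0s, the claim happens at 2s, and the lease is 4 seconds.

```php
<?php
// Hypothetical helper: a lease expires $lease_time seconds after $now.
function lease_expiry($now, $lease_time) {
  return $now + $lease_time;
}

$request_time = 0; // Stands in for REQUEST_TIME (start of the request).
$current_time = 2; // Stands in for time() at the moment of the claim.
$lease_time = 4;

// Based on REQUEST_TIME, the lease ends too early.
$expire_from_request_time = lease_expiry($request_time, $lease_time);
// Based on time(), the worker gets the full lease.
$expire_from_time = lease_expiry($current_time, $lease_time);
// Seconds the worker really has left when REQUEST_TIME was used.
$remaining = $expire_from_request_time - $current_time;
```

With REQUEST_TIME the worker only gets 2 of its 4 seconds, and the gap grows the later in the request the claim happens.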

I will roll another later today addressing M and N and O and definitely removing that loop. I explained multiple times that the caller needs to do a time-bound loop no matter what, so looping in the implementation is rather pointless.

Edit: I mean, please roll another, but if no one is available I will roll one later today.

dww’s picture

chx, neclimdul and I just hashed this out in IRC. Here's what we came up with:

L) I got chx to agree that if we document that claimItem() returning FALSE is a best-effort "as far as I know, I don't have any items for you right now", we're all happy. Yes, someone else could be adding more items all the time. In many cases, the caller wants to do a time-based loop and keep consuming tasks so long as they have resources (and either try to claim from another queue or sleep or something if claimItem() returns FALSE). But, that's not always the case, it's a pain if claimItem() can return FALSE just because of the race condition, and it's easier for call sites to make sense of what FALSE really means...

M) Hashing out the alternatives, here's our unified proposal for the API:

/**
 * Create an item and add it to the queue.
 *
 * @param $data
 * @return
 *   TRUE if the item was successfully created and was (best effort) added to the queue, otherwise FALSE.  We don't guarantee the item was committed to disk, that your disk wasn't hit by a meteor, etc, but as far as we know, we've got that item in the queue now.
 */
createItem($data) { ... }

/**
 * Claim an item from the queue for processing.
 *
 * @return
 *   An $item object where $item->data is the $data given to createItem(), plus additional metadata fields.  FALSE if the queue is (best effort) currently empty.
 */
claimItem() { ... }

/**
 * Remove an item from the queue after processing.
 *
 * @param $item
 *   An $item object returned by claimItem().
 */
deleteItem($item) { ... }

So, "item" is always an object with the data the caller gave us and other metadata. The API operates on items (what we used to call "entries"). createItem() creates the item based on the caller's $data and adds it to the queue in one step. Everything else returns or takes the $item, not the $data.

I think this is a pretty non-WTF API, it's similar to the cache API in a few ways, and it forces callers to do a reasonable thing with their queue workflow -- we don't return the $item in createItem() since you shouldn't do anything once it's in the queue unless you claim it first. We should probably mention this part in the docs for createItem().
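
For anyone skimming, here is a rough sketch of how that workflow reads from the caller's side. MemoryQueueSketch is a made-up, in-memory class for illustration only (it is not SystemQueue and ignores leases and concurrency); the item_id and claimed fields are assumed examples of the "additional metadata".

```php
<?php
// Hypothetical in-memory queue following the proposed method names.
class MemoryQueueSketch {
  protected $items = array();
  protected $nextId = 1;

  // Wraps the caller's $data in an item object and stores it.
  public function createItem($data) {
    $item = new stdClass();
    $item->item_id = $this->nextId++; // Queue-specific metadata.
    $item->data = $data;              // The only field callers rely on.
    $item->claimed = FALSE;
    $this->items[$item->item_id] = $item;
    return TRUE;
  }

  // Returns an unclaimed item object, or FALSE if none is available.
  public function claimItem() {
    foreach ($this->items as $item) {
      if (!$item->claimed) {
        $item->claimed = TRUE;
        return $item;
      }
    }
    return FALSE;
  }

  // Takes the full item object returned by claimItem().
  public function deleteItem($item) {
    unset($this->items[$item->item_id]);
  }
}

$queue = new MemoryQueueSketch();
$queue->createItem(array('nid' => 1));
$queue->createItem(array('nid' => 2));

$processed = array();
while ($item = $queue->claimItem()) {
  $processed[] = $item->data['nid']; // Work only with $item->data.
  $queue->deleteItem($item);         // Hand the whole item back.
}
```

The caller never touches item_id directly: it gives $data to createItem(), reads $item->data after claimItem(), and passes the whole $item to deleteItem().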

N) $item->data is all the caller needs to know. Everything else is queue-specific metadata, which is implementation dependent, and is irrelevant for the caller.

O) Yes, it's silly for SystemQueue::claimItem() to include the queue name in the $item -- we don't need that for SystemQueue::deleteItem(), and the caller already knows the queue name.

P) Yup -- REQUEST_TIME is evil and wrong in this case. we really need time() in most places in this patch.

Q) [new] claimItem() should return FALSE if there are no items, not NULL. (I don't really care, but chx asked and neclimdul slightly preferred FALSE -- wfm).

neclimdul’s picture

Status: Needs work » Needs review
FileSize
17.94 KB

Addressed the points raised in #112 and updated a lot of the docs to reflect the removal of entries and clearly describe the $item object and its role in interacting with the queue.

Found a bug where we weren't actually setting the created field in the SystemQueue implementation. Fixed that and documented the field better.

Moving up to needs review as that should address all listed outstanding concerns.

dww’s picture

FileSize
19.4 KB

That's almost perfect. Rerolled for a few things:

M) createItem() was still documented as taking $item not $data in the interface definition. Fixed.

P) Added comments to the places we use time() instead of REQUEST_TIME so no one tries to "fix" those later.

R) [new] Avoided another possible DrupalWTF moment: we have create(Queue|Item)() but had deleteItem() and removeQueue(). :( s/removeQueue/deleteQueue/ for consistency.

- Fixed the schema description for {queue}.expire field: "Timestamp when the claim lease expires on the item."

- Some other minor PHPDoc updates.

Note, from my perspective, there are only two things "broken" about this patch, both of which belong to another issue and shouldn't hold this up from going in now:

J) Garbage collection -- belongs to #356074: Provide a sequences API

S) (From Crell in #80.5): This line in the factory method is still wonky: $class = variable_get('queue_module_'. $name, 'System') . 'Queue';. But, that can/should be fixed in a followup issue, probably as a conversion to handlers.

neclimdul’s picture

Changes look good to me. With chx's and the bot's nod, I think this is ready, with the caveats for J) and S) listed by dww in #114.

dww’s picture

FileSize
19.41 KB

One more tiny thing. SystemQueue::createItem() wasn't properly implementing the interface. It would return SAVED_NEW (and, in theory, SAVED_UPDATED) on success, not TRUE.

chx’s picture

Status: Needs review » Needs work

My only qualms, at last, are minor, and in the test: DrupalQueue::get($queue1 = $this->randomName())->createQueue(); ouch. You need to listen to "Write in C" less :p Also, all the ::get calls: can't we simply store the queue objects in a variable?

chx’s picture

While there I realized that createQueue() really should be DrupalQueue::createQueue and call a static method (call_user_func(array($classname, 'createQueue'))), because otherwise the constructor always needs to wonder "am I a created queue?". Same for deleteQueue.

Crell’s picture

The interface should declare its functions public (assuming they are) for explicitness and consistency.

claimItem() claims the NEXT item in the queue, doesn't it? It doesn't say so explicitly in the docblock, but should.

The docblocks talk a lot about edge cases where things may be wrong or not consistent or the system will make a guess etc., but don't really give a good explanation of WHY it may be necessary to do so. A FIFO queue should always have the same in/out order, shouldn't it? That's what I and I suspect most people would expect, so if that's not the case it should be better explained why that's not the case.

protected $consumer_id should be $consumerId, since it's an object property.

$consumerId and $name need docblocks as well, if we're going to be really complete about it (which we should be).

I'm not sure off hand if changing the default value for $lease_time from the interface to the implementation breaks E_STRICT. We should try to be E_STRICT if possible.

My only major complaint (all of the above is probably 30 min of work, mostly documentation) is that DrupalQueue has no business being a class. It's a single factory function that's tiny. It's 5 lines of actual code. That can very easily be just a factory function. The code weight impact of that should be minuscule.

chx’s picture

All presumptions about a queue are false. There is a queue which has items you can claim. There is no strict ordering, for example, so saying 'next' is meaningless. Your problem here is that you took C (consistency) and A (availability) from the CAP theorem, while this patch definitely prepares for A and P (being partition tolerant). For example.... if you have servers in several datacentres then you would need to issue a worldwide lock before inserting an item into a queue, which might be quite unpleasant.

About the factory function, I won't relent because it's a useful pattern we can establish here. This might be just 7 lines, but then 7 lines for everything pluggable or just load-on-demand? We can change it if handlers get in.

chx’s picture

Let's add a few more words to "due to the pluggable nature of the queue, there is no guarantee" -- that all parties (adders, consumers, queue backend) can be distributed and it's undesirable to keep the whole system consistent all the time for performance reasons, and this is the chief reason why there is no order guarantee and there is no precise count of items.

neclimdul’s picture

FileSize
19.03 KB

Here's what I've changed the comment to regarding #119-121

 * While the queue system makes a best effort to preserve order in messages,
 * due to the pluggable nature of the queue, there is no guarantee that items
 * will be delivered on claim in the order they were sent. For example, some
 * implementations may have distributed back-ends like AmazonSQS or beanstalkd
 * that will be managing jobs for a large set of producers and consumers where a
 * strict FIFO ordering will likely not be preserved. 

I've gone ahead and explicitly set functions to public, changed the object properties to fix our object coding standards and taken care of all the trivial E_STRICT items I could find (the lease_time was not one of them). That should address most of Crell's comments.

I talked to chx about making createQueue and deleteQueue static methods on DrupalQueue but I think I've changed my mind on that. Going through our factory and having the constructor run provides for any connection information to be set up etc. I think this will actually end up being less of a WTF than the other way.

chx’s picture

Deciding between a static and a normal method is not easy. However, the constructor is unlikely to need the queue itself: if there is a need for any sort of connection, that very likely won't include the queue name. Beanstalkd, which is NOT distributed, for example, works like that.

Clarification: normal method is fine, remove beanstalkd from the distributed example as it's not.

chx’s picture

If someone wants to write an Amazon SQS backend -- please do. http://debuggable.com/posts/queues-in-the-cloud-debuggable-php-sqs-libra... is a good place to start.

neclimdul’s picture

FileSize
17.75 KB

Fix Amazon SQS name in doc and don't refer to beanstalkd as distributed. Yay late night patch-a-thon!

neclimdul’s picture

Status: Needs work » Needs review

Review!

dww’s picture

FileSize
18.61 KB

I'm glad Crell weighed in again, and I think the improvements since #119 are good ones. I just fixed a typo/awkward language in the new doc block for $consumerId, and added ()'s after claimItem(). It now reads:

  /**
   * Our internal consumer ID for this queue instance.
   *
   * This is created lazily when we start consuming items with claimItem().
   *
   * @var integer
   */

After pouring many hours into this issue and these patches, the last thing I want to do is derail this. However, I have some concerns about the usefulness of this API for certain things, chx pointed me to the "Job scheduler" issue, but that just runs right back into the same limitations and concerns I've had in here... So, interested parties should read #410656-13: Job scheduler. I don't think that's a reason to hold this up -- what we have here is great, and will definitely be useful for some things. I just don't think it'll be useful for everything in this space. But that's ok, one API can't be everything to everyone... better that it does one fairly simple thing really well than trying to do too many things poorly.

Meta: "Queue" sort of implies FIFO. :( What we have here is more like a "Set". Should we rename? ;) /me ducks.

chx’s picture

Status: Needs review » Reviewed & tested by the community

Derek, this is a "best effort FIFO". I ran out of things to pick on. Thanks gentlemen for the work.

neclimdul’s picture

Yay for 1am documentation... Thanks for the fix dww.

Crell’s picture

So should we refer to it as a FIPFO queue? (First In Probably First Out) In English you'd still pronounce it the same way.

*runs so the patch can be committed*

Dries’s picture

Status: Reviewed & tested by the community » Fixed

Committed to CVS HEAD. Awesome job all! :-)

yched’s picture

Looks awesome :-)

Minor point :
"The full queue item returned by DrupalQueueInterface::createItem() needs to be passed to DrupalQueueInterface::deleteItem() once processing is completed."
Shouldn't that be :
"The full queue item returned by DrupalQueueInterface::claimItem() needs to be passed ..."
?

That's what the test does, and it seems to make more sense in terms of who does what...

dww’s picture

Status: Fixed » Needs work

yched: Yes, good catch. Please feel free to provide a patch for that. ;)

dww’s picture

Status: Needs work » Reviewed & tested by the community
FileSize
1.17 KB

Bah, here you go. ;) Trivial doc-only change. RTBC'ing myself.

Pasqualle’s picture

RE #65 C)

is it really worth duplicating the 'description' fields in that table definition array we pass to db_create_table()? Those are totally ignored by db_create_table(), and anything looking up the descriptions of the schema are going to look in the hook_schema() implementation, not the DB updates, right? I guess it doesn't hurt, but in my contribs, I never document the schema in the update functions, only the "real" schema in the hook_schema definition itself.

The schema description fields are ignored by db_create_table() in D6, but they are used in D7.

mysql schema.inc
line 87:

    // Add table comment.
    if (!empty($table['description'])) {
      $sql .= ' COMMENT ' . $this->prepareComment($table['description'], self::COMMENT_MAX_TABLE);
    }

line 142:

    // Add column comment.
    if (!empty($spec['description'])) {
      $sql .= ' COMMENT ' . $this->prepareComment($spec['description'], self::COMMENT_MAX_COLUMN);
    }
xmacinfo’s picture

Issue tags: +Needs documentation

Looks good!

Please add an entry in the CHANGELOG.txt. This is an important new feature. :-)

dww’s picture

Status: Reviewed & tested by the community » Needs work
Issue tags: -Needs documentation

@Pasqualle: Thanks, duly noted. ;)

@xmacinfo: Ok, then this documentation cleanup patch still needs work, I guess. Nothing stops you from writing said CHANGELOG entry, does it? ;)

MisterSpeed’s picture

This looks like a fantastic module. One thing that we discovered about scaling an integrated PHP system is that it works well in most cases, but when it starts to fail it becomes a heavy burden and a tangle to undo. One slight change that would make this incredibly powerful would be to let the interface be upgradeable (or support delegation).

Take a site that hosts pictures and the content that surrounds them. One of the major tasks a site like this has to handle is to rescale the pictures and store them in the cloud. That takes a while. If you only have a few hundred users, the current solution is fine and dandy.

Now consider this site growing to a million users. The current queue will suddenly burst at the seams. The developer has two choices: go back in code to job inception and redesign the whole queue processing from scratch, or, preferably, simply overhaul the bit that is causing the bottleneck. He could just plug in a delegate that would look at certain tasks, serialize those that he knows would be too heavy for the queue (perhaps through good design, perhaps by looking for the signature of tasks that are known to block the queue and adapt to changing circumstances on the fly), ship them to a full-time PHP daemon outside of the constraints of the web server (or on a dedicated core, or even outside of this machine), and offload the task this way in an elegant manner. All that we then need is a conditional delegation hook, or a user-definable function that supersedes these methods.

neclimdul’s picture

@63reasons That is the entire point of the object/interface design of this patch. Please notice how we conditionally load a class based on a variable get. Also, read #114 S) and #80.5 and the surrounding discussions for additional notes.
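
As a rough illustration of that conditional class loading, here is a standalone sketch modelled on the factory line quoted in #114 S). It is not the committed code: sketch_variable_get() is a hypothetical array-backed stand-in for variable_get(), and both classes are empty stubs (MemoryQueue and the 'thumbnails' queue name are invented for the example).

```php
<?php
// Hypothetical stand-in for Drupal's variable_get(), backed by a plain array.
function sketch_variable_get(array $variables, $name, $default) {
  return isset($variables[$name]) ? $variables[$name] : $default;
}

// Stub backends: only the class names matter for this sketch.
class SystemQueue {
  public $name;
  public function __construct($name) {
    $this->name = $name;
  }
}
class MemoryQueue extends SystemQueue {}

// Factory: the per-queue variable picks the class prefix, 'System' by default.
function sketch_queue_get(array $variables, $name) {
  $class = sketch_variable_get($variables, 'queue_module_' . $name, 'System') . 'Queue';
  return new $class($name);
}

// Only the 'thumbnails' queue is redirected to a different backend.
$variables = array('queue_module_thumbnails' => 'Memory');
$default_queue = sketch_queue_get($variables, 'feeds');
$custom_queue = sketch_queue_get($variables, 'thumbnails');
```

Swapping the backend for one heavy queue is then a matter of setting one variable and providing a class with the matching name, without touching the code that creates or claims items.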

MisterSpeed’s picture

Ok, got sort of foggy-brained after reading the first 50 posts or so. Glad it works that way !

dww’s picture

Status: Needs work » Reviewed & tested by the community

My trivial doc cleanup patch in #134 is RTBC again, since a CHANGELOG entry for this was added over at #578676: Use queue for cron.

Dries’s picture

Status: Reviewed & tested by the community » Fixed

Committed the documentation clean-up. Thanks dww.

Status: Fixed » Closed (fixed)

Automatically closed -- issue fixed for 2 weeks with no activity.

alex_b’s picture

FYI:

I put up a D6 backport of DrupalQueue: http://drupal.org/project/drupal_queue
Using it experimentally right now in Feeds: http://drupal.org/project/feeds

Ken Wolf’s picture

quazardous’s picture

hi,

I've just discovered this post ...

maybe I've done some useless work:
http://drupal.org/project/process_api

xmacinfo’s picture

Your work should not be useless.

In Drupal 7 new features are to be made in contrib modules. For Drupal 8, you need to open a new issue to add new features to the core.

dww’s picture

@xmacinfo: I think quazardous is making the point that D7 core already supports the functionality that he wrote this contrib module to provide. That's probably true. Which isn't to say the core JobQueue API (which has been back-ported to D6 as a contrib) is perfect. But yeah, you should definitely check it out and see if it can handle your needs.

WorldFallz’s picture

Issue summary: View changes

Copy of the revision from March 25, 2012 - 08:40.