http://redis.io/commands/rpoplpush
http://redis.io/commands/brpoplpush
It looks like the current implementation is vulnerable to a worker crash, since you are implementing almost the same thing via separate rPop and zAdd commands instead of a single atomic one?
Also, by using zAdd it seems like you could have a logic error if the same job is in the queue twice? ZADD updates a member's score if it already exists.
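A minimal Python sketch of the difference. It uses tiny in-memory models rather than a real Redis client (`ListModel` and `SortedSetModel` are hypothetical and only mimic LPUSH on a list and ZADD/ZCARD on a sorted set). A list keeps both copies of a duplicated job, while a sorted set keeps one member, and the second ZADD overwrites the first claim's score:

```python
class ListModel:
    """Mimics a Redis list: LPUSH always prepends, so duplicates are kept."""

    def __init__(self):
        self.items = []

    def lpush(self, value):
        self.items.insert(0, value)
        return len(self.items)


class SortedSetModel:
    """Mimics a Redis sorted set: each member appears at most once."""

    def __init__(self):
        self.scores = {}  # member -> score

    def zadd(self, score, member):
        # Like ZADD without flags: an existing member just gets its score
        # updated, and the return value counts only newly added members.
        is_new = member not in self.scores
        self.scores[member] = score
        return 1 if is_new else 0

    def zcard(self):
        return len(self.scores)


queue = ListModel()
claimed = SortedSetModel()
job = "serialized-job-A"

queue.lpush(job)
queue.lpush(job)                 # the same job enqueued twice
print(len(queue.items))          # 2

print(claimed.zadd(1000, job))   # 1: first claim, lease ends at t=1000
print(claimed.zadd(1030, job))   # 0: second claim of the identical payload
print(claimed.zcard())           # 1
print(claimed.scores[job])       # 1030: the first claim's expiry is gone
```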
| Comment | File | Size | Author |
|---|---|---|---|
| #15 | 1374000-15.patch | 9.52 KB | pwolanin |
| #14 | 1374000-14.patch | 6.48 KB | pwolanin |
| #13 | 1374000-13.patch | 5.94 KB | pwolanin |
| #9 | 1374000-9.patch | 2.8 KB | msonnabaum |
| #6 | 1374000-6.patch | 6.9 KB | pwolanin |
Comments
Comment #1
msonnabaum commented: I was using rpoplpush originally, but switched to the separate zadd so I could support claimed item expiration using scores.
I'm very much open to a different technique here that'll move us back to rpoplpush but still support expiration. I'm also open to the argument that expiration is not worth implementing here, but it is part of the interface.
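For reference, the rPop-plus-zAdd approach can be modelled roughly as below. This is a minimal Python sketch of the idea described, not the module's code: `FakeRedis` is an in-memory stand-in for RPOP, LPUSH, ZADD, ZRANGEBYSCORE and ZREM, the `:claimed` key suffix is hypothetical, and time is passed in explicitly. Each claimed member's score is its lease-expiry time, so expiration becomes a range query on scores:

```python
class FakeRedis:
    """In-memory stand-in for the handful of Redis commands used below."""

    def __init__(self):
        self.lists = {}  # key -> list, index 0 is the head
        self.zsets = {}  # key -> {member: score}

    def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)

    def rpop(self, key):
        items = self.lists.get(key)
        return items.pop() if items else None

    def zadd(self, key, score, member):
        self.zsets.setdefault(key, {})[member] = score

    def zrangebyscore(self, key, low, high):
        members = sorted(self.zsets.get(key, {}).items(), key=lambda kv: kv[1])
        return [m for m, s in members if low <= s <= high]

    def zrem(self, key, member):
        return 1 if self.zsets.get(key, {}).pop(member, None) is not None else 0


def claim_item(r, name, lease, now):
    # Two separate commands: a worker that dies after RPOP but before
    # ZADD leaves the item in neither structure, so it is lost.
    item = r.rpop(name)
    if item is not None:
        r.zadd(name + ":claimed", now + lease, item)  # score = lease expiry
    return item


def expire(r, name, now):
    # Every claimed member whose lease ended at or before `now` is re-queued.
    for item in r.zrangebyscore(name + ":claimed", float("-inf"), now):
        if r.zrem(name + ":claimed", item):
            r.lpush(name, item)


r = FakeRedis()
r.lpush("q", "job-1")
print(claim_item(r, "q", lease=30, now=100))  # job-1, lease ends at t=130
expire(r, "q", now=120)
print(r.lists["q"])                           # []
expire(r, "q", now=131)
print(r.lists["q"])                           # ['job-1']
```

The window between the RPOP and the ZADD is the crash vulnerability raised at the top of the issue.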
Comment #2
pwolanin commented: Yeah, looking at the Redis API, it's hard to see how to really do this right. Basically you want some supervisor process to be able to look at the backup queue and see what's "timed out".
The redis docs say: "Another process (that we call Helper), can monitor the backup list to check for timed out entries to re-push against the main queue."
But I don't see how you can easily determine that something timed out. If you added a timestamp to the object when you added it to the queue, you still wouldn't know how long it took to get claimed.
A scheme that could work is this:
Create a new backup queue every second (or every 10 seconds, or some other convenient interval) and name it with the timestamp, e.g. backup:1324237030. Assuming a new claimed queue every 10 seconds, we just make some minimal changes to the code like:
You can then use the KEYS command (http://redis.io/commands/keys) to find all lists matching the claimed pattern (as you already do in public function expire()), and split out the timestamp to find any that have expired.
A quick test with Redis locally shows that the KEYS command skips empty lists (or maybe they are effectively deleted by being empty), so you do not even have to delete them if all workers completed.
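One way the bucket scheme might look, as a Python sketch under stated assumptions: `FakeRedis` models only RPOPLPUSH, LPUSH and KEYS in memory and, like Redis, drops a list key once it becomes empty. The timestamp in each `backup:` key is assumed to be the lease-expiry time rounded up to a 10-second bucket, which is one reading of the scheme rather than the patch's code, and the `queue` key name is hypothetical:

```python
from fnmatch import fnmatchcase

BUCKET = 10  # seconds covered by one backup list


class FakeRedis:
    """In-memory stand-in for RPOPLPUSH, LPUSH and KEYS."""

    def __init__(self):
        self.lists = {}  # key -> list, index 0 is the head

    def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)

    def rpoplpush(self, src, dst):
        # One atomic command in real Redis: pop the tail of src, push onto dst.
        items = self.lists.get(src)
        if not items:
            return None
        value = items.pop()
        if not items:
            del self.lists[src]  # Redis deletes a list key when it becomes empty
        self.lpush(dst, value)
        return value

    def keys(self, pattern):
        # KEYS takes glob-style patterns; fnmatchcase is a close approximation.
        return [k for k in self.lists if fnmatchcase(k, pattern)]


def backup_key(now, lease):
    # Round the expiry time up to the next bucket boundary (integer ceiling).
    expires = -(-(now + lease) // BUCKET) * BUCKET
    return f"backup:{expires}"


def claim(r, now, lease):
    return r.rpoplpush("queue", backup_key(now, lease))


def expire(r, now):
    for key in r.keys("backup:*"):
        if int(key.split(":", 1)[1]) <= now:
            # Drain the expired backup list into the queue, one atomic move at a time.
            while r.rpoplpush(key, "queue") is not None:
                pass


r = FakeRedis()
r.lpush("queue", "job-1")
print(claim(r, now=1324237031, lease=5))  # job-1
print(r.keys("backup:*"))                 # ['backup:1324237040']
expire(r, now=1324237041)
print(r.lists["queue"])                   # ['job-1']
```

With these numbers, a 5-second lease taken at 1324237031 becomes eligible for re-queueing at 1324237040 rather than 1324237036, which is the rounding-up effect noted later in the thread for leases shorter than the bucket size.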
Comment #3
pwolanin commented: Here's an even better idea.
Why not just store a simple string ID for each item, and store the serialized value in a Redis Hash?
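A minimal Python sketch of that layout, assuming an in-memory `FakeRedis` that models LPUSH, RPOPLPUSH, HSET and HGET. The key names (`q:data`, `q:avail`, `q:claimed`), the uuid-based IDs and the JSON encoding are illustrative choices, not taken from any patch. Only the short ID travels between lists, while the payload stays in the hash:

```python
import json
import uuid


class FakeRedis:
    """In-memory stand-in for LPUSH, RPOPLPUSH, HSET and HGET."""

    def __init__(self):
        self.lists = {}
        self.hashes = {}

    def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)

    def rpoplpush(self, src, dst):
        items = self.lists.get(src)
        if not items:
            return None
        value = items.pop()
        self.lpush(dst, value)
        return value

    def hset(self, key, field, value):
        self.hashes.setdefault(key, {})[field] = value

    def hget(self, key, field):
        return self.hashes.get(key, {}).get(field)


def create_item(r, data):
    item_id = uuid.uuid4().hex
    r.hset("q:data", item_id, json.dumps(data))  # payload stored once
    r.lpush("q:avail", item_id)                  # queue holds only the ID
    return item_id


def claim_item(r):
    item_id = r.rpoplpush("q:avail", "q:claimed")  # atomic move of the ID
    if item_id is None:
        return None
    return item_id, json.loads(r.hget("q:data", item_id))


r = FakeRedis()
new_id = create_item(r, {"nid": 42, "body": "x" * 10000})
claimed_id, data = claim_item(r)
print(claimed_id == new_id, data["nid"])  # True 42
print(r.lists["q:claimed"] == [new_id])   # True
```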
Comment #4
pwolanin commented: Here's a totally untested rewrite that uses a hash to store the actual job data and uses lists as queues of job IDs only.
Comment #5
pwolanin commented: Oops, that last one had a syntax error.
Comment #6
pwolanin commented: Fix expiration logic (maybe).
There is still a lot of duplicated code between claimItem() and claimItemBlocking() that could be eliminated.
It would be interesting to see how much (if any) faster this is for jobs with large amounts of data, since you avoid moving the data between the avail and claimed lists.
As written, a lease time of < 10 seconds rounds up to 10 seconds, so maybe that should be configurable.
The core implementation has a default lease time of 30 seconds, so I'm not sure why it's 3600 seconds here:
http://api.drupal.org/api/drupal/modules--system--system.queue.inc/funct...
Comment #7
pwolanin commented: Marc points me to this discussion initiated by chx:
http://stackoverflow.com/questions/7625101/redis-queue-with-claim-expire
The basic algorithm one might take away from that is: after using rpoplpush to move a queued item from "available" to "claimed", you set a key derived from the ID of the queued item, with an expiration value, e.g.
Then, to expire and recycle the items in the claimed list, you iterate over the claimed list and test whether each item's lock key still exists. If it doesn't, add the item back to available.
A downside here is that this algorithm does not use the atomic rpoplpush() the way the patch above does for moving items back to available.
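The lock-key algorithm might be sketched like this in Python. `FakeRedis` is an in-memory model with an explicit clock; its `set_ex` method stands in for Redis `SET key value EX seconds` and `lrem_first` for `LREM key 1 value` (both method names are hypothetical). The `avail`, `claimed` and `lock:<id>` key names are also hypothetical. The re-push in `expire()` is the two-step, non-atomic move described just above:

```python
class FakeRedis:
    """In-memory stand-in for RPOPLPUSH, LPUSH, LRANGE, LREM, SET ... EX and EXISTS."""

    def __init__(self):
        self.lists = {}
        self.expires_at = {}  # string key -> absolute expiry time
        self.now = 0          # manual clock instead of wall time

    def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)

    def rpoplpush(self, src, dst):
        items = self.lists.get(src)
        if not items:
            return None
        value = items.pop()
        self.lpush(dst, value)
        return value

    def lrange(self, key):
        return list(self.lists.get(key, []))

    def lrem_first(self, key, value):
        # Like LREM key 1 value: remove the first match from the head.
        items = self.lists.get(key, [])
        if value in items:
            items.remove(value)
            return 1
        return 0

    def set_ex(self, key, seconds):
        self.expires_at[key] = self.now + seconds

    def exists(self, key):
        return key in self.expires_at and self.expires_at[key] > self.now


def claim_item(r, lease):
    item_id = r.rpoplpush("avail", "claimed")  # atomic move
    if item_id is not None:
        r.set_ex("lock:" + item_id, lease)     # lease lapses by itself
    return item_id


def expire(r):
    for item_id in r.lrange("claimed"):
        # No lock means the lease ran out. (An item moved by RPOPLPUSH whose
        # lock has not been set yet would also look expired here.)
        if not r.exists("lock:" + item_id):
            if r.lrem_first("claimed", item_id):  # two separate steps,
                r.lpush("avail", item_id)         # not one atomic move


r = FakeRedis()
r.lpush("avail", "job-1")
claim_item(r, lease=30)   # t=0, lock lives until t=30
r.now = 10
expire(r)
print(r.lrange("avail"))  # []
r.now = 31
expire(r)
print(r.lrange("avail"))  # ['job-1']
```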
Comment #8
rb2k commented: I think the initial idea with rpoplpush was a good approach.
To build a simple queue in Redis that can be used to resubmit crashed jobs, I'd try something like this:
- 1 list "up_for_grabs"
- 1 list "being_worked_on"
- auto expiring locks
Note that this assumes we don't need fancy features like filtering out duplicate jobs.
I'll try to put this down as ruby-like pseudo code.
A worker trying to grab a job would do something like this:
And every now and then, we could just grab our list and check that all jobs that are in there actually have a lock.
If we find any jobs that DON'T have a lock, this means it expired and our worker probably crashed.
In this case we would resubmit.
This would be the pseudo code for that:
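The worker side of that design, using the `up_for_grabs` and `being_worked_on` lists, could look roughly like this Python sketch (not rb2k's original pseudo code). `FakeRedis` is an in-memory model whose `set_ex`, `delete` and `lrem_first` methods stand in for `SET ... EX`, `DEL` and `LREM key 1 value`; the `lock:<job>` key name is hypothetical, and a raised exception stands in for a worker process dying mid-job:

```python
class FakeRedis:
    """In-memory stand-in for LPUSH, RPOPLPUSH, LREM, SET ... EX and DEL."""

    def __init__(self):
        self.lists = {}
        self.locks = {}  # lock key -> absolute expiry time
        self.now = 0

    def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)

    def rpoplpush(self, src, dst):
        items = self.lists.get(src)
        if not items:
            return None
        value = items.pop()
        self.lpush(dst, value)
        return value

    def lrem_first(self, key, value):
        items = self.lists.get(key, [])
        if value in items:
            items.remove(value)
            return 1
        return 0

    def set_ex(self, key, seconds):
        self.locks[key] = self.now + seconds

    def delete(self, key):
        return 1 if self.locks.pop(key, None) is not None else 0


def work_one(r, handler, lease=30):
    job = r.rpoplpush("up_for_grabs", "being_worked_on")
    if job is None:
        return False
    r.set_ex("lock:" + job, lease)
    handler(job)                          # a crash here leaves job + lock behind
    r.lrem_first("being_worked_on", job)  # acknowledge completion
    r.delete("lock:" + job)
    return True


def crash(job):
    raise RuntimeError("worker died while processing " + job)


r = FakeRedis()
r.lpush("up_for_grabs", "ok-job")
r.lpush("up_for_grabs", "bad-job")

done = []
work_one(r, done.append)  # RPOP takes the oldest entry: "ok-job"
try:
    work_one(r, crash)
except RuntimeError:
    pass

print(done)                        # ['ok-job']
print(r.lists["being_worked_on"])  # ['bad-job']
print("lock:bad-job" in r.locks)   # True, until its lease runs out
```

Once `lock:bad-job` expires, the periodic check described above (any job in the list without a lock gets resubmitted) would move `bad-job` back to `up_for_grabs`.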
Comment #9
msonnabaum commented: Here's an implementation of more or less what Marc described.
Seems pretty simple and tests are passing with it.
Comment #10
pwolanin commented: Per my patch above, I think you should also avoid moving the actual job data between queues and just move the IDs instead.
Currently you rely on the serialized item being the same as the original item, which I would consider very fragile, and which would remove any additional jobs that serialize to the same value.
lrem($this->claimed, $item, -1)
Comment #11
msonnabaum commented: I include the qid in the serialized item, so there shouldn't ever be duplicates. Also, -1 should only remove one item, so worst case it removes one of the duplicates, not all.
And now that the data moving is happening in a single operation within redis (rpoplpush), I'm not that concerned with moving the data along with the id. I'm assuming your concern was performance, but let me know if there's something else I'm missing there.
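For reference, LREM's count argument works like this per the Redis documentation: a positive count removes up to that many matches scanning from head to tail, a negative count scans from tail to head, and 0 removes every match. A small Python model of those rules (`lrem` here is a plain function, not a client call), operating on a list whose index 0 is the head:

```python
def lrem(items, count, value):
    """Model of Redis LREM on a Python list; returns how many were removed."""
    if count < 0:
        order = range(len(items) - 1, -1, -1)  # tail to head
    else:
        order = range(len(items))              # head to tail
    limit = abs(count) if count != 0 else len(items)
    hits = [i for i in order if items[i] == value][:limit]
    for i in sorted(hits, reverse=True):  # delete from the end so indices stay valid
        del items[i]
    return len(hits)


claimed = ["a", "dup", "b", "dup"]  # head ... tail
removed = lrem(claimed, -1, "dup")
print(removed, claimed)  # 1 ['a', 'dup', 'b']

claimed = ["a", "dup", "b", "dup"]
removed = lrem(claimed, 0, "dup")
print(removed, claimed)  # 2 ['a', 'b']
```

With count = -1 at most one entry is removed, so a duplicated payload loses only one copy.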
Comment #12
pwolanin commented: Two concerns:
First is the performance of potentially moving large serialized objects around.
Second (as before), the client may, e.g., alter the $item in some way so that its serialized value no longer matches the entry in the queue. In this case, the same job might get re-run in an infinite loop.
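A small Python illustration of the second concern. `json.dumps` stands in for PHP's `serialize()` (any deterministic encoding shows the same effect), and `lrem_first` is a hypothetical model of `LREM key 1 value`. Once the worker mutates its copy, value-based removal finds nothing, while removal by ID is unaffected:

```python
import json


def serialize(item):
    return json.dumps(item, sort_keys=True)


def lrem_first(items, value):
    # Like LREM key 1 value: remove the first match, return 0 or 1.
    if value in items:
        items.remove(value)
        return 1
    return 0


item = {"qid": 7, "data": {"nid": 42}}
claimed_values = [serialize(item)]  # claimed list storing full payloads
claimed_ids = [item["qid"]]         # claimed list storing IDs only

item["data"]["processed"] = True    # the worker alters its copy of the item

print(lrem_first(claimed_values, serialize(item)))  # 0: entry stays claimed
print(lrem_first(claimed_ids, item["qid"]))         # 1
```

The leftover payload entry would be re-queued when its lease expires and then processed again, which is the loop described above.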
Comment #13
pwolanin commented: Here's another patch. I haven't actually gotten the driver set up, so it's not tested, but it uses the expiring lease algorithm and keeps the actual job data in a hash, so only the job IDs move around.
Comment #14
pwolanin commented: Revised patch which passes the unit tests.
Comment #15
pwolanin commented: Added an expireAll() method to be called from hook_cron(), with corresponding test assertions.
It also renames $this->queue to $this->redis, for better code clarity.
Comment #16
msonnabaum commented: #15 looks good to me. Committed.