http://redis.io/commands/rpoplpush
http://redis.io/commands/brpoplpush

It looks like the current implementation is vulnerable to a worker crash, and/or you are implementing almost the same thing via separate rPop and zAdd commands?

However, using zAdd seems like it could cause a logic error if the same job is in the queue twice? ZADD updates the score of a member that already exists.

Comments

msonnabaum’s picture

I was using rpoplpush originally, but switched to the separate zadd so I could support claimed item expiration using scores.

I'm very much open to a different technique here that'll move us back to rpoplpush but still support expiration. I'm also open to the argument that expiration is not worth implementing here, but it is part of the interface.

pwolanin’s picture

Yeah, looking at the Redis API, it's hard to see how to really do this right. Basically you want some supervisor process to be able to look at the backup queue and see what's "timed out".

The redis docs say: "Another process (that we call Helper), can monitor the backup list to check for timed out entries to re-push against the main queue."

But I don't see how you can easily determine that something timed out. If you added a timestamp to the object when you added it to the queue you don't know how long it took to get claimed.

A scheme that could work is this:

Create a new backup queue every second (or every 10 seconds, or some other convenient interval) and name it with the timestamp, e.g. backup:1324237030

Assuming a new claimed queue per 10 seconds, we just make some minimal changes to the code like:


function __construct($name) {
  $this->claimed = 'drupal:queue:' . $name . ':claimed:' . round(REQUEST_TIME, -1);

  ...
}

You can then use the KEYS command (http://redis.io/commands/keys) to find all lists matching the claimed pattern (as you already do in public function expire()), and split out the timestamp to find any that are expired.

A quick test with redis locally shows that the KEYS command skips empty lists (or maybe they are effectively deleted by being empty), so you do not even have to delete them if all workers completed.
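To make the bucket idea concrete, here is a minimal Ruby sketch of the key naming and the expiry check. It is only an illustration: the helper names, the BUCKET_SECONDS constant, and the choice to floor the timestamp (the PHP snippet above rounds to the nearest 10 instead) are assumptions, and the key names would come from KEYS in a real driver.

```ruby
# Hypothetical helpers; the names and the 10-second bucket are assumptions.
BUCKET_SECONDS = 10

# Name of the claimed list for the bucket that contains `now` (epoch seconds).
# Flooring means every item in a bucket was claimed in [bucket, bucket + 10).
def claimed_key(queue_name, now)
  bucket = now - (now % BUCKET_SECONDS)
  "drupal:queue:#{queue_name}:claimed:#{bucket}"
end

# Given claimed-list key names, return those whose lease has
# certainly run out for every item they could contain.
def expired_claimed_keys(keys, now, lease_seconds)
  keys.select do |key|
    bucket = key.split(':').last.to_i
    # The latest possible claim in this bucket happened just before
    # bucket + BUCKET_SECONDS, so wait that long plus the lease.
    bucket + BUCKET_SECONDS + lease_seconds <= now
  end
end
```

One consequence of this check is that an item can sit for up to one bucket width longer than its lease before it is recycled.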

pwolanin’s picture

Here's an even better idea.

Why not just store a simple string ID for each item, and store the serialized value in a Redis Hash?
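A rough Ruby sketch of that layout, using plain arrays and a hash as in-memory stand-ins for the Redis lists and the Redis hash. The class and method names are hypothetical and this is not the attached patch; it only shows that the payload is written once while just the ID moves between lists.

```ruby
# In-memory stand-in; a real driver would use LPUSH/RPOPLPUSH/LREM on
# Redis lists and HSET/HGET/HDEL on a Redis hash instead.
class IdQueue
  attr_reader :avail, :claimed, :items

  def initialize
    @avail = []    # job IDs waiting to be claimed
    @claimed = []  # job IDs currently being worked on
    @items = {}    # job ID => serialized payload
    @next_id = 0
  end

  # Store the payload once and queue only its ID.
  def create_item(payload)
    @next_id += 1
    id = @next_id.to_s
    @items[id] = Marshal.dump(payload)
    @avail.unshift(id)
    id
  end

  # Move one ID from avail to claimed and return it with its payload.
  def claim_item
    id = @avail.pop
    return nil if id.nil?
    @claimed.unshift(id)
    [id, Marshal.load(@items[id])]
  end

  # Deletion is keyed by ID, so changes to the payload do not matter.
  def delete_item(id)
    @claimed.delete(id) # IDs are unique, so this removes one entry
    @items.delete(id)
  end
end
```

Because deletion looks up the short ID rather than the serialized value, a worker that modifies the payload while processing can still remove the job cleanly.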

pwolanin’s picture

Status: Active » Needs review
Attachment (new): 6.56 KB

Here's a totally untested rewrite using a hash to store the actual job data, and using lists as queues of just the job IDs.

pwolanin’s picture

Attachment (new): 6.56 KB

oops - that last one had a syntax error.

pwolanin’s picture

Attachment (new): 6.9 KB

Fix expiration logic (maybe).

There is still highly duplicated code between claimItem() and claimItemBlocking() that could be eliminated.

It would be interesting to see how much (if any) faster this is for jobs with large amounts of data, since you avoid moving the data between the avail and claimed lists.

As written, a lease time of < 10 seconds rounds up to 10 seconds, so maybe that should be configurable.

The core implementation has a default lease time of 30 seconds, so I'm not sure why it's 3600 seconds here:

http://api.drupal.org/api/drupal/modules--system--system.queue.inc/funct...

pwolanin’s picture

Marc points me to this discussion initiated by chx:

http://stackoverflow.com/questions/7625101/redis-queue-with-claim-expire

The basic algorithm one might take away from that is: after using rpoplpush to move a queued item from "available" to "claimed", you set a key derived from the ID of the queued item, with an expiration equal to the lease time, e.g.

$qid = $this->queue->rpoplpush($this->avail, $this->claimed);
$this->queue->setex($this->avail . '_lock:' . $qid, $lease_time, time());

And then to expire and recycle the items in the claimed list you basically iterate over the claimed list and test whether the lock key still exists. If not, add it back to available.

A downside here is that this algorithm does not use the atomic rpoplpush() the way the patch above does for moving items back to available.
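Here is a small Ruby sketch of that lease-and-recycle loop, with an in-memory stand-in for Redis so the timing is explicit. The class name, method names, and the `now` parameter are assumptions made for illustration; in a real driver the lock would be a SETEX key and the existence check would be EXISTS.

```ruby
# In-memory stand-in for the two lists plus expiring lock keys.
class LeaseQueue
  attr_reader :avail, :claimed

  def initialize
    @avail = []
    @claimed = []
    @locks = {} # lock key => expiry time in epoch seconds (stands in for SETEX)
  end

  def push(id)
    @avail.unshift(id)
  end

  # Move one ID to the claimed list, then set its expiring lock.
  def claim(lease_seconds, now)
    id = @avail.pop
    return nil if id.nil?
    @claimed.unshift(id)
    @locks["lock:#{id}"] = now + lease_seconds
    id
  end

  # True while the lock key would still exist in Redis.
  def lock_exists?(id, now)
    expiry = @locks["lock:#{id}"]
    !expiry.nil? && expiry > now
  end

  # Put every claimed ID whose lock is gone back on the available list.
  # In real Redis the remove and the push are two separate commands,
  # so this step is not atomic.
  def recycle_expired(now)
    expired = @claimed.reject { |id| lock_exists?(id, now) }
    expired.each do |id|
      @claimed.delete(id)
      @avail.unshift(id)
    end
    expired
  end
end
```

The non-atomic recycle step is where the downside mentioned above shows up: a crash between the remove and the push would lose the ID unless the two commands are wrapped in something like a MULTI/EXEC transaction.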

rb2k’s picture

I think the initial idea with rpoplpush was a good approach.
To realize a simple queue in redis that can be used to resubmit crashed jobs I'd try something like this:

- 1 list "up_for_grabs"
- 1 list "being_worked_on"
- auto expiring locks

Note that this assumes we don't need fancy features like filtering out duplicate jobs.
I'll try to put this down as ruby-like pseudo code.

A worker trying to grab a job would do something like this:

timeout = 3600
# Move the job off the queue so nobody else tries to claim it
job = RPOPLPUSH(up_for_grabs, being_worked_on)
# Set a lock that expires after `timeout` seconds (SETEX takes key, seconds, value).
# The value tells us when the job will time out, but it can be arbitrary
SETEX('lock:' + job, timeout, Time.now + timeout)
# Our application logic
do_work(job)
# Remove the finished item from the queue
LREM(being_worked_on, -1, job)
# Delete the item's lock. If it crashes here, the expire will take care of it
DEL('lock:' + job)

And every now and then, we could just grab our list and check that all jobs that are in there actually have a lock.
If we find any jobs that DON'T have a lock, this means it expired and our worker probably crashed.
In this case we would resubmit.

This would be the pseudo code for that:

loop do
  items = LRANGE(being_worked_on, 0, -1)
  items.each do |job|
    if !(EXISTS('lock:' + job))
      puts "We found a job that didn't have a lock, resubmitting: #{job}"
      LREM(being_worked_on, -1, job)
      LPUSH(up_for_grabs, job)
    end
  end
  sleep 60
end

msonnabaum’s picture

Attachment (new): 2.8 KB

Here's an implementation of more or less what Marc described.

Seems pretty simple and tests are passing with it.

pwolanin’s picture

Per my above patch I think you should also avoid moving the actual job data between queues, but rather just move the IDs.

Currently you rely on the serialized item being the same as the original item, which I would consider very fragile, and which would remove any additional jobs that serialize to the same value: lrem($this->claimed, $item, -1)

msonnabaum’s picture

I include the qid in the serialized item, so there shouldn't ever be duplicates. Also, -1 should only remove one item, so worst case it removes one of the duplicates, not all.

And now that the data moving is happening in a single operation within redis (rpoplpush), I'm not that concerned with moving the data along with the id. I'm assuming your concern was performance, but let me know if there's something else I'm missing there.

pwolanin’s picture

Two concerns:

The first is the performance of potentially moving large serialized objects around.

The second is (as before) that the client may alter the $item in some way, so that its serialized value no longer matches the entry in the queue. In that case, the same job might get re-run in an infinite loop.
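The second concern can be shown with a short Ruby sketch. It uses a plain array in place of the claimed list and JSON in place of PHP's serialize(); the helper names are hypothetical, and lrem_one only mimics LREM with a count of -1.

```ruby
require 'json'

# Mimics LREM with count -1: remove at most one matching element,
# searching from the tail. Returns the number of elements removed.
def lrem_one(list, value)
  index = list.rindex(value)
  return 0 if index.nil?
  list.delete_at(index)
  1
end

# Claim an item, let the worker alter it, then try to delete it by value.
def demo_stale_removal
  claimed = []
  item = { 'qid' => 7, 'data' => { 'nid' => 1 } }
  claimed.unshift(JSON.generate(item)) # what is stored on the claimed list

  item['data']['nid'] = 2              # the worker alters the item

  removed = lrem_one(claimed, JSON.generate(item))
  [removed, claimed.length]
end
```

In this sketch nothing is removed, so the original entry stays on the claimed list and would be recycled once its lease runs out.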

pwolanin’s picture

Attachment (new): 5.94 KB

Here's another patch. I haven't actually gotten the driver set up, so it's not tested, but it uses the expiring lease algorithm and keeps the actual job data in a hash, so only the job IDs are moving around.

pwolanin’s picture

Attachment (new): 6.48 KB

Revised patch which passes the unit tests.

pwolanin’s picture

Attachment (new): 9.52 KB

Added an expireAll() method to be called from hook_cron() and corresponding test assertions.

Also renamed $this->queue to $this->redis, for better code clarity.

msonnabaum’s picture

Status: Needs review » Fixed

#15 Looks good to me. Committed.

Status: Fixed » Closed (fixed)

Automatically closed -- issue fixed for 2 weeks with no activity.