QueueInterface::claimItem returns useful data about the queued item in addition to the actual data that was queued:

   *   If returned, the object will have at least the following properties:
   *   - data: the same as what what passed into createItem().
   *   - item_id: the unique ID returned from createItem().
   *   - created: timestamp when the item was put into the queue.

In particular, the 'created' timestamp is potentially useful.

However, the cron queue processor throws this away and gives us just the data:

        while (time() < $end && ($item = $queue->claimItem($lease_time))) {
          try {
            $queue_worker->processItem($item->data);

This is nice DX, as queue worker plugin don't need to unserialize it and you get the same thing in your queue worker's processItem() as you put into createItem(), but information is lost.

One way to fix this would be for processItem() to receive the complete item as as second parameter.

Comments

joachim created an issue. See original summary.

Version: 8.5.x-dev » 8.6.x-dev

Drupal 8.5.0-alpha1 will be released the week of January 17, 2018, which means new developments and disruptive changes should now be targeted against the 8.6.x-dev branch. For more information see the Drupal 8 minor version schedule and the Allowed changes during the Drupal 8 release cycle.

Version: 8.6.x-dev » 8.7.x-dev

Drupal 8.6.0-alpha1 will be released the week of July 16, 2018, which means new developments and disruptive changes should now be targeted against the 8.7.x-dev branch. For more information see the Drupal 8 minor version schedule and the Allowed changes during the Drupal 8 release cycle.

Version: 8.7.x-dev » 8.8.x-dev

Drupal 8.7.0-alpha1 will be released the week of March 11, 2019, which means new developments and disruptive changes should now be targeted against the 8.8.x-dev branch. For more information see the Drupal 8 minor version schedule and the Allowed changes during the Drupal 8 release cycle.

Version: 8.8.x-dev » 8.9.x-dev

Drupal 8.8.0-alpha1 will be released the week of October 14th, 2019, which means new developments and disruptive changes should now be targeted against the 8.9.x-dev branch. (Any changes to 8.9.x will also be committed to 9.0.x in preparation for Drupal 9’s release, but some changes like significant feature additions will be deferred to 9.1.x.). For more information see the Drupal 8 and 9 minor version schedule and the Allowed changes during the Drupal 8 and 9 release cycles.

Version: 8.9.x-dev » 9.1.x-dev

Drupal 8.9.0-beta1 was released on March 20, 2020. 8.9.x is the final, long-term support (LTS) minor release of Drupal 8, which means new developments and disruptive changes should now be targeted against the 9.1.x-dev branch. For more information see the Drupal 8 and 9 minor version schedule and the Allowed changes during the Drupal 8 and 9 release cycles.

Version: 9.1.x-dev » 9.2.x-dev

Drupal 9.1.0-alpha1 will be released the week of October 19, 2020, which means new developments and disruptive changes should now be targeted for the 9.2.x-dev branch. For more information see the Drupal 9 minor version schedule and the Allowed changes during the Drupal 9 release cycle.

Version: 9.2.x-dev » 9.3.x-dev

Drupal 9.2.0-alpha1 will be released the week of May 3, 2021, which means new developments and disruptive changes should now be targeted for the 9.3.x-dev branch. For more information see the Drupal core minor version schedule and the Allowed changes during the Drupal core release cycle.

Version: 9.3.x-dev » 9.4.x-dev

Drupal 9.3.0-rc1 was released on November 26, 2021, which means new developments and disruptive changes should now be targeted for the 9.4.x-dev branch. For more information see the Drupal core minor version schedule and the Allowed changes during the Drupal core release cycle.

Version: 9.4.x-dev » 9.5.x-dev

Drupal 9.4.0-alpha1 was released on May 6, 2022, which means new developments and disruptive changes should now be targeted for the 9.5.x-dev branch. For more information see the Drupal core minor version schedule and the Allowed changes during the Drupal core release cycle.

Version: 9.5.x-dev » 10.1.x-dev

Drupal 9.5.0-beta2 and Drupal 10.0.0-beta2 were released on September 29, 2022, which means new developments and disruptive changes should now be targeted for the 10.1.x-dev branch. For more information see the Drupal core minor version schedule and the Allowed changes during the Drupal core release cycle.

Version: 10.1.x-dev » 11.x-dev

Drupal core is moving towards using a “main” branch. As an interim step, a new 11.x branch has been opened, as Drupal.org infrastructure cannot currently fully support a branch named main. New developments and disruptive changes should now be targeted for the 11.x branch, which currently accepts only minor-version allowed changes. For more information, see the Drupal core minor version schedule and the Allowed changes during the Drupal core release cycle.

solideogloria’s picture

I agree. I have a use-case where it could be solved if I had the entire $item as a second parameter.

I have implemented a MergeDatabaseQueue to avoid duplicate queue items:

class MergeDatabaseQueue extends DatabaseQueue {

  /**
   * {@inheritdoc}
   */
  protected function doCreateItem($data): ?int {
    // This is copied and modified from DatabaseQueue::doCreateItem().
    // Merge instead of insert to prevent duplicates. An item is updated if it
    // already exists.
    $serialized = serialize($data);
    $query = $this->connection->merge(static::TABLE_NAME)
      ->keys([
        'name' => $this->name,
        'data' => $serialized,
      ])
      ->fields([
        'name' => $this->name,
        'data' => $serialized,
        // Note that this will update the 'created' field for an existing item.
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => \Drupal::time()->getCurrentTime(),
      ]);
    return $query->execute();
  }

}

However, in one case, I also need to track the time the item was queued, so that the item is only processed if it is older than a week. If I track that as part of the data that is serialized, the item will no longer be mergeable with any other item having the rest of the data the same. What I really need is to be able to view the created field inside the worker's processItem function, rather than having to include it in the serialized data.

solideogloria’s picture

If you use ultimate_cron, this is a hacky workaround:

namespace Drupal\my_module\Queue;

use Drupal\Core\Config\ConfigFactory;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueWorkerManager;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\Utility\Error;
use Drupal\ultimate_cron\CronJobInterface;
use Drupal\ultimate_cron\QueueWorker;
use Psr\Log\LoggerInterface;

/**
 * A modified copy of \Drupal\ultimate_cron\QueueWorker.
 *
 * @hack This class allows workers to access $item by calling processItem() with
 * $item as the second parameter.
 */
class ItemAwareQueueWorker extends QueueWorker {

  /**
   * The logger.
   */
  protected LoggerInterface $logger;

  /**
   * The constructor.
   *
   * @param \Drupal\Core\Queue\QueueWorkerManager $plugin_manager_queue_worker
   *   The queue worker manager.
   * @param \Drupal\Core\Queue\QueueFactory $queue
   *   The queue factory.
   * @param \Drupal\Core\Config\ConfigFactory $config_factory
   *   The config factory.
   * @param \Psr\Log\LoggerInterface $logger
   *   The logger.
   */
  public function __construct(QueueWorkerManager $plugin_manager_queue_worker, QueueFactory $queue, ConfigFactory $config_factory, LoggerInterface $logger) {
    parent::__construct($plugin_manager_queue_worker, $queue, $config_factory);
    // @hack The logger was added for using calls to Error::logException.
    $this->logger = $logger;
  }

  /**
   * {@inheritdoc}
   */
  public function queueCallback(CronJobInterface $job): void {
    $queue_name = str_replace(CronJobInterface::QUEUE_ID_PREFIX, '', $job->id());

    $queue_manager = $this->pluginManagerQueueWorker;
    $queue_factory = $this->queue;

    $config = $this->configFactory->get('ultimate_cron.settings');

    $info = $queue_manager->getDefinition($queue_name);

    // Make sure every queue exists. There is no harm in trying to recreate
    // an existing queue.
    $queue_factory->get($queue_name)->createQueue();

    /** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
    $queue_worker = $queue_manager->createInstance($queue_name);
    // @hack This line was modified for coding standards.
    $end = microtime(TRUE) + $info['cron']['time'] ?? $config->get('queue.timeouts.time');

    /** @var \Drupal\Core\Queue\QueueInterface $queue */
    $queue = $queue_factory->get($queue_name);
    $items = 0;
    while (microtime(TRUE) < $end) {
      // Check kill signal.
      if ($job->getSignal('kill')) {
        // @hack This line was modified for coding standards.
        $this->logger->warning('Kill signal received for job @job_id', ['@job_id' => $job->id()]);
        break;
      }

      $item = $queue->claimItem($config->get('queue.timeouts.lease_time'));

      // If there is no item, check the empty delay setting and wait if
      // configured.
      if (!$item) {
        if ($config->get('queue.delays.empty_delay')) {
          usleep($config->get('queue.delays.empty_delay') * 1000000);
          continue;
        }
        else {
          break;
        }
      }

      try {
        // We have an item, check if we need to wait.
        if ($config->get('queue.delays.item_delay')) {
          if ($items == 0) {
            // Move the boundary if using a throttle,
            // to avoid waiting for nothing.
            $end -= $config->get('queue.delays.item_delay');
          }
          else {
            // Sleep before retrieving.
            usleep($config->get('queue.delays.item_delay') * 1000000);
          }
        }

        // @hack This line was modified.
        $queue_worker->processItem($item->data, $item);
        $queue->deleteItem($item);
        $items++;
      }
      catch (RequeueException $e) {
        // The worker requested the task be immediately requeued.
        $queue->releaseItem($item);
      }
      catch (DelayedRequeueException $e) {
        if ($queue instanceof DelayableQueueInterface) {
          // This queue can handle a custom delay; use the duration provided
          // by the exception.
          $queue->delayItem($item, $e->getDelay());
        }
      }
      catch (SuspendQueueException $e) {
        // If the worker indicates there is a problem with the whole queue,
        // release the item and skip to the next queue.
        $queue->releaseItem($item);

        // @hack This line was modified for coding standards.
        Error::logException($this->logger, $e);

        // Rethrow the SuspendQueueException, so that the queue is correctly
        // suspended for the current cron run to avoid infinite loops.
        throw $e;

      }
      catch (\Exception $e) {
        // In case of any other kind of exception, log it and leave the item
        // in the queue to be processed again later.
        // @hack This line was modified for coding standards.
        Error::logException($this->logger, $e);
      }
    }
  }

}

my_module.services.yml

  my_module.item_aware_queue_worker:
    class: Drupal\my_module\Queue\ItemAwareQueueWorker
    arguments: ["@plugin.manager.queue_worker", "@queue", "@config.factory", '@logger.channel.my_module']

Queue config, ultimate_cron.job.ultimate_cron_queue_my_module_some_queue.yml

langcode: en
status: true
dependencies:
  module:
    - my_module
title: 'Queue: My Module Some Queue'
id: ultimate_cron_queue_my_module_some_queue
weight: 10
module: my_module
# @hack Using custom queue callback.
callback: 'my_module.item_aware_queue_worker:queueCallback'
scheduler:
  id: crontab
  configuration:
    rules:
      # Daily from 9-10 AM.
      - '* 9 * * *'
launcher:
  id: serial
logger:
  id: database