diff --git a/advancedqueue.install b/advancedqueue.install index 38c37e7..ea2245f 100644 --- a/advancedqueue.install +++ b/advancedqueue.install @@ -5,8 +5,6 @@ * Contains install and update functions for Advanced queue. */ -use Drupal\Core\Database\Database; - /** * Implements hook_schema(). */ @@ -77,12 +75,19 @@ function advancedqueue_schema() { 'default' => 0, 'description' => 'The lease expiration timestamp.', ], + 'unique_id' => [ + 'type' => 'varchar_ascii', + 'length' => 255, + 'not null' => FALSE, + 'description' => 'The unique hash of the job item.', + ], ], 'primary key' => ['job_id'], 'indexes' => [ 'queue' => ['queue_id', 'state', 'available', 'expires'], 'queue_state' => ['state'], 'queue_expires' => ['expires'], + 'unique_id' => ['unique_id'], ], ]; @@ -94,8 +99,8 @@ function advancedqueue_schema() { */ function advancedqueue_update_8101() { $spec = advancedqueue_schema(); - $schema = Database::getConnection()->schema(); - $schema->addIndex('advancedqueue', 'queue_state', ['state'], $spec['advancedqueue']); + $database_schema = \Drupal::database()->schema(); + $database_schema->addIndex('advancedqueue', 'queue_state', ['state'], $spec['advancedqueue']); } /** @@ -103,6 +108,18 @@ function advancedqueue_update_8101() { */ function advancedqueue_update_8102() { $spec = advancedqueue_schema(); - $schema = Database::getConnection()->schema(); - $schema->addIndex('advancedqueue', 'queue_expires', ['expires'], $spec['advancedqueue']); + $database_schema = \Drupal::database()->schema(); + $database_schema->addIndex('advancedqueue', 'queue_expires', ['expires'], $spec['advancedqueue']); +} + +/** + * Add hash column to {advancedqueue} table. + */ +function advancedqueue_update_8103() { + $spec = advancedqueue_schema(); + $database_schema = \Drupal::database()->schema(); + if (!$database_schema->fieldExists('advancedqueue', 'unique_id')) { + $database_schema->addField('advancedqueue', 'unique_id', $spec['advancedqueue']['fields']['unique_id']); + $database_schema->addIndex('advancedqueue', 'unique_id', ['unique_id'], $spec['advancedqueue']); + } } diff --git a/src/Annotation/AdvancedQueueJobType.php b/src/Annotation/AdvancedQueueJobType.php index bd3b10c..d98e126 100644 --- a/src/Annotation/AdvancedQueueJobType.php +++ b/src/Annotation/AdvancedQueueJobType.php @@ -13,6 +13,21 @@ use Drupal\Component\Annotation\Plugin; */ class AdvancedQueueJobType extends Plugin { + /** + * Allow duplicate jobs. + */ + const UNIQUE_DUPLICATES = 'allow'; + + /** + * Do not enqueue the neew job. + */ + const UNIQUE_DONT_OVERWRITE = 'leave'; + + /** + * Overwrite existing job. + */ + const UNIQUE_OVERWRITE = 'overwrite'; + /** * The plugin ID. * @@ -43,4 +58,11 @@ class AdvancedQueueJobType extends Plugin { */ public $retry_delay = 10; + /** + * Whether jobs of this type must have uniqueness ensured by a backend. + * + * @var string + */ + public $ensure_unique = self::UNIQUE_OVERWRITE; + } diff --git a/src/Entity/Queue.php b/src/Entity/Queue.php index 6f074c4..3279ff8 100644 --- a/src/Entity/Queue.php +++ b/src/Entity/Queue.php @@ -4,8 +4,10 @@ namespace Drupal\advancedqueue\Entity; use Drupal\advancedqueue\BackendPluginCollection; use Drupal\advancedqueue\Job; +use Drupal\advancedqueue\Plugin\AdvancedQueue\Backend\SupportsUniqueJobsInterface; use Drupal\Core\Config\Entity\ConfigEntityBase; use Drupal\Core\Entity\EntityStorageInterface; +use Drupal\Core\StringTranslation\StringTranslationTrait; /** * Defines the queue entity class. @@ -59,6 +61,8 @@ use Drupal\Core\Entity\EntityStorageInterface; */ class Queue extends ConfigEntityBase implements QueueInterface { + use StringTranslationTrait; + /** * The queue ID. * @@ -128,6 +132,9 @@ class Queue extends ConfigEntityBase implements QueueInterface { * {@inheritdoc} */ public function enqueueJob(Job $job, $delay = 0) { + if ($this->jobTypeManager()->jobTypeRequiresUniqueness($job->getType()) && !$this->getBackend() instanceof SupportsUniqueJobsInterface) { + throw new \InvalidArgumentException($this->t("Backend :type doesn't support unique jobs", [':type' => $this->getBackend()->getLabel()])); + } return $this->getBackend()->enqueueJob($job, $delay); } @@ -135,6 +142,11 @@ class Queue extends ConfigEntityBase implements QueueInterface { * {@inheritdoc} */ public function enqueueJobs(array $jobs, $delay = 0) { + // Just check the first job as this could be time consuming for multiple. + $job = reset($jobs); + if ($this->jobTypeManager()->jobTypeRequiresUniqueness($job->getType()) && !$this->getBackend() instanceof SupportsUniqueJobsInterface) { + throw new \InvalidArgumentException($this->t("Backend :type doesn't support unique jobs", [':type' => $this->getBackend()->getLabel()])); + } return $this->getBackend()->enqueueJobs($jobs, $delay); } @@ -293,4 +305,14 @@ class Queue extends ConfigEntityBase implements QueueInterface { } } + /** + * Gets the Job type manager service. + * + * @return \Drupal\advancedqueue\JobTypeManager + * Job type manager. + */ + protected function jobTypeManager() { + return \Drupal::service('plugin.manager.advancedqueue_job_type'); + } + } diff --git a/src/Job.php b/src/Job.php index 1dadcdb..f6d421b 100644 --- a/src/Job.php +++ b/src/Job.php @@ -87,6 +87,13 @@ class Job { */ protected $expires; + /** + * Unique Id for detecting uniqueness. + * + * @var string + */ + protected $uniqueId; + /** * Constructs a new Job object. * @@ -111,6 +118,7 @@ class Job { $this->available = !empty($definition['available']) ? (int) $definition['available'] : 0; $this->processed = !empty($definition['processed']) ? (int) $definition['processed'] : 0; $this->expires = !empty($definition['expires']) ? (int) $definition['expires'] : 0; + $this->uniqueId = !empty($definition['unique_id']) ? $definition['unique_id'] : NULL; } /** @@ -120,15 +128,17 @@ class Job { * The job type. * @param array $payload * The payload. + * @param array $definition + * The definition. * * @return static */ - public static function create($type, array $payload) { + public static function create($type, array $payload, array $definition = []) { return new static([ 'type' => $type, 'payload' => $payload, 'state' => self::STATE_QUEUED, - ]); + ] + $definition); } /** @@ -398,6 +408,52 @@ class Job { return $this; } + /** + * Return the unique id of the job. + * + * @return null|string + * The unique id will be returned if there is none set a hash of the payload + * will be returned. + */ + public function getUniqueId() { + if (!\Drupal::service('plugin.manager.advancedqueue_job_type')->jobTypeRequiresUniqueness($this->type)) { + return NULL; + } + if (empty($this->uniqueId)) { + return $this->createJobHash(); + } + + return $this->uniqueId; + } + + /** + * Set the unique id of the job. + * + * @param string $unique_id + * This can be any 32 character identifier. + */ + public function setUniqueId($unique_id) { + $this->uniqueId = $unique_id; + } + + /** + * Creates a unique hash for this job. + * + * @return string + * A hash that is unique for the job payload, job type, and the job's queue. + */ + protected function createJobHash() { + // Use the queue name, the job type, and the payload to create a hash. + $data = $this->getQueueId() . $this->getType() . serialize($this->getPayload()); + + // crc32b is the fastest but has collisions due to its short length. + // sha1 and md5 are forbidden by many projects and organizations. + // This is the next fastest option. + $hash = hash('tiger128,3', $data); + + return $hash; + } + /** * Returns the job as an array. * @@ -415,6 +471,7 @@ class Job { 'available' => $this->available, 'processed' => $this->processed, 'expires' => $this->expires, + 'unique_id' => $this->getUniqueId(), ]; } diff --git a/src/JobTypeManager.php b/src/JobTypeManager.php index a675bcc..e6db1d7 100644 --- a/src/JobTypeManager.php +++ b/src/JobTypeManager.php @@ -31,4 +31,33 @@ class JobTypeManager extends DefaultPluginManager { $this->setCacheBackend($cache_backend, 'advancedqueue_job_type_plugins'); } + /** + * Determines whether the give job type requires support for unique jobs. + * + * @param string $job_type_id + * The job type ID. + * + * @return bool + * TRUE if the job type requires a queue backend that supports unique jobs; + * FALSE if support for unique jobs is not necessary. + */ + public function jobTypeRequiresUniqueness($job_type_id) { + $job_type_definition = $this->getDefinitions()[$job_type_id]; + return $job_type_definition['ensure_unique'] !== AdvancedQueueJobType::UNIQUE_DUPLICATES; + } + + /** + * Returns the uniqueness flag to see how to handle it. + * + * @param string $job_type_id + * The job type ID. + * + * @return string + * The type of uniqueness that is required. + */ + public function jobTypeRequiresUniquenessType($job_type_id) { + $job_type_definition = $this->getDefinitions()[$job_type_id]; + return $job_type_definition['ensure_unique']; + } + } diff --git a/src/Plugin/AdvancedQueue/Backend/BackendBase.php b/src/Plugin/AdvancedQueue/Backend/BackendBase.php index 9c8d69c..0d2bbcb 100644 --- a/src/Plugin/AdvancedQueue/Backend/BackendBase.php +++ b/src/Plugin/AdvancedQueue/Backend/BackendBase.php @@ -2,6 +2,7 @@ namespace Drupal\advancedqueue\Plugin\AdvancedQueue\Backend; +use Drupal\advancedqueue\JobTypeManager; use Drupal\Component\Datetime\TimeInterface; use Drupal\Component\Utility\NestedArray; use Drupal\Core\Form\FormStateInterface; @@ -21,6 +22,13 @@ abstract class BackendBase extends PluginBase implements BackendInterface, Conta */ protected $time; + /** + * The job type plugin manager. + * + * @var \Drupal\advancedqueue\JobTypeManager + */ + protected $jobTypeManager; + /** * The current queue ID. * @@ -39,11 +47,14 @@ abstract class BackendBase extends PluginBase implements BackendInterface, Conta * The plugin implementation definition. * @param \Drupal\Component\Datetime\TimeInterface $time * The time. + * @param \Drupal\advancedqueue\JobTypeManager $job_type_manager + * The job type plugin manager. */ - public function __construct(array $configuration, $plugin_id, $plugin_definition, TimeInterface $time) { + public function __construct(array $configuration, $plugin_id, $plugin_definition, TimeInterface $time, JobTypeManager $job_type_manager) { parent::__construct($configuration, $plugin_id, $plugin_definition); $this->time = $time; + $this->jobTypeManager = $job_type_manager; if (array_key_exists('_entity_id', $configuration)) { $this->queueId = $configuration['_entity_id']; unset($configuration['_entity_id']); @@ -59,7 +70,8 @@ abstract class BackendBase extends PluginBase implements BackendInterface, Conta $configuration, $plugin_id, $plugin_definition, - $container->get('datetime.time') + $container->get('datetime.time'), + $container->get('plugin.manager.advancedqueue_job_type') ); } @@ -133,6 +145,36 @@ abstract class BackendBase extends PluginBase implements BackendInterface, Conta return (string) $this->pluginDefinition['label']; } + /** + * Helper for non-unique backends to prevent queueing of jobs with uniqueness. + * + * Backends that do not support unique jobs should call this from + * enqueueJobs() to ensure that jobs that require uniqueness are not queued. + * + * @param \Drupal\advancedqueue\Job[] $jobs + * The jobs that are to be queued. + * + * @throws \Drupal\advancedqueue\Plugin\AdvancedQueue\Backend\InvalidBackendException + * Throws an exception if a job that requires uniqueness is being queued in + * a backend that doesn't support unique jobs. + */ + protected function ensureUniqueJobsSupport(array $jobs) { + if (!($this instanceof SupportsUniqueJobsInterface)) { + foreach ($jobs as $job) { + // Check with the manager rather than instantiate a job type plugin for + // each job, as queueing should be a quick operation. + if ($this->jobTypeManager->jobTypeRequiresUniqueness($job->getType())) { + throw new InvalidBackendException(sprintf( + "Job of type %s with payload %s cannot be queued in queue %s because the queue's backend does not support unique jobs.", + $job->getType(), + serialize($job->getPayload()), + $this->queueId + )); + } + } + } + } + /** * {@inheritdoc} */ diff --git a/src/Plugin/AdvancedQueue/Backend/BackendInterface.php b/src/Plugin/AdvancedQueue/Backend/BackendInterface.php index ace42f3..5a20f0e 100644 --- a/src/Plugin/AdvancedQueue/Backend/BackendInterface.php +++ b/src/Plugin/AdvancedQueue/Backend/BackendInterface.php @@ -70,6 +70,9 @@ interface BackendInterface extends ConfigurableInterface, PluginFormInterface, P * The job will be modified with the assigned queue ID, job ID, and * relevant timestamps. * + * If the job's plugin requires uniqueness, and the given job is a duplicate + * of a job already in the queue, then the job will be unchanged. + * * @param \Drupal\advancedqueue\Job $job * The job. * @param int $delay @@ -82,13 +85,18 @@ interface BackendInterface extends ConfigurableInterface, PluginFormInterface, P * Enqueues the given jobs. * * Each job will be modified with the assigned queue ID, job ID, and - * relevant timestamps. + * relevant timestamps, with the exception of duplicate jobs with a job type + * that requires uniqueness. * * @param \Drupal\advancedqueue\Job[] $jobs * The jobs. * @param int $delay * The time, in seconds, after which the jobs will become available to * consumers. Defaults to 0, indicating no delay. + * + * @throws \Drupal\advancedqueue\Plugin\AdvancedQueue\Backend\InvalidBackendException + * Throws an exception if the queue's backend is not suitable in some way + * for any of the given jobs. */ public function enqueueJobs(array $jobs, $delay = 0); diff --git a/src/Plugin/AdvancedQueue/Backend/Database.php b/src/Plugin/AdvancedQueue/Backend/Database.php index f3f0e98..ef35d2c 100644 --- a/src/Plugin/AdvancedQueue/Backend/Database.php +++ b/src/Plugin/AdvancedQueue/Backend/Database.php @@ -2,22 +2,32 @@ namespace Drupal\advancedqueue\Plugin\AdvancedQueue\Backend; +use Drupal\advancedqueue\Annotation\AdvancedQueueJobType; use Drupal\advancedqueue\Entity\Queue; use Drupal\advancedqueue\Entity\QueueInterface; use Drupal\advancedqueue\Job; +use Drupal\advancedqueue\JobTypeManager; use Drupal\Component\Datetime\TimeInterface; use Drupal\Core\Database\Connection; +use Drupal\Core\Logger\LoggerChannelFactoryInterface; use Symfony\Component\DependencyInjection\ContainerInterface; /** * Provides the database queue backend. * + * This supports unique jobs. Jobs of the same type and in the same queue + * are considered duplicates if their payloads are identical, and if they are + * not yet completed. Jobs with the same payload in different queues, or of + * different types, are not considered to be duplicates. Neither are completed + * jobs of the same type, in the same queue, with identical payloads. + * + * * @AdvancedQueueBackend( * id = "database", * label = @Translation("Database"), * ) */ -class Database extends BackendBase implements SupportsDeletingJobsInterface, SupportsListingJobsInterface, SupportsReleasingJobsInterface, SupportsLoadingJobsInterface { +class Database extends BackendBase implements SupportsDeletingJobsInterface, SupportsListingJobsInterface, SupportsReleasingJobsInterface, SupportsLoadingJobsInterface, SupportsUniqueJobsInterface { /** * The database connection. @@ -26,6 +36,13 @@ class Database extends BackendBase implements SupportsDeletingJobsInterface, Sup */ protected $connection; + /** + * The logger factory. + * + * @var \Drupal\Core\Logger\LoggerChannelFactoryInterface + */ + protected $loggerFactory; + /** * Constructs a new Database object. * @@ -37,13 +54,18 @@ class Database extends BackendBase implements SupportsDeletingJobsInterface, Sup * The plugin implementation definition. * @param \Drupal\Component\Datetime\TimeInterface $time * The time. + * @param \Drupal\advancedqueue\JobTypeManager $job_type_manager + * The job type plugin manager. * @param \Drupal\Core\Database\Connection $connection * The database connection to use. + * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory + * The logger factory. */ - public function __construct(array $configuration, $plugin_id, $plugin_definition, TimeInterface $time, Connection $connection) { - parent::__construct($configuration, $plugin_id, $plugin_definition, $time); + public function __construct(array $configuration, $plugin_id, $plugin_definition, TimeInterface $time, JobTypeManager $job_type_manager, Connection $connection, LoggerChannelFactoryInterface $logger_factory) { + parent::__construct($configuration, $plugin_id, $plugin_definition, $time, $job_type_manager); $this->connection = $connection; + $this->loggerFactory = $logger_factory; } /** @@ -55,7 +77,9 @@ class Database extends BackendBase implements SupportsDeletingJobsInterface, Sup $plugin_id, $plugin_definition, $container->get('datetime.time'), - $container->get('database') + $container->get('plugin.manager.advancedqueue_job_type'), + $container->get('database'), + $container->get('logger.factory') ); } @@ -181,21 +205,70 @@ class Database extends BackendBase implements SupportsDeletingJobsInterface, Sup /** @var \Drupal\advancedqueue\Job $job */ foreach ($jobs as $job) { + // The uniqueness check requires the queue ID to be set on the job. $job->setQueueId($this->queueId); $job->setState(Job::STATE_QUEUED); if (!$job->getAvailableTime()) { $job->setAvailableTime($this->time->getCurrentTime() + $delay); } + // Check if the job should be unique. + $requires_uniqueness = $this->jobTypeManager->jobTypeRequiresUniquenessType($job->getType()); + if ($requires_uniqueness == AdvancedQueueJobType::UNIQUE_DONT_OVERWRITE && $this->hasDuplicateJob($job)) { + // Skip a duplicate job. + $this->loggerFactory->get('advanced_queue')->notice('Duplicate job of type %type was not queued in queue %queue. Payload was %payload.', [ + '%type' => $job->getType(), + '%queue' => $this->queueId, + '%payload' => json_encode($job->getPayload()), + ]); + + continue; + } + elseif ($requires_uniqueness == AdvancedQueueJobType::UNIQUE_OVERWRITE && ($job_id = $this->hasDuplicateJob($job))) { + $this->loggerFactory->get('advanced_queue')->notice('Overwriting job %id of type %type in queue %queue. Payload was %payload.', [ + '%id' => $job_id, + '%type' => $job->getType(), + '%queue' => $this->queueId, + '%payload' => json_encode($job->getPayload()), + ]); + } + $fields = $job->toArray(); unset($fields['id']); + $fields['payload'] = json_encode($fields['payload']); - // InsertQuery supports inserting multiple rows at once, which is faster, - // but that doesn't give us the inserted job IDs. - $query = $this->connection->insert('advancedqueue')->fields($fields); - $job_id = $query->execute(); + if (isset($job_id)) { + $this->connection->update('advancedqueue') + ->fields($fields) + ->condition('job_id', $job_id) + ->execute(); + } + else { + // InsertQuery supports inserting multiple rows at once, which is faster, + // but that doesn't give us the inserted job IDs. + $job_id = $this->connection->insert('advancedqueue') + ->fields($fields) + ->execute(); + + $job->setId($job_id); + } + } + } + + /** + * {@inheritdoc} + */ + public function hasDuplicateJob(Job $job) { + $unique_id = $job->getUniqueId(); + $query = $this->connection->select('advancedqueue', 'a') + ->fields('a', ['job_id']) + ->condition('a.unique_id', $unique_id) + ->condition('a.state', [Job::STATE_QUEUED, Job::STATE_PROCESSING], 'IN'); + + $result = $query->execute()->fetchField(); - $job->setId($job_id); + if ($result !== FALSE) { + return $result; } } diff --git a/src/Plugin/AdvancedQueue/Backend/InvalidBackendException.php b/src/Plugin/AdvancedQueue/Backend/InvalidBackendException.php new file mode 100644 index 0000000..4833ec4 --- /dev/null +++ b/src/Plugin/AdvancedQueue/Backend/InvalidBackendException.php @@ -0,0 +1,10 @@ +enqueueJobs([$job], $delay); + } + + /** + * {@inheritdoc} + */ + public function enqueueJobs(array $jobs, $delay = 0) { + $this->ensureUniqueJobsSupport($jobs); + + // No need to do anything else. + } + + /** + * {@inheritdoc} + */ + public function createQueue() {} + + /** + * {@inheritdoc} + */ + public function deleteQueue() {} + + /** + * {@inheritdoc} + */ + public function countJobs() {} + + /** + * {@inheritdoc} + */ + public function retryJob(Job $job, $delay = 0) {} + + /** + * {@inheritdoc} + */ + public function claimJob() {} + + /** + * {@inheritdoc} + */ + public function onSuccess(Job $job) {} + + /** + * {@inheritdoc} + */ + public function onFailure(Job $job) {} + + /** + * {@inheritdoc} + */ + public function releaseJob($job_id) {} + + /** + * {@inheritdoc} + */ + public function deleteJob($job_id) {} + +} diff --git a/tests/modules/advancedqueue_test/src/Plugin/AdvancedQueue/JobType/Unique.php b/tests/modules/advancedqueue_test/src/Plugin/AdvancedQueue/JobType/Unique.php new file mode 100644 index 0000000..48db956 --- /dev/null +++ b/tests/modules/advancedqueue_test/src/Plugin/AdvancedQueue/JobType/Unique.php @@ -0,0 +1,26 @@ +assertEquals($first_job, $first_claimed_job); } + /** + * Tests handling of unique jobs. + */ + public function testUniqueJobs() { + $job = Job::create('unique', ['test' => '1']); + $this->firstQueue->getBackend()->enqueueJob($job); + + $this->assertQueuedCount(1, $this->firstQueue, "The first copy of job was queued."); + + // Confirm that the backend does not requeue the same job. + $job = Job::create('unique', ['test' => '1']); + $this->firstQueue->getBackend()->enqueueJob($job); + + $this->assertQueuedCount(1, $this->firstQueue, "The second copy of the job was not queued."); + + // Confirm that the backend can queue identical jobs in two different + // queues. + $job = Job::create('unique', ['test' => '1']); + $this->secondQueue->getBackend()->enqueueJob($job); + + $this->assertQueuedCount(1, $this->secondQueue, "The same job was queued in the second queue."); + + // Confirm that the same queue can hold unique jobs of different types with + // identical payloads. + $job = Job::create('unique_also', ['test' => '1']); + $this->firstQueue->getBackend()->enqueueJob($job); + + $this->assertQueuedCount(2, $this->firstQueue, "A job of a different type with the same payload was queued."); + } + /** * @covers ::cleanupQueue */ @@ -273,4 +305,23 @@ class DatabaseBackendTest extends KernelTestBase { $this->assertEquals(635814000 + $expected_delay, $job->getAvailableTime()); } + /** + * Asserts the count of queued jobs in a queue. + * + * @param int $expected_count + * The expected number of jobs. + * @param \Drupal\advancedqueue\Entity\QueueInterface $queue + * The queue. + * @param string $message + * (optional) The assertion message. + */ + protected function assertQueuedCount($expected_count, QueueInterface $queue, $message = NULL) { + if (empty($message)) { + $message = "The queue contains {$expected_count} queued jobs."; + } + + $counts = $queue->getBackend()->countJobs(); + $this->assertEquals($expected_count, $counts[Job::STATE_QUEUED], $message); + } + } diff --git a/tests/src/Kernel/NonUniqueBackendTest.php b/tests/src/Kernel/NonUniqueBackendTest.php new file mode 100644 index 0000000..6392084 --- /dev/null +++ b/tests/src/Kernel/NonUniqueBackendTest.php @@ -0,0 +1,61 @@ +installSchema('advancedqueue', ['advancedqueue']); + + $this->queue = Queue::create([ + 'id' => 'non_unique_queue', + 'label' => 'First queue', + 'backend' => 'non_unique', + ]); + $this->queue->save(); + } + + /** + * Tests that a job requiring uniqueness cannot be queued. + */ + public function testQueue() { + $job = Job::create('unique', ['test' => '1']); + + $this->setExpectedException(InvalidBackendException::class, 'Job of type unique with payload a:1:{s:4:"test";s:1:"1";} cannot be queued in queue non_unique_queue because the queue\'s backend does not support unique jobs.'); + + $this->queue->getBackend()->enqueueJobs([$job]); + } + +}