diff --git a/feeds.module b/feeds.module index 319da64..ee8df25 100644 --- a/feeds.module +++ b/feeds.module @@ -39,9 +39,9 @@ function feeds_cron() { if ($feed->isLocked()) { continue; } - $queue = \Drupal::queue('feeds_feed_import:' . $feed->bundle()); + $queue = \Drupal::queue('feeds_feed_refresh:' . $feed->bundle()); - if ($queue->createItem([$feed, FeedRefresh::BEGIN])) { + if ($queue->createItem([$feed, FeedRefresh::BEGIN, []])) { // Add timestamp to avoid queueing item more than once. $feed->setQueuedTime(REQUEST_TIME); $feed->save(); diff --git a/src/Entity/FeedType.php b/src/Entity/FeedType.php index f806522..8aac274 100644 --- a/src/Entity/FeedType.php +++ b/src/Entity/FeedType.php @@ -468,7 +468,7 @@ class FeedType extends ConfigEntityBundleBase implements FeedTypeInterface, Enti // Clear the queue worker plugin cache so that our derivatives will be // found. \Drupal::service('plugin.manager.queue_worker')->clearCachedDefinitions(); - \Drupal::queue('feeds_feed_import:' . $this->id())->createQueue(); + \Drupal::queue('feeds_feed_refresh:' . $this->id())->createQueue(); } } @@ -476,18 +476,14 @@ class FeedType extends ConfigEntityBundleBase implements FeedTypeInterface, Enti * {@inheritdoc} */ public static function postDelete(EntityStorageInterface $storage, array $entities) { - $prefixes = ['feeds_feed_import', 'feeds_feed_parse', 'feeds_feed_process']; - foreach ($entities as $entity) { foreach ($entity->getPlugins() as $plugin) { $plugin->onFeedTypeDelete(); } // Delete any existing queues related to this type. - foreach ($prefixes as $prefix) { - if ($queue = \Drupal::queue($prefix . ':' . $entity->id())) { - $queue->deleteQueue(); - } + if ($queue = \Drupal::queue('feeds_feed_refresh:' . $entity->id())) { + $queue->deleteQueue(); } } diff --git a/src/Plugin/QueueWorker/FeedParse.php b/src/Plugin/QueueWorker/FeedParse.php deleted file mode 100644 index 8d0c22a..0000000 --- a/src/Plugin/QueueWorker/FeedParse.php +++ /dev/null @@ -1,50 +0,0 @@ -switchAccount($feed); - - try { - $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'parse')); - $parse_event = $this->dispatchEvent(FeedsEvents::PARSE, new ParseEvent($feed, $fetcher_result)); - $feed->setState(StateInterface::PROCESS, NULL); - - $feed->saveStates(); - $queue = $this->queueFactory->get('feeds_feed_process:' . $feed->bundle()); - - foreach ($parse_event->getParserResult() as $item) { - $queue->createItem([$feed, $item]); - } - // Add a final process queue item that finalizes the import. - $queue->createItem([$feed, $fetcher_result]); - } - catch (\Exception $exception) { - return $this->handleException($feed, $exception); - } - finally { - $switcher->switchBack(); - } - } - -} diff --git a/src/Plugin/QueueWorker/FeedProcess.php b/src/Plugin/QueueWorker/FeedProcess.php deleted file mode 100644 index 8ad5892..0000000 --- a/src/Plugin/QueueWorker/FeedProcess.php +++ /dev/null @@ -1,65 +0,0 @@ -switchAccount($feed); - - try { - if ($item instanceof FetcherResultInterface) { - $this->finish($feed, $item); - return; - } - - $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'process')); - $this->dispatchEvent(FeedsEvents::PROCESS, new ProcessEvent($feed, $item)); - - $feed->saveStates(); - } - catch (\Exception $exception) { - return $this->handleException($feed, $exception); - } - finally { - $switcher->switchBack(); - } - } - - /** - * Finalizes the import. - */ - protected function finish(FeedInterface $feed, FetcherResultInterface $fetcher_result) { - if ($feed->progressParsing() !== StateInterface::BATCH_COMPLETE) { - $this->queueFactory->get('feeds_feed_parse:' . $feed->bundle())->createItem([$feed, $fetcher_result]); - } - elseif ($feed->progressFetching() !== StateInterface::BATCH_COMPLETE) { - $this->queueFactory->get('feeds_feed_import:' . $feed->bundle())->createItem([$feed, FeedRefresh::RESUME]); - } - else { - $feed->finishImport(); - } - } - -} diff --git a/src/Plugin/QueueWorker/FeedRefresh.php b/src/Plugin/QueueWorker/FeedRefresh.php index 845bf70..8de2362 100644 --- a/src/Plugin/QueueWorker/FeedRefresh.php +++ b/src/Plugin/QueueWorker/FeedRefresh.php @@ -5,13 +5,17 @@ namespace Drupal\feeds\Plugin\QueueWorker; use Drupal\feeds\Event\FeedsEvents; use Drupal\feeds\Event\FetchEvent; use Drupal\feeds\Event\InitEvent; +use Drupal\feeds\Event\ParseEvent; +use Drupal\feeds\Event\ProcessEvent; use Drupal\feeds\Exception\LockException; use Drupal\feeds\FeedInterface; +use Drupal\feeds\Feeds\Item\ItemInterface; +use Drupal\feeds\Result\FetcherResultInterface; use Drupal\feeds\StateInterface; /** * @QueueWorker( - * id = "feeds_feed_import", + * id = "feeds_feed_refresh", * title = @Translation("Feed refresh"), * cron = {"time" = 60}, * deriver = "Drupal\feeds\Plugin\Derivative\FeedQueueWorker" @@ -34,22 +38,91 @@ class FeedRefresh extends FeedQueueWorkerBase { const RESUME = 'resume'; /** + * Parameter passed when parsing. + * + * @var string + */ + const PARSE = 'parse'; + + /** + * Parameter passed when processing. + * + * @var string + */ + const PROCESS = 'process'; + + /** + * Parameter passed when finishing. + * + * @var string + */ + const FINISH = 'finish'; + + /** * {@inheritdoc} */ public function processItem($data) { - $operation = static::BEGIN; - $feed = $data; - - // @todo Backwards compat check. Remove at later date. - if (is_array($data)) { - list($feed, $operation) = $feed; - } + list($feed, $stage, $params) = $data; if (!$feed instanceof FeedInterface) { return; } - if ($operation === static::BEGIN) { + $switcher = $this->switchAccount($feed); + + try { + switch ($stage) { + case static::BEGIN: + case static::RESUME: + $this->import($feed, $stage); + break; + + case static::PARSE: + $this->doParse($feed, $params['fetcher_result']); + break; + + case static::PROCESS: + $this->doProcess($feed, $params['item']); + break; + + case static::FINISH: + $this->finish($feed, $params['fetcher_result']); + break; + } + } + catch (\Exception $exception) { + return $this->handleException($feed, $exception); + } + finally { + $switcher->switchBack(); + } + } + + /** + * Queues an item. + * + * @param \Drupal\feeds\FeedInterface $feed + * The feed for which to queue an item. + * @param string $stage + * The stage of importing. + * @param [] $params + * Additional parameters. + */ + protected function queueItem(FeedInterface $feed, $stage, $params = []) { + $this->queueFactory->get('feeds_feed_refresh:' . $feed->bundle()) + ->createItem([$feed, $stage, $params]); + } + + /** + * Begin or resume an import. + * + * @param \Drupal\feeds\FeedInterface $feed + * The feed to perform an import on. + * @param string $stage + * The stage of importing. + */ + protected function import(FeedInterface $feed, $stage) { + if ($stage === static::BEGIN) { try { $feed->lock(); } @@ -59,22 +132,75 @@ class FeedRefresh extends FeedQueueWorkerBase { $feed->clearStates(); } - $switcher = $this->switchAccount($feed); - try { - $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'fetch')); - $fetch_event = $this->dispatchEvent(FeedsEvents::FETCH, new FetchEvent($feed)); - $feed->setState(StateInterface::PARSE, NULL); + $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'fetch')); + $fetch_event = $this->dispatchEvent(FeedsEvents::FETCH, new FetchEvent($feed)); + $feed->setState(StateInterface::PARSE, NULL); - $feed->saveStates(); - $this->queueFactory->get('feeds_feed_parse:' . $feed->bundle()) - ->createItem([$feed, $fetch_event->getFetcherResult()]); + $feed->saveStates(); + $this->queueItem($feed, static::PARSE, [ + 'fetcher_result' => $fetch_event->getFetcherResult(), + ]); + } + + /** + * Parses. + * + * @param \Drupal\feeds\FeedInterface $feed + * The feed to perform a parse event on. + * @param \Drupal\feeds\Result\FetcherResultInterface + * The fetcher result. + */ + protected function doParse(FeedInterface $feed, FetcherResultInterface $fetcher_result) { + $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'parse')); + $parse_event = $this->dispatchEvent(FeedsEvents::PARSE, new ParseEvent($feed, $fetcher_result)); + + $feed->saveStates(); + + foreach ($parse_event->getParserResult() as $item) { + $this->queueItem($feed, static::PROCESS, [ + 'item' => $item, + ]); } - catch (\Exception $exception) { - return $this->handleException($feed, $exception); + + // Add a final queue item that finalizes the import. + $this->queueItem($feed, static::FINISH, [ + 'fetcher_result' => $fetcher_result, + ]); + } + + /** + * Processes an item. + * + * @param \Drupal\feeds\FeedInterface $feed + * The feed to perform a process event on. + * @param \Drupal\feeds\Feeds\Item\ItemInterface + * The item to import. + */ + protected function doProcess(FeedInterface $feed, ItemInterface $item) { + $this->dispatchEvent(FeedsEvents::INIT_IMPORT, new InitEvent($feed, 'process')); + $this->dispatchEvent(FeedsEvents::PROCESS, new ProcessEvent($feed, $item)); + + $feed->saveStates(); + } + + /** + * Finalizes the import. + */ + protected function finish(FeedInterface $feed, FetcherResultInterface $fetcher_result) { + // Update item count. + $feed->save(); + + if ($feed->progressParsing() !== StateInterface::BATCH_COMPLETE) { + $this->queueItem($feed, static::PARSE, [ + 'fetcher_result' => $fetcher_result, + ]); } - finally { - $switcher->switchBack(); + elseif ($feed->progressFetching() !== StateInterface::BATCH_COMPLETE) { + $this->queueItem($feed, static::RESUME); + } + else { + $feed->finishImport(); } } diff --git a/tests/src/Unit/Plugin/QueueWorker/FeedParseTest.php b/tests/src/Unit/Plugin/QueueWorker/FeedParseTest.php deleted file mode 100644 index aa56e98..0000000 --- a/tests/src/Unit/Plugin/QueueWorker/FeedParseTest.php +++ /dev/null @@ -1,65 +0,0 @@ -dispatcher = new EventDispatcher(); - $queue_factory = $this->getMock('Drupal\Core\Queue\QueueFactory', [], [], '', FALSE); - $queue_factory->expects($this->any()) - ->method('get') - ->with('feeds_feed_process:') - ->will($this->returnValue($this->getMock('Drupal\Core\Queue\QueueInterface'))); - - $container->set('queue', $queue_factory); - $container->set('event_dispatcher', $this->dispatcher); - $container->set('account_switcher', $this->getMockedAccountSwitcher()); - - $this->plugin = FeedParse::create($container, [], 'feeds_feed_parse', []); - $this->feed = $this->getMockFeed(); - } - - public function test() { - $this->dispatcher->addListener(FeedsEvents::PARSE, function ($parse_event) { - $parser_result = new ParserResult(); - $parser_result->addItem(new DynamicItem()); - $parse_event->setParserResult($parser_result); - }); - - $fetcher_result = new FetcherResult(''); - - $this->plugin->processItem([$this->feed, $fetcher_result]); - } - - /** - * @expectedException \RuntimeException - */ - public function testException() { - $this->dispatcher->addListener(FeedsEvents::PARSE, function ($parse_event) { - throw new \RuntimeException(); - }); - - $this->plugin->processItem([$this->feed, new FetcherResult('')]); - } - -} diff --git a/tests/src/Unit/Plugin/QueueWorker/FeedProcessTest.php b/tests/src/Unit/Plugin/QueueWorker/FeedProcessTest.php deleted file mode 100644 index 8b0c570..0000000 --- a/tests/src/Unit/Plugin/QueueWorker/FeedProcessTest.php +++ /dev/null @@ -1,70 +0,0 @@ -dispatcher = new EventDispatcher(); - $queue_factory = $this->getMock('Drupal\Core\Queue\QueueFactory', [], [], '', FALSE); - $queue_factory->expects($this->any()) - ->method('get') - ->will($this->returnValue($this->getMock('Drupal\Core\Queue\QueueInterface'))); - - $container->set('queue', $queue_factory); - $container->set('event_dispatcher', $this->dispatcher); - $container->set('account_switcher', $this->getMockedAccountSwitcher()); - - $this->plugin = FeedProcess::create($container, [], 'feeds_feed_process', []); - $this->feed = $this->getMockFeed(); - } - - public function test() { - $this->plugin->processItem([$this->feed, new DynamicItem()]); - } - - /** - * @expectedException \RuntimeException - */ - public function testException() { - $this->dispatcher->addListener(FeedsEvents::PROCESS, function ($parse_event) { - throw new \RuntimeException(); - }); - - $this->plugin->processItem([$this->feed, new DynamicItem()]); - } - - public function testFinalPass() { - $this->plugin->processItem([$this->feed, new FetcherResult('')]); - - $this->feed->expects($this->exactly(2)) - ->method('progressParsing') - ->will($this->returnValue(StateInterface::BATCH_COMPLETE)); - - $this->plugin->processItem([$this->feed, new FetcherResult('')]); - $this->feed->expects($this->once()) - ->method('progressFetching') - ->will($this->returnValue(StateInterface::BATCH_COMPLETE)); - $this->plugin->processItem([$this->feed, new FetcherResult('')]); - } - -} diff --git a/tests/src/Unit/Plugin/QueueWorker/FeedRefreshTest.php b/tests/src/Unit/Plugin/QueueWorker/FeedRefreshTest.php index 4dc6397..1e8628c 100644 --- a/tests/src/Unit/Plugin/QueueWorker/FeedRefreshTest.php +++ b/tests/src/Unit/Plugin/QueueWorker/FeedRefreshTest.php @@ -5,7 +5,11 @@ namespace Drupal\Tests\feeds\Unit\Plugin\QueueWorker; use Drupal\Core\DependencyInjection\ContainerBuilder; use Drupal\feeds\Event\FeedsEvents; use Drupal\feeds\Exception\LockException; +use Drupal\feeds\Feeds\Item\DynamicItem; use Drupal\feeds\Plugin\QueueWorker\FeedRefresh; +use Drupal\feeds\Result\FetcherResult; +use Drupal\feeds\Result\ParserResult; +use Drupal\feeds\StateInterface; use Drupal\Tests\feeds\Unit\FeedsUnitTestCase; use Symfony\Component\EventDispatcher\EventDispatcher; @@ -15,10 +19,30 @@ use Symfony\Component\EventDispatcher\EventDispatcher; */ class FeedRefreshTest extends FeedsUnitTestCase { + /** + * The event dispatcher. + * + * @var \Symfony\Component\EventDispatcher\EventDispatcher + */ protected $dispatcher; + + /** + * The QueueWorker plugin. + * + * @var Drupal\feeds\Plugin\QueueWorker\FeedRefresh + */ protected $plugin; + + /** + * The feed. + * + * @var Drupal\feeds\FeedInterface + */ protected $feed; + /** + * {@inheritdoc} + */ public function setUp() { parent::setUp(); $container = new ContainerBuilder(); @@ -26,38 +50,155 @@ class FeedRefreshTest extends FeedsUnitTestCase { $queue_factory = $this->getMock('Drupal\Core\Queue\QueueFactory', [], [], '', FALSE); $queue_factory->expects($this->any()) ->method('get') - ->with('feeds_feed_parse:') + ->with('feeds_feed_refresh:') ->will($this->returnValue($this->getMock('Drupal\Core\Queue\QueueInterface'))); $container->set('queue', $queue_factory); $container->set('event_dispatcher', $this->dispatcher); $container->set('account_switcher', $this->getMockedAccountSwitcher()); - $this->plugin = FeedRefresh::create($container, [], 'feeds_feed_parse', []); + $this->plugin = FeedRefresh::create($container, [], 'feeds_feed_refresh', []); $this->feed = $this->getMockFeed(); } - public function test() { + /** + * Tests initiating an import. + */ + public function testBeginStage() { $this->plugin->processItem(NULL); - $this->plugin->processItem($this->feed); + $this->plugin->processItem([$this->feed, FeedRefresh::BEGIN, []]); } + /** + * Tests that an import cannot start when the feed is locked. + */ public function testLockException() { $this->feed->expects($this->once()) ->method('lock') ->will($this->throwException(new LockException())); - $this->plugin->processItem($this->feed); + $this->plugin->processItem([$this->feed, FeedRefresh::BEGIN, []]); } /** + * Tests resuming an import. + * + * @todo more testing? + */ + public function testResumeStage() { + $this->plugin->processItem([$this->feed, FeedRefresh::RESUME, []]); + } + + /** + * Tests that a fetch event is dispatched when initiating an import. + * * @expectedException \RuntimeException */ - public function testException() { + public function testExceptionOnFetchEvent() { $this->dispatcher->addListener(FeedsEvents::FETCH, function ($parse_event) { throw new \RuntimeException(); }); - $this->plugin->processItem($this->feed); + $this->plugin->processItem([$this->feed, FeedRefresh::BEGIN, []]); + } + + /** + * Tests the parse stage of an import. + */ + public function testParseStage() { + $this->dispatcher->addListener(FeedsEvents::PARSE, function ($parse_event) { + $parser_result = new ParserResult(); + $parser_result->addItem(new DynamicItem()); + $parse_event->setParserResult($parser_result); + }); + + $fetcher_result = new FetcherResult(''); + + $this->plugin->processItem([ + $this->feed, + FeedRefresh::PARSE, [ + 'fetcher_result' => $fetcher_result, + ], + ]); + } + + /** + * Tests that a parse event is dispatched when a queue task at the parse stage runs. + * + * @expectedException \RuntimeException + */ + public function testExceptionOnParseEvent() { + $this->dispatcher->addListener(FeedsEvents::PARSE, function ($parse_event) { + throw new \RuntimeException(); + }); + + $this->plugin->processItem([ + $this->feed, + FeedRefresh::PARSE, [ + 'fetcher_result' => new FetcherResult(''), + ], + ]); + } + + /** + * Tests the process stage of an import. + */ + public function testProcessStage() { + $this->plugin->processItem([ + $this->feed, + FeedRefresh::PROCESS, [ + 'item' => new DynamicItem(), + ], + ]); + } + + /** + * Tests that a process event is dispatched when a queue task at the process stage runs. + * + * @expectedException \RuntimeException + */ + public function testExceptionOnProcessEvent() { + $this->dispatcher->addListener(FeedsEvents::PROCESS, function ($parse_event) { + throw new \RuntimeException(); + }); + + $this->plugin->processItem([ + $this->feed, + FeedRefresh::PROCESS, [ + 'item' => new DynamicItem(), + ], + ]); + } + + /** + * Tests the final stage of an import. + */ + public function testFinalPass() { + $this->plugin->processItem([ + $this->feed, + FeedRefresh::FINISH, [ + 'fetcher_result' => new FetcherResult(''), + ], + ]); + + $this->feed->expects($this->exactly(2)) + ->method('progressParsing') + ->will($this->returnValue(StateInterface::BATCH_COMPLETE)); + + $this->plugin->processItem([ + $this->feed, + FeedRefresh::FINISH, [ + 'fetcher_result' => new FetcherResult(''), + ], + ]); + $this->feed->expects($this->once()) + ->method('progressFetching') + ->will($this->returnValue(StateInterface::BATCH_COMPLETE)); + $this->plugin->processItem([ + $this->feed, + FeedRefresh::FINISH, [ + 'fetcher_result' => new FetcherResult(''), + ], + ]); } }