? 930652-move_expiry_into_feeds_source.patch Index: feeds.module =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.module,v retrieving revision 1.74.2.11 diff -u -p -r1.74.2.11 feeds.module --- feeds.module 27 Oct 2010 19:53:21 -0000 1.74.2.11 +++ feeds.module 2 Nov 2010 21:20:28 -0000 @@ -29,7 +29,7 @@ define('FEEDS_BATCH_ACTIVE', 0.0); function feeds_cron() { if ($importers = feeds_reschedule()) { foreach ($importers as $id) { - feeds_importer($id)->schedule(); + feeds_source($id)->schedule(); $rows = db_query("SELECT feed_nid FROM {feeds_source} WHERE id = :id", array(':id' => $id)); foreach ($rows as $row) { feeds_source($id, $row->feed_nid)->schedule(); @@ -56,8 +56,8 @@ function feeds_cron_job_scheduler_info() $info['feeds_source_clear'] = array( 'queue name' => 'feeds_source_clear', ); - $info['feeds_importer_expire'] = array( - 'queue name' => 'feeds_importer_expire', + $info['feeds_source_expire'] = array( + 'queue name' => 'feeds_source_expire', ); return $info; } @@ -75,8 +75,8 @@ function feeds_cron_queue_info() { 'worker callback' => 'feeds_source_clear', 'time' => 15, ); - $queues['feeds_importer_expire'] = array( - 'worker callback' => 'feeds_importer_expire', + $queues['feeds_source_expire'] = array( + 'worker callback' => 'feeds_source_expire', 'time' => 15, ); return $queues; @@ -115,16 +115,16 @@ function feeds_source_clear($job) { /** * Scheduler callback for expiring content. */ -function feeds_importer_expire($job) { - $importer = feeds_importer($job['type']); +function feeds_source_expire($job) { + $source = feeds_source($job['type'], $job['id']); try { - $importer->existing()->expire(); + $source->existing()->expire(); } catch (FeedsNotExistingException $e) {} catch (Exception $e) { $importer->log('expire', $e->getMessage(), array(), WATCHDOG_ERROR); } - $importer->scheduleExpire(); + $source->scheduleExpire(); } /** Index: feeds.pages.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.pages.inc,v retrieving revision 1.34.2.7 diff -u -p -r1.34.2.7 feeds.pages.inc --- feeds.pages.inc 27 Oct 2010 00:13:24 -0000 1.34.2.7 +++ feeds.pages.inc 2 Nov 2010 21:20:28 -0000 @@ -103,7 +103,6 @@ function feeds_import_form_submit($form, // Add to schedule, make sure importer is scheduled, too. $source->schedule(); - $source->importer->schedule(); } /** Index: includes/FeedsImporter.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/includes/FeedsImporter.inc,v retrieving revision 1.31.2.7 diff -u -p -r1.31.2.7 FeedsImporter.inc --- includes/FeedsImporter.inc 1 Nov 2010 17:01:52 -0000 1.31.2.7 +++ includes/FeedsImporter.inc 2 Nov 2010 21:20:28 -0000 @@ -56,31 +56,6 @@ class FeedsImporter extends FeedsConfigu } /** - * Remove items older than $time. - * - * @param $time - * All items older than REQUEST_TIME - $time will be deleted. If not - * given, internal processor settings will be used. - * - * @return - * FEEDS_BATCH_COMPLETE if the expiry process finished. A decimal between - * 0.0 and 0.9 periodic if expiry is still in progress. - * - * @throws - * Throws Exception if an error occurs when expiring items. - */ - public function expire($time = NULL) { - return $this->processor->expire($time); - } - - /** - * Schedule all periodic tasks for this importer. - */ - public function schedule() { - $this->scheduleExpire(); - } - - /** * Schedule expiry of items. */ public function scheduleExpire() { @@ -91,10 +66,10 @@ class FeedsImporter extends FeedsConfigu ); if (FEEDS_EXPIRE_NEVER != $this->processor->expiryTime()) { $job['period'] = 3600; - JobScheduler::get('feeds_importer_expire')->set($job); + JobScheduler::get('feeds_source_expire')->set($job); } else { - JobScheduler::get('feeds_importer_expire')->remove($job); + JobScheduler::get('feeds_source_expire')->remove($job); } } @@ -165,7 +140,7 @@ class FeedsImporter extends FeedsConfigu feeds_reschedule($this->id); } else { - JobScheduler::get('feeds_importer_expire')->remove($job); + JobScheduler::get('feeds_source_expire')->remove($job); } } Index: includes/FeedsSource.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/includes/FeedsSource.inc,v retrieving revision 1.26.2.8 diff -u -p -r1.26.2.8 FeedsSource.inc --- includes/FeedsSource.inc 26 Oct 2010 03:05:58 -0000 1.26.2.8 +++ includes/FeedsSource.inc 2 Nov 2010 21:20:28 -0000 @@ -18,6 +18,7 @@ define('FEEDS_FETCH', 'fetch'); define('FEEDS_PARSE', 'parse'); define('FEEDS_PROCESS', 'process'); define('FEEDS_PROCESS_CLEAR', 'process_clear'); +define('FEEDS_PROCESS_EXPIRY', 'process_expire'); /** * Declares an interface for a class that defines default values and form @@ -266,6 +267,7 @@ class FeedsSource extends FeedsConfigura */ public function schedule() { $this->scheduleImport(); + $this->scheduleExpire(); } /** @@ -295,6 +297,25 @@ class FeedsSource extends FeedsConfigura } /** + * Schedule periodic or background expire tasks. + */ + public function scheduleExpire() { + $period = $this->importer->config['import_period']; + $fetcher_period = $this->importer->fetcher->importPeriod($this); + if (is_numeric($fetcher_period)) { + $period = $fetcher_period; + } + $period = $this->progressExpiry() === FEEDS_BATCH_COMPLETE ? $period : 0; + $job = array( + 'type' => $this->id, + 'id' => $this->feed_nid, + 'period' => $period, + 'periodic' => TRUE, + ); + JobScheduler::get('feeds_source_expire')->set($job); + } + + /** * Schedule background clearing tasks. */ public function scheduleClear() { @@ -398,6 +419,18 @@ class FeedsSource extends FeedsConfigura } /** + * Remove all expired items from a feed. + */ + public function expire() { + $this->acquireLock(); + try { + $this->importer->processor->expire($this); + } + catch (Exception $e) {} + $this->releaseLock(); + } + + /** * Report progress as float between 0 and 1. 1 = FEEDS_BATCH_COMPLETE. */ public function progressParsing() { @@ -432,6 +465,13 @@ class FeedsSource extends FeedsConfigura } /** + * Report progress on expiry. + */ + public function progressExpiry() { + return $this->state(FEEDS_PROCESS_EXPIRY)->progress; + } + + /** * Return a state object for a given stage. Lazy instantiates new states. * * @todo Rename getConfigFor() accordingly to config(). Index: plugins/FeedsNodeProcessor.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/plugins/FeedsNodeProcessor.inc,v retrieving revision 1.69.2.15 diff -u -p -r1.69.2.15 FeedsNodeProcessor.inc --- plugins/FeedsNodeProcessor.inc 28 Oct 2010 18:26:26 -0000 1.69.2.15 +++ plugins/FeedsNodeProcessor.inc 2 Nov 2010 21:20:28 -0000 @@ -82,28 +82,67 @@ class FeedsNodeProcessor extends FeedsPr } /** - * Implement expire(). + * Delete feed items younger than now - $time. Do not invoke expire on a + * processor directly, but use FeedsImporter::expire() instead. * - * @todo: move to processor stage? + * @see FeedsSource::expire(). + * @see FeedsNodeProcessor::expire(). + * + * @param $time + * If implemented, all items produced by this configuration that are older + * than REQUEST_TIME - $time should be deleted. + * If $time === NULL processor should use internal configuration. + * + * @return + * FEEDS_BATCH_COMPLETE if all items have been processed, a float between 0 + * and 0.99* indicating progress otherwise. */ - public function expire($time = NULL) { + public function expire(FeedsSource $source, $time = NULL) { + $state = $source->state(FEEDS_PROCESS_EXPIRY); + if ($time === NULL) { $time = $this->expiryTime(); } if ($time == FEEDS_EXPIRE_NEVER) { return; } - $count = $this->getLimit(); - $nodes = db_query_range("SELECT n.nid FROM {node} n JOIN {feeds_item} fi ON fi.entity_type = 'node' AND n.nid = fi.entity_id WHERE fi.id = :id AND n.created < :created", 0, $count, array(':id' => $this->id, ':created' => REQUEST_TIME - $time)); - $nids = array(); - foreach ($nodes as $node) { - $nids[$node->nid] = $node->nid; - } - $this->entityDeleteMultiple($nids); - if (db_query_range("SELECT 1 FROM {node} n JOIN {feeds_item} fi ON fi.entity_type = 'node' AND n.nid = fi.entity_id WHERE fi.id = :id AND n.created < :created", 0, 1, array(':id' => $this->id, ':created' => REQUEST_TIME - $time))->fetchField()) { - return FEEDS_BATCH_ACTIVE; + + // Build base select statement. + $info = $this->entityInfo(); + $select = db_select($info['base table'], 'e'); + $select->addField('e', $info['entity keys']['id'], 'entity_id'); + $select->join( + 'feeds_item', + 'fi', + "e.{$info['entity keys']['id']} = fi.entity_id AND fi.entity_type = '{$this->entityType()}'"); + $select->condition('fi.id', $this->id); + $select->condition('fi.feed_nid', $source->feed_nid); + $select->condition('e.created', REQUEST_TIME - $time, '<'); + + // If there is no total, query it. + if (!$state->total) { + $state->total = $select->countQuery() + ->execute() + ->fetchField(); + } + + // Delete a batch of entities. + $entities = $select->range(0, $this->getLimit())->execute(); + $entity_ids = array(); + foreach ($entities as $entity) { + $entity_ids[$entity->entity_id] = $entity->entity_id; + } + $this->entityDeleteMultiple($entity_ids); + + // Report progress, take into account that we may not have deleted as + // many items as we have counted at first. + if (count($entity_ids)) { + $state->deleted += count($entity_ids); + $state->progress($state->total, $state->deleted); + } + else { + $state->progress($state->total, $state->total); } - return FEEDS_BATCH_COMPLETE; } /** Index: plugins/FeedsProcessor.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/plugins/FeedsProcessor.inc,v retrieving revision 1.20.2.10 diff -u -p -r1.20.2.10 FeedsProcessor.inc --- plugins/FeedsProcessor.inc 29 Oct 2010 19:35:18 -0000 1.20.2.10 +++ plugins/FeedsProcessor.inc 2 Nov 2010 21:20:28 -0000 @@ -292,8 +292,8 @@ abstract class FeedsProcessor extends Fe * Delete feed items younger than now - $time. Do not invoke expire on a * processor directly, but use FeedsImporter::expire() instead. * - * @see FeedsImporter::expire(). - * @see FeedsDataProcessor::expire(). + * @see FeedsSource::expire(). + * @see FeedsNodeProcessor::expire(). * * @param $time * If implemented, all items produced by this configuration that are older @@ -304,8 +304,7 @@ abstract class FeedsProcessor extends Fe * FEEDS_BATCH_COMPLETE if all items have been processed, a float between 0 * and 0.99* indicating progress otherwise. */ - public function expire($time = NULL) { - return FEEDS_BATCH_COMPLETE; + public function expire(FeedsSource $source, $time = NULL) { } /** Index: tests/feeds.test.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/tests/feeds.test.inc,v retrieving revision 1.19.2.6 diff -u -p -r1.19.2.6 feeds.test.inc --- tests/feeds.test.inc 27 Oct 2010 22:33:32 -0000 1.19.2.6 +++ tests/feeds.test.inc 2 Nov 2010 21:20:28 -0000 @@ -333,7 +333,7 @@ class FeedsWebTestCase extends DrupalWeb // Check whether feed got properly added to scheduler. $this->assertEqual(1, db_query("SELECT COUNT(*) FROM {job_schedule} WHERE type = :id AND id = 0 AND name = 'feeds_source_import' AND last <> 0 AND scheduled = 0", array(':id' => $id))->fetchField()); // There must be only one entry for callback 'expire' - no matter what the feed_nid is. - $this->assertEqual(0, db_query("SELECT COUNT(*) FROM {job_schedule} WHERE type = :id AND name = 'feeds_importer_expire' AND last <> 0 AND scheduled = 0", array(':id' => $id))->fetchField()); + $this->assertEqual(0, db_query("SELECT COUNT(*) FROM {job_schedule} WHERE type = :id AND name = 'feeds_source_expire' AND last <> 0 AND scheduled = 0", array(':id' => $id))->fetchField()); } /** Index: tests/feeds_scheduler.test =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/tests/feeds_scheduler.test,v retrieving revision 1.6.2.4 diff -u -p -r1.6.2.4 feeds_scheduler.test --- tests/feeds_scheduler.test 6 Oct 2010 14:12:43 -0000 1.6.2.4 +++ tests/feeds_scheduler.test 2 Nov 2010 21:20:28 -0000 @@ -157,14 +157,14 @@ class FeedsSchedulerTestCase extends Fee // Set expire settings, check rescheduling. $max_last = db_query("SELECT MAX(last) FROM {job_schedule} WHERE type = 'syndication' AND name = 'feeds_source_import' AND period = 0")->fetchField(); $min_last = db_query("SELECT MIN(last) FROM {job_schedule} WHERE type = 'syndication' AND name = 'feeds_source_import' AND period = 0")->fetchField(); - $this->assertEqual(0, db_query("SELECT COUNT(*) FROM {job_schedule} WHERE type = 'syndication' AND name = 'feeds_importer_expire' AND last <> 0 AND scheduled = 0")->fetchField()); + $this->assertEqual(0, db_query("SELECT COUNT(*) FROM {job_schedule} WHERE type = 'syndication' AND name = 'feeds_source_expire' AND last <> 0 AND scheduled = 0")->fetchField()); $this->drupalLogin($this->admin_user); $this->setSettings('syndication', 'FeedsNodeProcessor', array('expire' => 86400)); $this->drupalLogout(); sleep(1); $this->cronRun(); - // There should be a feeds_importer_expire job now, and all last fields should be reset. - $this->assertEqual(1, db_query("SELECT COUNT(*) FROM {job_schedule} WHERE type = 'syndication' AND name = 'feeds_importer_expire' AND last <> 0 AND scheduled = 0 AND period = 3600")->fetchField()); + // There should be a feeds_source_expire job now, and all last fields should be reset. + $this->assertEqual(1, db_query("SELECT COUNT(*) FROM {job_schedule} WHERE type = 'syndication' AND name = 'feeds_source_expire' AND last <> 0 AND scheduled = 0 AND period = 3600")->fetchField()); $new_max_last = db_query("SELECT MAX(last) FROM {job_schedule} WHERE type = 'syndication' AND name = 'feeds_source_import' AND period = 0")->fetchField(); $new_min_last = db_query("SELECT MIN(last) FROM {job_schedule} WHERE type = 'syndication' AND name = 'feeds_source_import' AND period = 0")->fetchField(); $this->assertNotEqual($new_max_last, $max_last);