? .git ? .gitignore ? 617054-16_push.patch ? 617054-17_push.patch ? libraries/simplepie.inc Index: feeds.install =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.install,v retrieving revision 1.6 diff -u -p -r1.6 feeds.install --- feeds.install 10 Feb 2010 23:49:35 -0000 1.6 +++ feeds.install 19 Feb 2010 23:54:54 -0000 @@ -187,6 +187,66 @@ function feeds_schema() { 'guid' => array(array('guid', 255)), ), ); + $schema['feeds_push_subscriptions'] = array( + 'description' => 'PubsubHubbub subscriptions.', + 'fields' => array( + 'domain' => array( + 'type' => 'varchar', + 'length' => 128, + 'not null' => TRUE, + 'default' => '', + 'description' => 'Domain of the subscriber. Corresponds to an importer id.', + ), + 'subscriber_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'unsigned' => TRUE, + 'description' => 'ID of the subscriber. Corresponds to a feed nid.', + ), + 'timestamp' => array( + 'type' => 'int', + 'unsigned' => FALSE, + 'default' => 0, + 'not null' => TRUE, + 'description' => 'Created timestamp.', + ), + 'hub' => array( + 'type' => 'text', + 'not null' => TRUE, + 'description' => t('The URL of the hub endpoint of this subscription.'), + ), + 'topic' => array( + 'type' => 'text', + 'not null' => TRUE, + 'description' => t('The topic URL (feed URL) of this subscription.'), + ), + 'secret' => array( + 'type' => 'varchar', + 'length' => 128, + 'not null' => TRUE, + 'default' => '', + 'description' => 'Shared secret for message authentication.', + ), + 'status' => array( + 'type' => 'varchar', + 'length' => 64, + 'not null' => TRUE, + 'default' => '', + 'description' => 'Status of subscription.', + ), + 'post_fields' => array( + 'type' => 'text', + 'not null' => FALSE, + 'description' => 'Fields posted.', + 'serialize' => TRUE, + ), + ), + 'primary key' => array('domain', 'subscriber_id'), + 'indexes' => array( + 'timestamp' => array('timestamp'), + ), + ); return $schema; } @@ -362,4 +422,73 @@ function feeds_update_6008() { db_change_field($ret, 'feeds_schedule', 'last_scheduled_time', 'last_executed_time', $spec); return $ret; -} \ No newline at end of file +} + +/** + * Add feeds_push_subscriptions tables. + */ +function feeds_update_6009() { + $ret = array(); + $table = array( + 'description' => 'PubsubHubbub subscriptions.', + 'fields' => array( + 'domain' => array( + 'type' => 'varchar', + 'length' => 128, + 'not null' => TRUE, + 'default' => '', + 'description' => 'Domain of the subscriber. Corresponds to an importer id.', + ), + 'subscriber_id' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'unsigned' => TRUE, + 'description' => 'ID of the subscriber. Corresponds to a feed nid.', + ), + 'timestamp' => array( + 'type' => 'int', + 'unsigned' => FALSE, + 'default' => 0, + 'not null' => TRUE, + 'description' => 'Created timestamp.', + ), + 'hub' => array( + 'type' => 'text', + 'not null' => TRUE, + 'description' => t('The URL of the hub endpoint of this subscription.'), + ), + 'topic' => array( + 'type' => 'text', + 'not null' => TRUE, + 'description' => t('The topic URL (feed URL) of this subscription.'), + ), + 'secret' => array( + 'type' => 'varchar', + 'length' => 128, + 'not null' => TRUE, + 'default' => '', + 'description' => 'Shared secret for message authentication.', + ), + 'status' => array( + 'type' => 'varchar', + 'length' => 64, + 'not null' => TRUE, + 'default' => '', + 'description' => 'Status of subscription.', + ), + 'post_fields' => array( + 'type' => 'text', + 'not null' => FALSE, + 'description' => 'Fields posted.', + 'serialize' => TRUE, + ), + ), + 'primary key' => array('domain', 'subscriber_id'), + 'indexes' => array( + 'timestamp' => array('timestamp'), + ), + ); + db_create_table($ret, 'feeds_push_subscriptions', $table); + return $ret; +} Index: feeds.module =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.module,v retrieving revision 1.29 diff -u -p -r1.29 feeds.module --- feeds.module 10 Feb 2010 23:49:35 -0000 1.29 +++ feeds.module 19 Feb 2010 23:54:54 -0000 @@ -126,6 +126,7 @@ function feeds_menu() { 'weight' => 11, ); } + $items += $importer->fetcher->menuItem(); } if (count($items)) { $items['import'] = array( @@ -285,8 +286,9 @@ function feeds_nodeapi(&$node, $op, $for if ($op == 'insert' && feeds_importer($importer_id)->config['import_on_create'] && !isset($node->feeds['suppress_import'])) { feeds_batch_set(t('Importing'), 'import', $importer_id, $node->nid); } - // Add import to scheduler. + // Add import and subscribe to scheduler. feeds_scheduler()->add($importer_id, 'import', $node->nid); + feeds_scheduler()->add($importer_id, 'subscribe', $node->nid); // Add expiry to schedule, in case this is the first feed of this // configuration. feeds_scheduler()->add($importer_id, 'expire'); @@ -294,6 +296,7 @@ function feeds_nodeapi(&$node, $op, $for case 'delete': // Remove feed from scheduler and delete source. feeds_scheduler()->remove($importer_id, 'import', $node->nid); + feeds_scheduler()->remove($importer_id, 'subscribe', $node->nid); feeds_source($importer_id, $node->nid)->delete(); break; } @@ -529,6 +532,23 @@ function feeds_export($importer_id, $ind } /** + * Log to a file like /mytmp/feeds_my_domain_org.log in temporary directory. + * + * @todo Document feeds_debug variable + */ +function feeds_dbg($msg) { + if (variable_get('feeds_debug', false)) { + if (!is_string($msg)) { + $msg = var_export($msg, true); + } + $filename = trim(str_replace('/', '_', $_SERVER['HTTP_HOST'] . base_path()), '_'); + $handle = fopen(file_directory_temp() ."/feeds_$filename.log", 'a'); + fwrite($handle, date('c') ."\t$msg\n"); + fclose($handle); + } +} + +/** * @} End of "defgroup utility". */ Index: feeds.pages.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.pages.inc,v retrieving revision 1.9 diff -u -p -r1.9 feeds.pages.inc --- feeds.pages.inc 10 Feb 2010 23:49:35 -0000 1.9 +++ feeds.pages.inc 19 Feb 2010 23:54:54 -0000 @@ -88,6 +88,7 @@ function feeds_import_form_submit($form, // Add importer to schedule. feeds_scheduler()->add($form['#importer_id'], 'import'); + feeds_scheduler()->add($form['#importer_id'], 'subscribe'); feeds_scheduler()->add($form['#importer_id'], 'expire'); } @@ -139,3 +140,13 @@ function feeds_delete_tab_form_submit($f $form_state['redirect'] = $form['#redirect']; feeds_batch_set(t('Deleting'), 'clear', $form['#importer_id'], empty($form['#feed_nid']) ? 0 : $form['#feed_nid']); } + +/** + * Handle a fetcher callback. + */ +function feeds_fetcher_callback($importer, $feed_nid = 0) { + if ($importer instanceof FeedsImporter) { + return $importer->fetcher->request($feed_nid); + } + drupal_access_denied(); +} Index: feeds.plugins.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.plugins.inc,v retrieving revision 1.4 diff -u -p -r1.4 feeds.plugins.inc --- feeds.plugins.inc 25 Jan 2010 20:03:05 -0000 1.4 +++ feeds.plugins.inc 19 Feb 2010 23:54:55 -0000 @@ -76,6 +76,17 @@ function _feeds_feeds_plugins() { 'path' => $path, ), ); + $info['FeedsPubSubFetcher'] = array( + 'name' => 'PubSubHubbub Fetcher', + 'description' => 'Download content from a URL, receive change notifications.', + 'help' => 'Download content from a URL. If feed at URL supports PubSubHubbub, subscribe to hub for notifications of changes.', + 'handler' => array( + 'parent' => 'FeedsHTTPFetcher', + 'class' => 'FeedsPubSubFetcher', + 'file' => 'FeedsPubSubFetcher.inc', + 'path' => $path, + ), + ); $info['FeedsCSVParser'] = array( 'name' => 'CSV parser', 'description' => 'Parse data in Comma Separated Value format.', Index: includes/FeedsImporter.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/includes/FeedsImporter.inc,v retrieving revision 1.12 diff -u -p -r1.12 FeedsImporter.inc --- includes/FeedsImporter.inc 11 Feb 2010 00:26:49 -0000 1.12 +++ includes/FeedsImporter.inc 19 Feb 2010 23:54:55 -0000 @@ -104,10 +104,13 @@ class FeedsImporter extends FeedsConfigu switch ($job['callback']) { case 'import': return feeds_source($job['id'], $job['feed_nid'])->import(); - break; case 'expire': return $this->expire(); + case 'subscribe': + feeds_source($job['importer_id'], $job['feed_nid'])->fetcher->subscribe(); + break; } + return FEEDS_BATCH_COMPLETE; } /** @@ -123,6 +126,8 @@ class FeedsImporter extends FeedsConfigu return 3600; } return FEEDS_SCHEDULE_NEVER; + case 'subscribe': + return $this->fetcher->subscriptionPeriod(); } } @@ -130,7 +135,7 @@ class FeedsImporter extends FeedsConfigu * Expose available schedule callbacks. */ public function getScheduleCallbacks() { - return array('import', 'expire'); + return array('import', 'expire', 'subscribe'); } /** Index: libraries/PuSHSubscriber.inc =================================================================== RCS file: libraries/PuSHSubscriber.inc diff -N libraries/PuSHSubscriber.inc --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ libraries/PuSHSubscriber.inc 19 Feb 2010 23:54:55 -0000 @@ -0,0 +1,346 @@ +domain = $domain; + $this->subscriber_id = $subscriber_id; + $this->subscription_class = $subscription_class; + $this->env = $env; + } + + /** + * Subscribe to a given URL. Attempt to retrieve 'hub' and 'self' links from + * document at $url and issue a subscription request to the hub. + * + * @param $url + * The URL of the feed to subscribe to. + * @param $callback_url + * The full URL that hub should invoke for subscription verification or for + * notifications. + * @param $hub + * The URL of a hub. If given overrides the hub URL found in the document + * at $url. + */ + public function subscribe($url, $callback_url, $hub = '') { + // Fetch document, find rel=hub and rel=self. + // If present, issue subscription request. + $request = curl_init($url); + curl_setopt($request, CURLOPT_FOLLOWLOCATION, TRUE); + curl_setopt($request, CURLOPT_RETURNTRANSFER, TRUE); + $data = curl_exec($request); + if (curl_getinfo($request, CURLINFO_HTTP_CODE) == 200) { + $xml = new SimpleXMLElement($data); + $xml->registerXPathNamespace('atom', 'http://www.w3.org/2005/Atom'); + if (empty($hub) && $hub = @current($xml->xpath("//atom:link[attribute::rel='hub']"))) { + $hub = (string) $hub->attributes()->href; + } + if ($self = @current($xml->xpath("//atom:link[attribute::rel='self']"))) { + $self = (string) $self->attributes()->href; + } + } + curl_close($request); + // Fall back to $url if $self is not given. + if (!isset($self)) { + $self = $url; + } + if (!empty($hub) && !empty($self)) { + $this->request($hub, $self, 'subscribe', $callback_url); + } + } + + /** + * @todo Unsubscribe from a hub. + * @todo Make sure we unsubscribe with the correct topic URL as it can differ + * from the initial subscription URL. + * + * @param $topic_url + * The URL of the topic to unsubscribe from. + * @param $callback_url + * The callback to unsubscribe. + */ + public function unsubscribe($topic_url, $callback_url) { + if ($sub = $this->loadSubscription()) { + $this->request($sub->hub, $sub->topic, 'unsubscribe', $callback_url); + $sub->delete(); + } + } + + /** + * Issue a subscribe or unsubcribe request to a PubsubHubbub hub. + * + * @param $hub + * The URL of the hub's subscription endpoint. + * @param $topic + * The topic URL of the feed to subscribe to. + * @param $mode + * 'subscribe' or 'unsubscribe'. + * @param $callback_url + * The subscriber's notifications callback URL. + * + * Compare to http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.2.html#anchor5 + * + * @todo Make concurrency safe. + */ + protected function request($hub, $topic, $mode, $callback_url) { + $secret = hash('sha1', uniqid(rand(), true)); + $post_fields = array( + 'hub.callback' => $callback_url, + 'hub.mode' => $mode, + 'hub.topic' => $topic, + 'hub.verify' => 'sync', + 'hub.lease_seconds' => '', // Permanent subscription. + 'hub.secret' => $secret, + 'hub.verify_token' => md5(session_id() . rand()), + ); + $sub = new $this->subscription_class($this->domain, $this->subscriber_id, $hub, $topic, $secret, $mode, $post_fields); + $sub->save(); + // Issue subscription request. + $request = curl_init($hub); + curl_setopt($request, CURLOPT_POST, TRUE); + curl_setopt($request, CURLOPT_POSTFIELDS, $post_fields); + curl_setopt($request, CURLOPT_RETURNTRANSFER, TRUE); + curl_exec($request); + $code = curl_getinfo($request, CURLINFO_HTTP_CODE); + if (!in_array($code, array(202, 204))) { + $sub->status = $mode .' failed'; + $sub->save(); + $this->msg("Error subscribing to hub ($code)", 'error'); + $this->log("Error subscribing to hub ($code)", 'error'); + } + curl_close($request); + } + + /** + * Verify a request. After a hub has received a subscribe or unsubscribe + * request (see PuSHSubscriber::request()) it sends back a challenge verifying + * that an action indeed was requested ($_GET['hub_challenge']). This + * method handles the challenge. + */ + public function verifyRequest() { + if (isset($_GET['hub_challenge'])) { + /** + * If a subscription is present, compare the verify token. If the token + * matches, set the status on the subscription record and confirm + * positive. + * + * If we cannot find a matching subscription and the hub checks on + * 'unsubscribe' confirm positive. + * + * In all other cases confirm negative. + */ + if ($sub = $this->loadSubscription()) { + if ($_GET['hub_verify_token'] == $sub->post_fields['hub.verify_token']) { + if ($_GET['hub_mode'] == 'subscribe' && $sub->status == 'subscribe') { + $sub->status = 'subscribed'; + $sub->post_fields = array(); + $sub->save(); + $this->log('Verified subscribe request.'); + $verify = TRUE; + } + elseif ($_GET['hub_mode'] == 'unsubscribe' && $sub->status == 'unsubscribe') { + $sub->status = 'unsubscribed'; + $sub->post_fields = array(); + $sub->save(); + $this->log('Verified unsubscribe request.'); + $verify = TRUE; + } + } + } + elseif ($_GET['hub_mode'] == 'unsubscribe') { + $this->log('Verified unsubscribe request.'); + $verify = TRUE; + } + if ($verify) { + header('HTTP/1.1 200 "Found"', null, 200); + print $_GET['hub_challenge']; + exit(); + } + } + header('HTTP/1.1 404 "Not Found"', null, 404); + $this->log('Could not verify subscription.', 'error'); + exit(); + } + + /** + * Receive a notification. + * + * @param $ignore_signature + * If FALSE, only accept payload if there is a signature present and the + * signature matches the payload. Warning: setting to TRUE results in + * unsafe behavior. + * + * @return + * An XML string that is the payload of the notification if valid, FALSE + * otherwise. + */ + public function receive($ignore_signature = FALSE) { + /** + * Verification steps: + * + * 1) Verify that this is indeed a POST reuest. + * 2) Verify that posted string is XML. + * 3) Per default verify sender of message by checking the message's + * signature against the shared secret. + */ + if ($_SERVER['REQUEST_METHOD'] == 'POST') { + $raw = file_get_contents('php://input'); + if (@simplexml_load_string($raw)) { + if ($ignore_signature) { + return $raw; + } + if (isset($_SERVER['HTTP_X_HUB_SIGNATURE']) && ($sub = $this->loadSubscription())) { + $result = array(); + parse_str($_SERVER['HTTP_X_HUB_SIGNATURE'], $result); + if (isset($result['sha1']) && $result['sha1'] == hash_hmac('sha1', $raw, $sub->secret)) { + return $raw; + } + else { + $this->log('Could not verify signature.', 'error'); + } + } + else { + $this->log('No signature present.', 'error'); + } + } + } + return FALSE; + } + + /** + * Helper for loading a subscription. + */ + protected function loadSubscription() { + return call_user_func("{$this->subscription_class}::load", $this->domain, $this->subscriber_id); + } + + /** + * Helper for messaging. + */ + protected function msg($msg, $level) { + $this->env->msg($msg, $level); + } + + /** + * Helper for logging. + */ + protected function log($msg, $level) { + $this->env->log("{$this->domain}:{$this->subscriber_id}\t$msg", $level); + } +} + +/** + * Implement to provide a storage backend for subscriptions. + * + * Variables passed in to the constructor must be accessible as public class + * variables. + */ +interface PuSHSubscriptionInterface { + /** + * @param $domain + * A string that defines the domain in which the subscriber_id is unique. + * @param $subscriber_id + * A unique numeric subscriber id. + * @param $hub + * The URL of the hub endpoint. + * @param $topic + * The topic to subscribe to. + * @param $secret + * A secret key used for message authentication. + * @param $status + * The status of the subscription. + * 'subscribe' - subscribing to a feed. + * 'unsubscribe' - unsubscribing from a feed. + * 'subscribed' - subscribed. + * 'unsubscribed' - unsubscribed. + * 'subscribe failed' - subscribe request failed. + * 'unsubscribe failed' - unsubscribe request failed. + * @param $post_fields + * An array of the fields posted to the hub. + */ + public function __construct($domain, $subscriber_id, $hub, $topic, $secret, $status = '', $post_fields = ''); + + /** + * Save a subscription. + */ + public function save(); + + /** + * Load a subscription. + * + * @return + * A PuSHSubscriptionInterface object if a subscription exist, NULL + * otherwise. + */ + public static function load($domain, $subscriber_id); + + /** + * Delete a subscription. + */ + public function delete(); +} + +/** + * Implement to provide environmental functionality like user messages and + * logging. + */ +interface PuSHSubscriberEnvironmentInterface { + /** + * A message to be displayed to the user on the current page load. + * + * @param $msg + * A string that is the message to be displayed. + * @param $level + * A string that is either 'status', 'warning' or 'error'. + */ + public function msg($msg, $level = 'status'); + + /** + * A log message to be logged to the database or the file system. + * + * @param $msg + * A string that is the message to be displayed. + * @param $level + * A string that is either 'status', 'warning' or 'error'. + */ + public function log($msg, $level = 'status'); +} \ No newline at end of file Index: plugins/FeedsFetcher.inc =================================================================== RCS file: /cvs/drupal-contrib/contributions/modules/feeds/plugins/FeedsFetcher.inc,v retrieving revision 1.4 diff -u -p -r1.4 FeedsFetcher.inc --- plugins/FeedsFetcher.inc 20 Dec 2009 23:54:44 -0000 1.4 +++ plugins/FeedsFetcher.inc 19 Feb 2010 23:54:55 -0000 @@ -26,4 +26,78 @@ abstract class FeedsFetcher extends Feed * caches pertaining to this source. */ public function clear(FeedsSource $source) {} + + /** + * Request handler invoked if callback URL is requested. Locked down by + * default. For an example use see FeedsPubSubFetcher. + * + * Method may exit the script. + * + * @return + * A string to be returned to the client. + */ + public function request($feed_nid = 0) { + drupal_access_denied(); + } + + /** + * Construct a path for a concrete fetcher/source combination. The result of + * this method matches up with the general path definition in + * FeedsFetcher::menuItem(). For example usage look at FeedsPubSubFetcher. + * + * @return + * Path for this fetcher/source combination. + */ + public function path($feed_nid = 0) { + if ($feed_nid) { + return urlencode('feeds/fetcher/'. $this->id .'/'. $feed_nid); + } + return urlencode('feeds/fetcher/'. $this->id); + } + + /** + * Menu item definition for fetchers of this class. Note how the path + * component in the item definition matches the return value of + * FeedsFetcher::path(); + * + * Requests to this menu item will be routed to FeedsFetcher::request(). + * + * @return + * An array where the key is the Drupal menu item path and the value is + * a valid Drupal menu item definition. + */ + public function menuItem() { + return array( + 'feeds/fetcher/%feeds_importer' => array( + 'page callback' => 'feeds_fetcher_callback', + 'page arguments' => array(2, 3), + 'access callback' => TRUE, + 'file' => 'feeds.pages.inc', + 'type' => MENU_CALLBACK, + ), + ); + } + + /** + * Subscribe to a source. Only implement if fetcher requires subscription. + * + * @param FeedsSource $source + * Source information for this subscription. + */ + public function subscribe(FeedsSource $source) {} + + /** + * Unsubscribe from a source. Only implement if fetcher requires subscription. + * + * @param FeedsSource $source + * Source information for unsubscribing. + */ + public function unsubscribe(FeedsSource $source) {} + + /** + * Indicate the time within which a subscription needs to be renewed. + */ + public function subscriptionPeriod() { + return FEEDS_SCHEDULE_NEVER; + } } Index: plugins/FeedsPubSubFetcher.inc =================================================================== RCS file: plugins/FeedsPubSubFetcher.inc diff -N plugins/FeedsPubSubFetcher.inc --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ plugins/FeedsPubSubFetcher.inc 19 Feb 2010 23:54:55 -0000 @@ -0,0 +1,269 @@ +raw = $raw; + parent::__construct(); + } + + /** + * Override FeedsHTTPBatch::getRaw() and simply return the raw content + * populated on instnatiation. + */ + function getRaw() { + return $this->raw; + } +} + +/** + * Publish/Subscribe fetcher. Supports at the moment only PubSubHubbub (PuSH). + */ +class FeedsPubSubFetcher extends FeedsHTTPFetcher { + + /** + * Implementation of FeedsFetcher::fetch(). + */ + public function fetch(FeedsSource $source) { + $source_config = $source->getConfigFor($this); + // Handle fat ping if present, otherwise pass up to HTTP fetcher. + if ($raw = feeds_push_subscriber($this->id, $source->feed_nid)->receive($this->config['ignore_signature'])) { + return new FeedsPubSubBatch($raw); + } + return parent::fetch($source); + } + + /** + * Implementation of FeedsFetcher::request(). + */ + public function request($feed_nid = 0) { + feeds_dbg($_GET); + @feeds_dbg(file_get_contents('php://input')); + // A subscription verification has been sent, verify. + if (isset($_GET['hub_challenge'])) { + $this->pushVerifyRequest($feed_nid); + } + // No subscription notification has ben sent, we are being notified. + else { + try { + feeds_source($this->id, $feed_nid)->existing()->import(); + } + catch (Exception $e) { + // In case of an error, respond with a 503 Service (temporary) unavailable. + header('HTTP/1.1 503 "Not Found"', null, 503); + exit(); + } + } + // Will generate the default 200 response. + return ''; + } + + /** + * Override sourceSave() - subscribe to hub. + */ + public function sourceSave(FeedsSource $source) { + $this->subscribe($source); + } + + /** + * Override sourceDelete() - unsubscribe from hub. + */ + public function sourceDelete(FeedsSource $source) { + $this->unsubscribe($source); + } + + /** + * Implement FeedsFetcher::subscribe() - subscribe to hub. + */ + public function subscribe(FeedsSource $source) { + $source_config = $source->getConfigFor($this); + feeds_push_subscriber($this->id, $source->feed_nid)->subscribe($source_config['source'], url($this->path($source->feed_nid), array('absolute' => TRUE)), valid_url($this->config['designated_hub']) ? $this->config['designated_hub'] : ''); + } + + /** + * Implement FeedsFetcher::unsubscribe() - unsubscribe from hub. + */ + public function unsubscribe(FeedsSource $source) { + $source_config = $source->getConfigFor($this); + feeds_push_subscriber($this->id, $source->feed_nid)->unsubscribe($source_config['source'], url($this->path($source->feed_nid), array('absolute' => TRUE))); + } + + /** + * Implement FeedsFetcher::subscriptionPeriod(). + * Indicate how often a subscription needs to be renewed. + * + * @todo subscription_period should actually be retrieved from the hub's + * response to a subscription. This will mean a different subscription + * period per source, hence a major change to FeedsScheduler which is + * currently assuming a fixed period per task! + */ + public function subscriptionPeriod() { + return $this->config['subscription_period']; + } + + /** + * Verify a PubSubHubbub subscription request. + */ + public function pushVerifyRequest($feed_nid) { + feeds_push_subscriber($this->id, $feed_nid)->verifyRequest(); + } + + /** + * Return defaults for configuration. + */ + public function configDefaults() { + $defaults = parent::configDefaults(); + return $defaults + array( + 'subscription_period' => 3600*24, // Renew subscription in 24 hours. + 'designated_hub' => '', + 'ignore_signature' => FALSE, + ); + } + + /** + * Override parent::configForm(). + */ + public function configForm(&$form_state) { + $form = parent::configForm($form_state); + $period = drupal_map_assoc(array(0, 900, 1800, 3600, 10800, 21600, 43200, 86400, 259200, 604800, 2419200), 'format_interval'); + $period[FEEDS_SCHEDULE_NEVER] = t('Never renew'); + $period[0] = t('Renew as often as possible'); + $form['subscription_period'] = array( + '#type' => 'select', + '#title' => t('Renew subscription after'), + '#options' => $period, + '#description' => t('This is the minimum time that must elapse before a subscription is renewed.'), + '#default_value' => $this->config['subscription_period'], + ); + $form['designated_hub'] = array( + '#type' => 'textfield', + '#title' => t('Designated hub'), + '#description' => t('Enter the callback URL of a designated hub. If given, this hub will be used instead of the hub specified in the feed source.'), + '#default_value' => $this->config['designated_hub'], + ); + $form['ignore_signature'] = array( + '#type' => 'checkbox', + '#title' => t('Ignore signatures'), + '#description' => t('Check to ignore signatures on notifications. Warning: Ignoring signatures results in unsafe behavior. Potential attackers can post any content without authentication.'), + '#default_value' => $this->config['ignore_signature'], + ); + return $form; + } +} + +/** + * Create a PubSubHubbub subscriber. + * + * @return PushSubscriber + * A PushSubscriber object. + */ +function feeds_push_subscriber($id, $subscriber_id) { + return PushSubscriber::instance($id, $subscriber_id, 'PuSHSubscription', PuSHEnvironment::instance()); +} + +/** + * Implement a PuSHSubscriptionInterface. + */ +class PuSHSubscription implements PuSHSubscriptionInterface { + public $domain; + public $subscriber_id; + public $hub; + public $topic; + public $status; + public $secret; + public $post_fields; + public $timestamp; + + /** + * Load a subscription. + */ + public static function load($domain, $subscriber_id) { + if ($v = db_fetch_array(db_query("SELECT * FROM {feeds_push_subscriptions} WHERE domain = '%s' AND subscriber_id = %d", $domain, $subscriber_id))) { + $v['post_fields'] = unserialize($v['post_fields']); + return new PuSHSubscription($v['domain'], $v['subscriber_id'], $v['hub'], $v['topic'], $v['secret'], $v['status'], $v['post_fields'], $v['timestamp']); + } + } + + /** + * Create a subscription. + */ + public function __construct($domain, $subscriber_id, $hub, $topic, $secret, $status = '', $post_fields = '') { + $this->domain = $domain; + $this->subscriber_id = $subscriber_id; + $this->hub = $hub; + $this->topic = $topic; + $this->status = $status; + $this->secret = $secret; + $this->post_fields = $post_fields; + } + + /** + * Save a subscription. + */ + public function save() { + $this->timestamp = time(); + $this->delete($this->domain, $this->subscriber_id); + drupal_write_record('feeds_push_subscriptions', $this); + } + + /** + * Delete a subscription. + */ + public function delete() { + db_query("DELETE FROM {feeds_push_subscriptions} WHERE domain = '%s' AND subscriber_id = %d", $this->domain, $this->subscriber_id); + } +} + +/** + * Provide environmental functions to the PuSHSubscriber library. + */ +class PuSHEnvironment implements PuSHSubscriberEnvironmentInterface { + /** + * Singleton. + */ + public static function instance() { + static $env; + if (empty($env)) { + $env = new PuSHEnvironment(); + } + return $env; + } + + /** + * Implementation of PuSHSubscriberEnvironmentInterface::msg(). + */ + public function msg($msg, $level = 'status') { + drupal_set_message($msg, $level); + } + + /** + * Implementation of PuSHSubscriberEnvironmentInterface::log(). + */ + public function log($msg, $level = 'status') { + switch ($level) { + case 'error': + $severity = WATCHDOG_ERROR; + break; + case 'warning': + $severity = WATCHDOG_WARNING; + break; + default: + $severity = WATCHDOG_NOTICE; + break; + } + feeds_dbg($msg); + watchdog('FeedsPubSubFetcher', $msg, array(), $severity); + } +}