? .git
? .gitignore
? 617054-16_push.patch
? 617054-17_push.patch
? 617054-18_push.patch
? libraries/simplepie.inc
Index: feeds.install
===================================================================
RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.install,v
retrieving revision 1.6
diff -u -p -r1.6 feeds.install
--- feeds.install	10 Feb 2010 23:49:35 -0000	1.6
+++ feeds.install	20 Feb 2010 00:56:17 -0000
@@ -187,6 +187,66 @@ function feeds_schema() {
       'guid' => array(array('guid', 255)),
     ),
   );
+  $schema['feeds_push_subscriptions'] = array(
+    'description' => 'PubsubHubbub subscriptions.',
+    'fields' => array(
+      'domain' => array(
+        'type' => 'varchar',
+        'length' => 128,
+        'not null' => TRUE,
+        'default' => '',
+        'description' => 'Domain of the subscriber. Corresponds to an importer id.',
+      ),
+      'subscriber_id' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'unsigned' => TRUE,
+        'description' => 'ID of the subscriber. Corresponds to a feed nid.',
+      ),
+      'timestamp' => array(
+        'type' => 'int',
+        'unsigned' => FALSE,
+        'default' => 0,
+        'not null' => TRUE,
+        'description' => 'Created timestamp.',
+      ),
+      'hub' => array(
+        'type' => 'text',
+        'not null' => TRUE,
+        'description' => t('The URL of the hub endpoint of this subscription.'),
+      ),
+      'topic' => array(
+        'type' => 'text',
+        'not null' => TRUE,
+        'description' => t('The topic URL (feed URL) of this subscription.'),
+      ),
+      'secret' => array(
+        'type' => 'varchar',
+        'length' => 128,
+        'not null' => TRUE,
+        'default' => '',
+        'description' => 'Shared secret for message authentication.',
+      ),
+      'status' => array(
+        'type' => 'varchar',
+        'length' => 64,
+        'not null' => TRUE,
+        'default' => '',
+        'description' => 'Status of subscription.',
+      ),
+      'post_fields' => array(
+        'type' => 'text',
+        'not null' => FALSE,
+        'description' => 'Fields posted.',
+        'serialize' => TRUE,
+      ),
+    ),
+    'primary key' => array('domain', 'subscriber_id'),
+    'indexes' => array(
+      'timestamp' => array('timestamp'),
+    ),
+  );
   return $schema;
 }
 
@@ -362,4 +422,73 @@ function feeds_update_6008() {
   db_change_field($ret, 'feeds_schedule', 'last_scheduled_time', 'last_executed_time', $spec);
 
   return $ret;
-}
\ No newline at end of file
+}
+
+/**
+ * Add feeds_push_subscriptions tables.
+ */
+function feeds_update_6009() {
+  $ret = array();
+  $table = array(
+    'description' => 'PubsubHubbub subscriptions.',
+    'fields' => array(
+      'domain' => array(
+        'type' => 'varchar',
+        'length' => 128,
+        'not null' => TRUE,
+        'default' => '',
+        'description' => 'Domain of the subscriber. Corresponds to an importer id.',
+      ),
+      'subscriber_id' => array(
+        'type' => 'int',
+        'not null' => TRUE,
+        'default' => 0,
+        'unsigned' => TRUE,
+        'description' => 'ID of the subscriber. Corresponds to a feed nid.',
+      ),
+      'timestamp' => array(
+        'type' => 'int',
+        'unsigned' => FALSE,
+        'default' => 0,
+        'not null' => TRUE,
+        'description' => 'Created timestamp.',
+      ),
+      'hub' => array(
+        'type' => 'text',
+        'not null' => TRUE,
+        'description' => t('The URL of the hub endpoint of this subscription.'),
+      ),
+      'topic' => array(
+        'type' => 'text',
+        'not null' => TRUE,
+        'description' => t('The topic URL (feed URL) of this subscription.'),
+      ),
+      'secret' => array(
+        'type' => 'varchar',
+        'length' => 128,
+        'not null' => TRUE,
+        'default' => '',
+        'description' => 'Shared secret for message authentication.',
+      ),
+      'status' => array(
+        'type' => 'varchar',
+        'length' => 64,
+        'not null' => TRUE,
+        'default' => '',
+        'description' => 'Status of subscription.',
+      ),
+      'post_fields' => array(
+        'type' => 'text',
+        'not null' => FALSE,
+        'description' => 'Fields posted.',
+        'serialize' => TRUE,
+      ),
+    ),
+    'primary key' => array('domain', 'subscriber_id'),
+    'indexes' => array(
+      'timestamp' => array('timestamp'),
+    ),
+  );
+  db_create_table($ret, 'feeds_push_subscriptions', $table);
+  return $ret;
+}
Index: feeds.module
===================================================================
RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.module,v
retrieving revision 1.29
diff -u -p -r1.29 feeds.module
--- feeds.module	10 Feb 2010 23:49:35 -0000	1.29
+++ feeds.module	20 Feb 2010 00:56:17 -0000
@@ -126,6 +126,7 @@ function feeds_menu() {
         'weight' => 11,
       );
     }
+    $items += $importer->fetcher->menuItem();
   }
   if (count($items)) {
     $items['import'] = array(
@@ -285,8 +286,9 @@ function feeds_nodeapi(&$node, $op, $for
         if ($op == 'insert' && feeds_importer($importer_id)->config['import_on_create'] && !isset($node->feeds['suppress_import'])) {
           feeds_batch_set(t('Importing'), 'import', $importer_id, $node->nid);
         }
-        // Add import to scheduler.
+        // Add import and subscribe to scheduler.
         feeds_scheduler()->add($importer_id, 'import', $node->nid);
+        feeds_scheduler()->add($importer_id, 'subscribe', $node->nid);
         // Add expiry to schedule, in case this is the first feed of this
         // configuration.
         feeds_scheduler()->add($importer_id, 'expire');
@@ -294,6 +296,7 @@ function feeds_nodeapi(&$node, $op, $for
       case 'delete':
         // Remove feed from scheduler and delete source.
         feeds_scheduler()->remove($importer_id, 'import', $node->nid);
+        feeds_scheduler()->remove($importer_id, 'subscribe', $node->nid);
         feeds_source($importer_id, $node->nid)->delete();
         break;
     }
@@ -529,6 +532,23 @@ function feeds_export($importer_id, $ind
 }
 
 /**
+ * Log to a file like /mytmp/feeds_my_domain_org.log in temporary directory.
+ *
+ * @todo Document feeds_debug variable
+ */
+function feeds_dbg($msg) {
+  if (variable_get('feeds_debug', false)) {
+    if (!is_string($msg)) {
+      $msg = var_export($msg, true);
+    }
+    $filename = trim(str_replace('/', '_', $_SERVER['HTTP_HOST'] . base_path()), '_');
+    $handle = fopen(file_directory_temp() ."/feeds_$filename.log", 'a');
+    fwrite($handle, date('c') ."\t$msg\n");
+    fclose($handle);
+  }
+}
+
+/**
  * @} End of "defgroup utility".
  */
 
Index: feeds.pages.inc
===================================================================
RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.pages.inc,v
retrieving revision 1.9
diff -u -p -r1.9 feeds.pages.inc
--- feeds.pages.inc	10 Feb 2010 23:49:35 -0000	1.9
+++ feeds.pages.inc	20 Feb 2010 00:56:17 -0000
@@ -88,6 +88,7 @@ function feeds_import_form_submit($form,
 
   // Add importer to schedule.
   feeds_scheduler()->add($form['#importer_id'], 'import');
+  feeds_scheduler()->add($form['#importer_id'], 'subscribe');
   feeds_scheduler()->add($form['#importer_id'], 'expire');
 }
 
@@ -139,3 +140,13 @@ function feeds_delete_tab_form_submit($f
   $form_state['redirect'] = $form['#redirect'];
   feeds_batch_set(t('Deleting'), 'clear', $form['#importer_id'], empty($form['#feed_nid']) ? 0 : $form['#feed_nid']);
 }
+
+/**
+ * Handle a fetcher callback.
+ */
+function feeds_fetcher_callback($importer, $feed_nid = 0) {
+  if ($importer instanceof FeedsImporter) {
+    return $importer->fetcher->request($feed_nid);
+  }
+  drupal_access_denied();
+}
Index: feeds.plugins.inc
===================================================================
RCS file: /cvs/drupal-contrib/contributions/modules/feeds/feeds.plugins.inc,v
retrieving revision 1.4
diff -u -p -r1.4 feeds.plugins.inc
--- feeds.plugins.inc	25 Jan 2010 20:03:05 -0000	1.4
+++ feeds.plugins.inc	20 Feb 2010 00:56:17 -0000
@@ -76,6 +76,17 @@ function _feeds_feeds_plugins() {
       'path' => $path,
     ),
   );
+  $info['FeedsPubSubFetcher'] = array(
+    'name' => 'PubSubHubbub Fetcher',
+    'description' => 'Download content from a URL, receive change notifications.',
+    'help' => 'Download content from a URL. If feed at URL supports PubSubHubbub, subscribe to hub for notifications of changes.',
+    'handler' => array(
+      'parent' => 'FeedsHTTPFetcher',
+      'class' => 'FeedsPubSubFetcher',
+      'file' => 'FeedsPubSubFetcher.inc',
+      'path' => $path,
+    ),
+  );
   $info['FeedsCSVParser'] = array(
     'name' => 'CSV parser',
     'description' => 'Parse data in Comma Separated Value format.',
Index: includes/FeedsImporter.inc
===================================================================
RCS file: /cvs/drupal-contrib/contributions/modules/feeds/includes/FeedsImporter.inc,v
retrieving revision 1.12
diff -u -p -r1.12 FeedsImporter.inc
--- includes/FeedsImporter.inc	11 Feb 2010 00:26:49 -0000	1.12
+++ includes/FeedsImporter.inc	20 Feb 2010 00:56:17 -0000
@@ -104,10 +104,13 @@ class FeedsImporter extends FeedsConfigu
     switch ($job['callback']) {
       case 'import':
         return feeds_source($job['id'], $job['feed_nid'])->import();
-        break;
       case 'expire':
         return $this->expire();
+      case 'subscribe':
+        feeds_source($job['importer_id'], $job['feed_nid'])->fetcher->subscribe();
+        break;
     }
+    return FEEDS_BATCH_COMPLETE;
   }
 
   /**
@@ -123,6 +126,8 @@ class FeedsImporter extends FeedsConfigu
           return 3600;
         }
         return FEEDS_SCHEDULE_NEVER;
+      case 'subscribe':
+        return $this->fetcher->subscriptionPeriod();
     }
   }
 
@@ -130,7 +135,7 @@ class FeedsImporter extends FeedsConfigu
    * Expose available schedule callbacks.
    */
   public function getScheduleCallbacks() {
-    return array('import', 'expire');
+    return array('import', 'expire', 'subscribe');
   }
 
   /**
Index: libraries/PuSHSubscriber.inc
===================================================================
RCS file: libraries/PuSHSubscriber.inc
diff -N libraries/PuSHSubscriber.inc
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ libraries/PuSHSubscriber.inc	20 Feb 2010 00:56:17 -0000
@@ -0,0 +1,350 @@
+<?php
+
+/**
+ * @file
+ * Pubsubhubbub subscriber library.
+ */
+
+/**
+ * Pubsub subscriber.
+ */
+class PuSHSubscriber {
+  protected $domain;
+  protected $subscriber_id;
+  protected $subscription_class;
+  protected $env;
+
+  /**
+   * Singleton.
+   *
+   * PuSHSubscriber identifies a unique subscription by a domain and a numeric
+   * id. The numeric id is assumed to e unique in its domain.
+   *
+   * @param $domain
+   *   A string that identifies the domain in which $subscriber_id is unique.
+   * @param $subscriber_id
+   *   A numeric subscriber id.
+   * @param $subscription_class
+   *   The class to use for handling subscriptions. Class MUST implement
+   *   PuSHSubscriberSubscriptionInterface
+   * @param PuSHSubscriberEnvironmentInterface $env
+   *   Environmental object for messaging and logging.
+   */
+  public static function instance($domain, $subscriber_id, $subscription_class, PuSHSubscriberEnvironmentInterface $env) {
+    static $subscribers;
+    if (!isset($subscriber[$domain][$subscriber_id])) {
+      $subscriber = new PuSHSubscriber($domain, $subscriber_id, $subscription_class, $env);
+    }
+    return $subscriber;
+  }
+
+  /**
+   * Protect constructor.
+   */
+  protected function __construct($domain, $subscriber_id, $subscription_class, PuSHSubscriberEnvironmentInterface $env) {
+    $this->domain = $domain;
+    $this->subscriber_id = $subscriber_id;
+    $this->subscription_class = $subscription_class;
+    $this->env = $env;
+  }
+
+  /**
+   * Subscribe to a given URL. Attempt to retrieve 'hub' and 'self' links from
+   * document at $url and issue a subscription request to the hub.
+   *
+   * @param $url
+   *   The URL of the feed to subscribe to.
+   * @param $callback_url
+   *   The full URL that hub should invoke for subscription verification or for
+   *   notifications.
+   * @param $hub
+   *   The URL of a hub. If given overrides the hub URL found in the document
+   *   at $url.
+   */
+  public function subscribe($url, $callback_url, $hub = '') {
+    feeds_dbg(func_get_args());
+    // Fetch document, find rel=hub and rel=self.
+    // If present, issue subscription request.
+    $request = curl_init($url);
+    curl_setopt($request, CURLOPT_FOLLOWLOCATION, TRUE);
+    curl_setopt($request, CURLOPT_RETURNTRANSFER, TRUE);
+    $data = curl_exec($request);
+    if (curl_getinfo($request, CURLINFO_HTTP_CODE) == 200) {
+      $xml = new SimpleXMLElement($data);
+      $xml->registerXPathNamespace('atom', 'http://www.w3.org/2005/Atom');
+      if (empty($hub) && $hub = @current($xml->xpath("//atom:link[attribute::rel='hub']"))) {
+        $hub = (string) $hub->attributes()->href;
+      }
+      if ($self = @current($xml->xpath("//atom:link[attribute::rel='self']"))) {
+        $self = (string) $self->attributes()->href;
+      }
+    }
+    curl_close($request);
+    // Fall back to $url if $self is not given.
+    if (!$self) {
+      $self = $url;
+    }
+    if (!empty($hub) && !empty($self)) {
+      $this->request($hub, $self, 'subscribe', $callback_url);
+    }
+  }
+
+  /**
+   * @todo Unsubscribe from a hub.
+   * @todo Make sure we unsubscribe with the correct topic URL as it can differ
+   * from the initial subscription URL.
+   *
+   * @param $topic_url
+   *   The URL of the topic to unsubscribe from.
+   * @param $callback_url
+   *   The callback to unsubscribe.
+   */
+  public function unsubscribe($topic_url, $callback_url) {
+    if ($sub = $this->loadSubscription()) {
+      $this->request($sub->hub, $sub->topic, 'unsubscribe', $callback_url);
+      $sub->delete();
+    }
+  }
+
+  /**
+   * Issue a subscribe or unsubcribe request to a PubsubHubbub hub.
+   *
+   * @param $hub
+   *   The URL of the hub's subscription endpoint.
+   * @param $topic
+   *   The topic URL of the feed to subscribe to.
+   * @param $mode
+   *   'subscribe' or 'unsubscribe'.
+   * @param $callback_url
+   *   The subscriber's notifications callback URL.
+   *
+   * Compare to http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.2.html#anchor5
+   *
+   * @todo Make concurrency safe.
+   */
+  protected function request($hub, $topic, $mode, $callback_url) {
+    $secret = hash('sha1', uniqid(rand(), true));
+    $post_fields = array(
+      'hub.callback' => $callback_url,
+      'hub.mode' => $mode,
+      'hub.topic' => $topic,
+      'hub.verify' => 'sync',
+      'hub.lease_seconds' => '', // Permanent subscription.
+      'hub.secret' => $secret,
+      'hub.verify_token' => md5(session_id() . rand()),
+    );
+    $sub = new $this->subscription_class($this->domain, $this->subscriber_id, $hub, $topic, $secret, $mode, $post_fields);
+    $sub->save();
+    // Issue subscription request.
+    $request = curl_init($hub);
+    curl_setopt($request, CURLOPT_POST, TRUE);
+    curl_setopt($request, CURLOPT_POSTFIELDS, $post_fields);
+    curl_setopt($request, CURLOPT_RETURNTRANSFER, TRUE);
+    curl_exec($request);
+    $code = curl_getinfo($request, CURLINFO_HTTP_CODE);
+    if (in_array($code, array(202, 204))) {
+      $this->log("Positive response to \"$mode\" request ($code).");
+    }
+    else {
+      $sub->status = $mode .' failed';
+      $sub->save();
+      $this->msg("Error issuing \"$mode\" request ($code).", 'error');
+      $this->log("Error issuing \"$mode\" request ($code).", 'error');
+    }
+    curl_close($request);
+  }
+
+  /**
+   * Verify a request. After a hub has received a subscribe or unsubscribe
+   * request (see PuSHSubscriber::request()) it sends back a challenge verifying
+   * that an action indeed was requested ($_GET['hub_challenge']). This
+   * method handles the challenge.
+   */
+  public function verifyRequest() {
+    if (isset($_GET['hub_challenge'])) {
+      /**
+       * If a subscription is present, compare the verify token. If the token
+       * matches, set the status on the subscription record and confirm
+       * positive.
+       *
+       * If we cannot find a matching subscription and the hub checks on
+       * 'unsubscribe' confirm positive.
+       *
+       * In all other cases confirm negative.
+       */
+      if ($sub = $this->loadSubscription()) {
+        if ($_GET['hub_verify_token'] == $sub->post_fields['hub.verify_token']) {
+          if ($_GET['hub_mode'] == 'subscribe' && $sub->status == 'subscribe') {
+            $sub->status = 'subscribed';
+            $sub->post_fields = array();
+            $sub->save();
+            $this->log('Verified "subscribe" request.');
+            $verify = TRUE;
+          }
+          elseif ($_GET['hub_mode'] == 'unsubscribe' && $sub->status == 'unsubscribe') {
+            $sub->status = 'unsubscribed';
+            $sub->post_fields = array();
+            $sub->save();
+            $this->log('Verified "unsubscribe" request.');
+            $verify = TRUE;
+          }
+        }
+      }
+      elseif ($_GET['hub_mode'] == 'unsubscribe') {
+        $this->log('Verified "unsubscribe" request.');
+        $verify = TRUE;
+      }
+      if ($verify) {
+        header('HTTP/1.1 200 "Found"', null, 200);
+        print $_GET['hub_challenge'];
+        exit();
+      }
+    }
+    header('HTTP/1.1 404 "Not Found"', null, 404);
+    $this->log('Could not verify subscription.', 'error');
+    exit();
+  }
+
+  /**
+   * Receive a notification.
+   *
+   * @param $ignore_signature
+   *   If FALSE, only accept payload if there is a signature present and the
+   *   signature matches the payload. Warning: setting to TRUE results in
+   *   unsafe behavior.
+   *
+   * @return
+   *   An XML string that is the payload of the notification if valid, FALSE
+   *   otherwise.
+   */
+  public function receive($ignore_signature = FALSE) {
+    /**
+     * Verification steps:
+     *
+     * 1) Verify that this is indeed a POST reuest.
+     * 2) Verify that posted string is XML.
+     * 3) Per default verify sender of message by checking the message's
+     *    signature against the shared secret.
+     */
+    if ($_SERVER['REQUEST_METHOD'] == 'POST') {
+      $raw = file_get_contents('php://input');
+      if (@simplexml_load_string($raw)) {
+        if ($ignore_signature) {
+          return $raw;
+        }
+        if (isset($_SERVER['HTTP_X_HUB_SIGNATURE']) && ($sub = $this->loadSubscription())) {
+          $result = array();
+          parse_str($_SERVER['HTTP_X_HUB_SIGNATURE'], $result);
+          if (isset($result['sha1']) && $result['sha1'] == hash_hmac('sha1', $raw, $sub->secret)) {
+            return $raw;
+          }
+          else {
+            $this->log('Could not verify signature.', 'error');
+          }
+        }
+        else {
+          $this->log('No signature present.', 'error');
+        }
+      }
+    }
+    return FALSE;
+  }
+
+  /**
+   * Helper for loading a subscription.
+   */
+  protected function loadSubscription() {
+    return call_user_func("{$this->subscription_class}::load", $this->domain, $this->subscriber_id);
+  }
+
+  /**
+   * Helper for messaging.
+   */
+  protected function msg($msg, $level = 'status') {
+    $this->env->msg($msg, $level);
+  }
+
+  /**
+   * Helper for logging.
+   */
+  protected function log($msg, $level = 'status') {
+    $this->env->log("{$this->domain}:{$this->subscriber_id}\t$msg", $level);
+  }
+}
+
+/**
+ * Implement to provide a storage backend for subscriptions.
+ *
+ * Variables passed in to the constructor must be accessible as public class
+ * variables.
+ */
+interface PuSHSubscriptionInterface {
+  /**
+   * @param $domain
+   *   A string that defines the domain in which the subscriber_id is unique.
+   * @param $subscriber_id
+   *   A unique numeric subscriber id.
+   * @param $hub
+   *   The URL of the hub endpoint.
+   * @param $topic
+   *   The topic to subscribe to.
+   * @param $secret
+   *   A secret key used for message authentication.
+   * @param $status
+   *   The status of the subscription.
+   *   'subscribe' - subscribing to a feed.
+   *   'unsubscribe' - unsubscribing from a feed.
+   *   'subscribed' - subscribed.
+   *   'unsubscribed' - unsubscribed.
+   *   'subscribe failed' - subscribe request failed.
+   *   'unsubscribe failed' - unsubscribe request failed.
+   * @param $post_fields
+   *   An array of the fields posted to the hub.
+   */
+  public function __construct($domain, $subscriber_id, $hub, $topic, $secret, $status = '', $post_fields = '');
+
+  /**
+   * Save a subscription.
+   */
+  public function save();
+
+  /**
+   * Load a subscription.
+   *
+   * @return
+   *   A PuSHSubscriptionInterface object if a subscription exist, NULL
+   *   otherwise.
+   */
+  public static function load($domain, $subscriber_id);
+
+  /**
+   * Delete a subscription.
+   */
+  public function delete();
+}
+
+/**
+ * Implement to provide environmental functionality like user messages and
+ * logging.
+ */
+interface PuSHSubscriberEnvironmentInterface {
+  /**
+   * A message to be displayed to the user on the current page load.
+   *
+   * @param $msg
+   *   A string that is the message to be displayed.
+   * @param $level
+   *   A string that is either 'status', 'warning' or 'error'.
+   */
+  public function msg($msg, $level = 'status');
+
+  /**
+   * A log message to be logged to the database or the file system.
+   *
+   * @param $msg
+   *   A string that is the message to be displayed.
+   * @param $level
+   *   A string that is either 'status', 'warning' or 'error'.
+   */
+  public function log($msg, $level = 'status');
+}
Index: plugins/FeedsFetcher.inc
===================================================================
RCS file: /cvs/drupal-contrib/contributions/modules/feeds/plugins/FeedsFetcher.inc,v
retrieving revision 1.4
diff -u -p -r1.4 FeedsFetcher.inc
--- plugins/FeedsFetcher.inc	20 Dec 2009 23:54:44 -0000	1.4
+++ plugins/FeedsFetcher.inc	20 Feb 2010 00:56:17 -0000
@@ -26,4 +26,78 @@ abstract class FeedsFetcher extends Feed
    *   caches pertaining to this source.
    */
   public function clear(FeedsSource $source) {}
+
+  /**
+   * Request handler invoked if callback URL is requested. Locked down by
+   * default. For an example use see FeedsPubSubFetcher.
+   *
+   * Method may exit the script.
+   *
+   * @return
+   *   A string to be returned to the client.
+   */
+  public function request($feed_nid = 0) {
+    drupal_access_denied();
+  }
+
+  /**
+   * Construct a path for a concrete fetcher/source combination. The result of
+   * this method matches up with the general path definition in
+   * FeedsFetcher::menuItem(). For example usage look at FeedsPubSubFetcher.
+   *
+   * @return
+   *   Path for this fetcher/source combination.
+   */
+  public function path($feed_nid = 0) {
+    if ($feed_nid) {
+      return urlencode('feeds/fetcher/'. $this->id .'/'. $feed_nid);
+    }
+    return urlencode('feeds/fetcher/'. $this->id);
+  }
+
+  /**
+   * Menu item definition for fetchers of this class. Note how the path
+   * component in the item definition matches the return value of
+   * FeedsFetcher::path();
+   *
+   * Requests to this menu item will be routed to FeedsFetcher::request().
+   *
+   * @return
+   *   An array where the key is the Drupal menu item path and the value is
+   *   a valid Drupal menu item definition.
+   */
+  public function menuItem() {
+    return array(
+      'feeds/fetcher/%feeds_importer' => array(
+        'page callback' => 'feeds_fetcher_callback',
+        'page arguments' => array(2, 3),
+        'access callback' => TRUE,
+        'file' => 'feeds.pages.inc',
+        'type' => MENU_CALLBACK,
+        ),
+      );
+  }
+
+  /**
+   * Subscribe to a source. Only implement if fetcher requires subscription.
+   *
+   * @param FeedsSource $source
+   *   Source information for this subscription.
+   */
+  public function subscribe(FeedsSource $source) {}
+
+  /**
+   * Unsubscribe from a source. Only implement if fetcher requires subscription.
+   *
+   * @param FeedsSource $source
+   *   Source information for unsubscribing.
+   */
+  public function unsubscribe(FeedsSource $source) {}
+
+  /**
+   * Indicate the time within which a subscription needs to be renewed.
+   */
+  public function subscriptionPeriod() {
+    return FEEDS_SCHEDULE_NEVER;
+  }
 }
Index: plugins/FeedsPubSubFetcher.inc
===================================================================
RCS file: plugins/FeedsPubSubFetcher.inc
diff -N plugins/FeedsPubSubFetcher.inc
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ plugins/FeedsPubSubFetcher.inc	20 Feb 2010 00:56:17 -0000
@@ -0,0 +1,269 @@
+<?php
+
+feeds_include_library('PuSHSubscriber.inc', 'PuSHSubscriber');
+
+/**
+ * FeedsImportBatch for PubSub Fetcher.
+ */
+class FeedsPubSubBatch extends FeedsHTTPBatch {
+  protected $raw;
+
+  /**
+   * Constructor
+   *
+   * @param $raw
+   *   The raw content of the fat ping.
+   */
+  function __construct($raw) {
+    $this->raw = $raw;
+    parent::__construct();
+  }
+
+  /**
+   * Override FeedsHTTPBatch::getRaw() and simply return the raw content
+   * populated on instnatiation.
+   */
+  function getRaw() {
+    return $this->raw;
+  }
+}
+
+/**
+ * Publish/Subscribe fetcher. Supports at the moment only PubSubHubbub (PuSH).
+ */
+class FeedsPubSubFetcher extends FeedsHTTPFetcher {
+
+  /**
+   * Implementation of FeedsFetcher::fetch().
+   */
+  public function fetch(FeedsSource $source) {
+    $source_config = $source->getConfigFor($this);
+    // Handle fat ping if present, otherwise pass up to HTTP fetcher.
+    if ($raw = feeds_push_subscriber($this->id, $source->feed_nid)->receive($this->config['ignore_signature'])) {
+      return new FeedsPubSubBatch($raw);
+    }
+    return parent::fetch($source);
+  }
+
+  /**
+   * Implementation of FeedsFetcher::request().
+   */
+  public function request($feed_nid = 0) {
+    feeds_dbg($_GET);
+    @feeds_dbg(file_get_contents('php://input'));
+    // A subscription verification has been sent, verify.
+    if (isset($_GET['hub_challenge'])) {
+      $this->pushVerifyRequest($feed_nid);
+    }
+    // No subscription notification has ben sent, we are being notified.
+    else {
+      try {
+        feeds_source($this->id, $feed_nid)->existing()->import();
+      }
+      catch (Exception $e) {
+        // In case of an error, respond with a 503 Service (temporary) unavailable.
+        header('HTTP/1.1 503 "Not Found"', null, 503);
+        exit();
+      }
+    }
+    // Will generate the default 200 response.
+    return '';
+  }
+
+  /**
+   * Override sourceSave() - subscribe to hub.
+   */
+  public function sourceSave(FeedsSource $source) {
+    $this->subscribe($source);
+  }
+
+  /**
+   * Override sourceDelete() - unsubscribe from hub.
+   */
+  public function sourceDelete(FeedsSource $source) {
+    $this->unsubscribe($source);
+  }
+
+  /**
+   * Implement FeedsFetcher::subscribe() - subscribe to hub.
+   */
+  public function subscribe(FeedsSource $source) {
+    $source_config = $source->getConfigFor($this);
+    feeds_push_subscriber($this->id, $source->feed_nid)->subscribe($source_config['source'], url($this->path($source->feed_nid), array('absolute' => TRUE)), valid_url($this->config['designated_hub']) ? $this->config['designated_hub'] : '');
+  }
+
+  /**
+   * Implement FeedsFetcher::unsubscribe() - unsubscribe from hub.
+   */
+  public function unsubscribe(FeedsSource $source) {
+    $source_config = $source->getConfigFor($this);
+    feeds_push_subscriber($this->id, $source->feed_nid)->unsubscribe($source_config['source'], url($this->path($source->feed_nid), array('absolute' => TRUE)));
+  }
+
+  /**
+   * Implement FeedsFetcher::subscriptionPeriod().
+   * Indicate how often a subscription needs to be renewed.
+   *
+   * @todo subscription_period should actually be retrieved from the hub's
+   *   response to a subscription. This will mean a different subscription
+   *   period per source, hence a major change to FeedsScheduler which is
+   *   currently assuming a fixed period per task!
+   */
+  public function subscriptionPeriod() {
+    return $this->config['subscription_period'];
+  }
+
+  /**
+   * Verify a PubSubHubbub subscription request.
+   */
+  public function pushVerifyRequest($feed_nid) {
+    feeds_push_subscriber($this->id, $feed_nid)->verifyRequest();
+  }
+
+  /**
+   * Return defaults for configuration.
+   */
+  public function configDefaults() {
+    $defaults = parent::configDefaults();
+    return $defaults + array(
+      'subscription_period' => 3600*24, // Renew subscription in 24 hours.
+      'designated_hub' => '',
+      'ignore_signature' => FALSE,
+    );
+  }
+
+  /**
+   * Override parent::configForm().
+   */
+  public function configForm(&$form_state) {
+    $form = parent::configForm($form_state);
+    $period = drupal_map_assoc(array(0, 900, 1800, 3600, 10800, 21600, 43200, 86400, 259200, 604800, 2419200), 'format_interval');
+    $period[FEEDS_SCHEDULE_NEVER] = t('Never renew');
+    $period[0] = t('Renew as often as possible');
+    $form['subscription_period'] = array(
+      '#type' => 'select',
+      '#title' => t('Renew subscription after'),
+      '#options' => $period,
+      '#description' => t('This is the minimum time that must elapse before a subscription is renewed.'),
+      '#default_value' => $this->config['subscription_period'],
+    );
+    $form['designated_hub'] = array(
+      '#type' => 'textfield',
+      '#title' => t('Designated hub'),
+      '#description' => t('Enter the callback URL of a designated hub. If given, this hub will be used instead of the hub specified in the feed source.'),
+      '#default_value' => $this->config['designated_hub'],
+    );
+    $form['ignore_signature'] = array(
+      '#type' => 'checkbox',
+      '#title' => t('Ignore signatures'),
+      '#description' => t('Check to ignore signatures on notifications. <strong>Warning:</strong> Ignoring signatures results in unsafe behavior. Potential attackers can post any content without authentication.'),
+      '#default_value' => $this->config['ignore_signature'],
+    );
+    return $form;
+  }
+}
+
+/**
+ * Create a PubSubHubbub subscriber.
+ *
+ * @return PushSubscriber
+ *   A PushSubscriber object.
+ */
+function feeds_push_subscriber($id, $subscriber_id) {
+  return PushSubscriber::instance($id, $subscriber_id, 'PuSHSubscription', PuSHEnvironment::instance());
+}
+
+/**
+ * Implement a PuSHSubscriptionInterface.
+ */
+class PuSHSubscription implements PuSHSubscriptionInterface {
+  public $domain;
+  public $subscriber_id;
+  public $hub;
+  public $topic;
+  public $status;
+  public $secret;
+  public $post_fields;
+  public $timestamp;
+
+  /**
+   * Load a subscription.
+   */
+  public static function load($domain, $subscriber_id) {
+    if ($v = db_fetch_array(db_query("SELECT * FROM {feeds_push_subscriptions} WHERE domain = '%s' AND subscriber_id = %d", $domain, $subscriber_id))) {
+      $v['post_fields'] = unserialize($v['post_fields']);
+      return new PuSHSubscription($v['domain'], $v['subscriber_id'], $v['hub'], $v['topic'], $v['secret'], $v['status'], $v['post_fields'], $v['timestamp']);
+    }
+  }
+
+  /**
+   * Create a subscription.
+   */
+  public function __construct($domain, $subscriber_id, $hub, $topic, $secret, $status = '', $post_fields = '') {
+    $this->domain = $domain;
+    $this->subscriber_id = $subscriber_id;
+    $this->hub = $hub;
+    $this->topic = $topic;
+    $this->status = $status;
+    $this->secret = $secret;
+    $this->post_fields = $post_fields;
+  }
+
+  /**
+   * Save a subscription.
+   */
+  public function save() {
+    $this->timestamp = time();
+    $this->delete($this->domain, $this->subscriber_id);
+    drupal_write_record('feeds_push_subscriptions', $this);
+  }
+
+  /**
+   * Delete a subscription.
+   */
+  public function delete() {
+    db_query("DELETE FROM {feeds_push_subscriptions} WHERE domain = '%s' AND subscriber_id = %d", $this->domain, $this->subscriber_id);
+  }
+}
+
+/**
+ * Provide environmental functions to the PuSHSubscriber library.
+ */
+class PuSHEnvironment implements PuSHSubscriberEnvironmentInterface {
+  /**
+   * Singleton.
+   */
+  public static function instance() {
+    static $env;
+    if (empty($env)) {
+      $env = new PuSHEnvironment();
+    }
+    return $env;
+  }
+
+  /**
+   * Implementation of PuSHSubscriberEnvironmentInterface::msg().
+   */
+  public function msg($msg, $level = 'status') {
+    drupal_set_message($msg, $level);
+  }
+
+  /**
+   * Implementation of PuSHSubscriberEnvironmentInterface::log().
+   */
+  public function log($msg, $level = 'status') {
+    switch ($level) {
+      case 'error':
+        $severity = WATCHDOG_ERROR;
+        break;
+      case 'warning':
+        $severity = WATCHDOG_WARNING;
+        break;
+      default:
+        $severity = WATCHDOG_NOTICE;
+        break;
+    }
+    feeds_dbg($msg);
+    watchdog('FeedsPubSubFetcher', $msg, array(), $severity);
+  }
+}
