diff --git a/message_digest.install b/message_digest.install
index 81f7fd3..c59d617 100644
--- a/message_digest.install
+++ b/message_digest.install
@@ -83,7 +83,6 @@ function message_digest_schema() {
 /**
  * Adds indexes to the message_digest table.
  */
-
 function message_digest_update_8100() {
   $spec = [
     'fields' => [
diff --git a/src/DigestManager.php b/src/DigestManager.php
index 3ecaeb7..a938cd9 100644
--- a/src/DigestManager.php
+++ b/src/DigestManager.php
@@ -156,28 +156,30 @@ class DigestManager implements DigestManagerInterface {
         continue;
       }
 
-      // Gather up all the messages into neat little digests and send 'em out.
-      // It is up to each digest plugin to manage last sent time, etc.
-      // @see \Drupal\message_digest\Plugin\Notifier\DigestBase
-      $recipients = $notifier->getRecipients();
-      $start_time = $notifier->getStartTime();
-      foreach ($recipients as $uid) {
-        // Queue each recipient digest for processing and sending.
-        $data = [
-          'uid' => $uid,
-          'notifier_id' => $plugin_id,
-          'start_time' => $start_time,
-        ];
-        $this->queue->createItem($data);
+      if ($notifier->processDigest()) {
+        // Gather up all the messages into neat little digests and send 'em out.
+        // It is up to each digest plugin to manage last sent time, etc.
+        // @see \Drupal\message_digest\Plugin\Notifier\DigestBase
+        $recipients = $notifier->getRecipients();
+        $end_time = $notifier->getEndTime();
+        foreach ($recipients as $uid) {
+          // Queue each recipient digest for processing and sending.
+          $data = [
+            'uid' => $uid,
+            'notifier_id' => $plugin_id,
+            'end_time' => $end_time,
+          ];
+          $this->queue->createItem($data);
+        }
+        $notifier->setLastSent();
       }
-      $notifier->setLastSent();
     }
   }
 
   /**
    * {@inheritdoc}
    */
-  public function processSingleUserDigest($account_id, $notifier_id, $start_time) {
+  public function processSingleUserDigest($account_id, $notifier_id, $end_time) {
     $account = $this->entityTypeManager->getStorage('user')->load($account_id);
 
     /** @var \Drupal\message_digest\Plugin\Notifier\DigestInterface $notifier */
@@ -186,7 +188,7 @@ class DigestManager implements DigestManagerInterface {
 
     $plugin_definition = $notifier->getPluginDefinition();
     $view_modes = array_combine($plugin_definition['viewModes'], $plugin_definition['viewModes']);
-    $digests = $notifier->aggregate($account_id, $start_time);
+    $digests = $notifier->aggregate($account_id, $end_time);
     $max_mid = 0;
     foreach ($digests as $entity_type) {
       foreach ($entity_type as $entity_id => $message_ids) {
diff --git a/src/DigestManagerInterface.php b/src/DigestManagerInterface.php
index bf3a122..7f4be23 100644
--- a/src/DigestManagerInterface.php
+++ b/src/DigestManagerInterface.php
@@ -26,9 +26,9 @@ interface DigestManagerInterface {
    *   The recipient's account ID.
    * @param string $notifier_id
    *   The digest notifier plugin ID.
-   * @param int $start_time
-   *   The unix timestamp from which to aggregate digests.
+   * @param int $end_time
+   *   The unix timestamp prior to which to aggregate digests.
    */
-  public function processSingleUserDigest($account_id, $notifier_id, $start_time);
+  public function processSingleUserDigest($account_id, $notifier_id, $end_time);
 
 }
diff --git a/src/Plugin/Notifier/DigestBase.php b/src/Plugin/Notifier/DigestBase.php
index bc6fcfb..5fe8fd4 100644
--- a/src/Plugin/Notifier/DigestBase.php
+++ b/src/Plugin/Notifier/DigestBase.php
@@ -42,13 +42,6 @@ abstract class DigestBase extends MessageNotifierBase implements ContainerFactor
   protected $requestStack;
 
   /**
-   * Start time as a unixtimestamp.
-   *
-   * @var int
-   */
-  protected $startTime;
-
-  /**
    * The state service for tracking last sent time.
    *
    * @var \Drupal\Core\State\StateInterface
@@ -164,15 +157,9 @@ abstract class DigestBase extends MessageNotifierBase implements ContainerFactor
    * {@inheritdoc}
    */
   public function getRecipients() {
-    if (!$this->processDigest()) {
-      return [];
-    }
-
-    $start = $this->getStartTime();
-
     $query = $this->connection->select('message_digest', 'md');
     $query->fields('md', ['receiver']);
-    $query->condition('timestamp', $start, '>');
+    $query->condition('timestamp', $this->getEndTime(), '<=');
     $query->condition('sent', FALSE);
     $query->condition('notifier', $this->getPluginId());
     $query->distinct();
@@ -183,12 +170,12 @@ abstract class DigestBase extends MessageNotifierBase implements ContainerFactor
   /**
    * {@inheritdoc}
    */
-  public function aggregate($uid, $start) {
+  public function aggregate($uid, $end) {
     $message_groups = [];
 
     $query = $this->connection->select('message_digest', 'md')
       ->fields('md')
-      ->condition('timestamp', $start, '>')
+      ->condition('timestamp', $end, '<=')
       ->condition('receiver', $uid)
       ->condition('sent', FALSE)
       ->condition('notifier', $this->getPluginId());
@@ -217,7 +204,7 @@ abstract class DigestBase extends MessageNotifierBase implements ContainerFactor
    * @return bool
    *   Returns TRUE if a sufficient amount of time has passed.
    */
-  protected function processDigest() {
+  public function processDigest() {
     $interval = $this->getInterval();
     $key = $this->pluginId . '_last_run';
     $last_run = $this->state->get($key, 0);
@@ -239,13 +226,8 @@ abstract class DigestBase extends MessageNotifierBase implements ContainerFactor
   /**
    * {@inheritdoc}
    */
-  public function getStartTime() {
-    if (!isset($this->startTime)) {
-      $interval = $this->getInterval();
-      // Invert $interval since it's in the past.
-      $this->startTime = strtotime('-' . $interval, $this->requestStack->getCurrentRequest()->server->get('REQUEST_TIME'));
-    }
-    return $this->startTime;
+  public function getEndTime() {
+    return $this->requestStack->getCurrentRequest()->server->get('REQUEST_TIME');
   }
 
   /**
diff --git a/src/Plugin/Notifier/DigestInterface.php b/src/Plugin/Notifier/DigestInterface.php
index 385e62c..70969fb 100644
--- a/src/Plugin/Notifier/DigestInterface.php
+++ b/src/Plugin/Notifier/DigestInterface.php
@@ -19,8 +19,9 @@ interface DigestInterface extends PluginInspectionInterface, DerivativeInspectio
    *
    * @param int $uid
    *   The recipient's user ID for which to aggregate the digest.
-   * @param int $start_time
-   *   The unix timestamp in the past from which to aggregate messages.
+   * @param int $end_time
+   *   The unix timestamp from which all messages in the past will be
+   *   aggregated.
    *
    * @return array
    *   An array of message IDs for the given user, keyed by entity_type, then by
@@ -38,7 +39,7 @@ interface DigestInterface extends PluginInspectionInterface, DerivativeInspectio
    *   ];
    * @endcode
    */
-  public function aggregate($uid, $start_time);
+  public function aggregate($uid, $end_time);
 
   /**
    * Get a unique list of recipient user IDs for this digest.
@@ -59,12 +60,20 @@ interface DigestInterface extends PluginInspectionInterface, DerivativeInspectio
   public function markSent(UserInterface $account, $last_mid);
 
   /**
-   * Gets the start time for which to digest messages.
+   * Determine if it is time to process this digest or not.
+   *
+   * @return bool
+   *   Returns TRUE if a sufficient amount of time has passed.
+   */
+  public function processDigest();
+
+  /**
+   * Gets the end time for which to digest messages prior to.
    *
    * @return int
-   *   The unix timestamp for which all messages since will be digested.
+   *   The unix timestamp for which all messages prior will be digested.
    */
-  public function getStartTime();
+  public function getEndTime();
 
   /**
    * Sets the last sent time.
diff --git a/src/Plugin/QueueWorker/MessageDigest.php b/src/Plugin/QueueWorker/MessageDigest.php
index 8efcaa7..a9f8c8c 100644
--- a/src/Plugin/QueueWorker/MessageDigest.php
+++ b/src/Plugin/QueueWorker/MessageDigest.php
@@ -51,7 +51,7 @@ class MessageDigest extends QueueWorkerBase implements ContainerFactoryPluginInt
    * {@inheritdoc}
    */
   public function processItem($data) {
-    $this->digestManager->processSingleUserDigest($data['uid'], $data['notifier_id'], $data['start_time']);
+    $this->digestManager->processSingleUserDigest($data['uid'], $data['notifier_id'], $data['end_time']);
   }
 
 }
diff --git a/tests/src/Kernel/DigestManagerTest.php b/tests/src/Kernel/DigestManagerTest.php
new file mode 100644
index 0000000..4623096
--- /dev/null
+++ b/tests/src/Kernel/DigestManagerTest.php
@@ -0,0 +1,48 @@
+<?php
+
+namespace Drupal\Tests\message_digest\Kernel;
+
+/**
+ * Tests for the digest manager.
+ *
+ * @group message_digest
+ *
+ * @coversDefaultClass \Drupal\message_digest\DigestManager
+ */
+class DigestManagerTest extends DigestTestBase {
+
+  /**
+   * Tests processing.
+   *
+   * @covers ::processDigests
+   */
+  public function testProcessDigests() {
+    /** @var \Drupal\message_digest\DigestManagerInterface $manager */
+    $manager = \Drupal::service('message_digest.manager');
+
+    // Verify last sent time is not set.
+    $this->assertEquals(0, $this->container->get('state')->get('message_digest:weekly_last_run', 0));
+
+    $expected = $this->container->get('request_stack')->getCurrentRequest()->server->get('REQUEST_TIME');
+    $this->container->get('cron')->run();
+
+    // Last run should be set.
+    $last_run = $this->container->get('state')->get('message_digest:weekly_last_run', 0);
+    $this->assertEquals($expected, $last_run);
+
+    // Update request time.
+    $this->container->get('request_stack')->getCurrentRequest()->server->set('REQUEST_TIME', $expected + 60);
+
+    // Run again, the last_run should not be changed until sufficient time has
+    // passed.
+    $this->container->get('cron')->run();
+    $this->assertEquals($expected, $this->container->get('state')->get('message_digest:weekly_last_run', 0));
+
+    // Update request time to more than a week in the future.
+    $this->container->get('request_stack')->getCurrentRequest()->server->set('REQUEST_TIME', $expected + 60 * 60 * 24 * 8);
+    $expected = $this->container->get('request_stack')->getCurrentRequest()->server->get('REQUEST_TIME');
+    $this->container->get('cron')->run();
+    $this->assertEquals($expected, $this->container->get('state')->get('message_digest:weekly_last_run', 0));
+  }
+
+}
diff --git a/tests/src/Kernel/MessageDigestTest.php b/tests/src/Kernel/MessageDigestTest.php
index 1919cd3..e8572a1 100644
--- a/tests/src/Kernel/MessageDigestTest.php
+++ b/tests/src/Kernel/MessageDigestTest.php
@@ -102,7 +102,7 @@ class MessageDigestTest extends DigestTestBase {
     }
 
     // Aggregate and mark processed.
-    $start_time = $digest_notifier->getStartTime();
+    $start_time = $digest_notifier->getEndTime();
     $recipients = $digest_notifier->getRecipients();
     $this->assertEquals(1, count($recipients));
     foreach ($recipients as $uid) {
@@ -119,9 +119,9 @@ class MessageDigestTest extends DigestTestBase {
       $this->assertSame($expected, $results);
     }
 
-    // Since this has been marked as sent, calling recipients again should
-    // result in an empty array.
-    $this->assertEmpty($digest_notifier->getRecipients());
+    // Since this has been marked as sent, the notifier should return FALSE
+    // for processing again.
+    $this->assertFalse($digest_notifier->processDigest());
 
     // Set last sent time to 8 days in the past.
     $last_run = strtotime('-8 days', $this->container->get('request_stack')->getCurrentRequest()->server->get('REQUEST_TIME'));
@@ -169,7 +169,7 @@ class MessageDigestTest extends DigestTestBase {
     }
 
     // Aggregate and mark processed.
-    $results = $digest_notifier->aggregate($account->id(), $digest_notifier->getStartTime());
+    $results = $digest_notifier->aggregate($account->id(), $digest_notifier->getEndTime());
     $digest_notifier->setLastSent();
     $expected = [
       '' => [
@@ -252,7 +252,7 @@ class MessageDigestTest extends DigestTestBase {
       $this->notifierSender->send($message, [], $digest_notifier->getPluginId());
       $messages[$i] = $message;
     }
-    $digest = $digest_notifier->aggregate($account->id(), $digest_notifier->getStartTime());
+    $digest = $digest_notifier->aggregate($account->id(), $digest_notifier->getEndTime());
     $this->assertEquals(10, count($digest['']['']));
     $digest_notifier->markSent($account, $messages[10]->id());
 
