From e602b1999c6cb3b2d6c009b0bb5dc7c3b08a3476 Mon Sep 17 00:00:00 2001 From: Bob Vincent Date: Sat, 21 May 2011 01:42:56 -0400 Subject: [PATCH] Issue #361071 by MauMau, Sutharsan, Frans, BrockBoland, miro_dietiker, Fabianx, Vasudeva, pillarsdotnet: Use locking to eliminate duplicate emails from concurrent simplenews cron runs. --- includes/simplenews.mail.inc | 91 ++++++++++++++++++++++++++++++++++------- simplenews.module | 1 + 2 files changed, 76 insertions(+), 16 deletions(-) diff --git a/includes/simplenews.mail.inc b/includes/simplenews.mail.inc index 0ec9d87cdf3e64820f163e63dad746084ea5e84f..e3abd9dff7c4bc0a1f067e661dd44886871b35d8 100644 --- a/includes/simplenews.mail.inc +++ b/includes/simplenews.mail.inc @@ -270,7 +270,7 @@ function simplenews_mail_spool($nid = NULL, $vid = NULL, $limit = NULL) { // Send pending messages from database cache // A limited number of mails is retrieved from the spool $limit = isset($limit) ? $limit : variable_get('simplenews_throttle', 20); - if ($spool_list = simplenews_get_spool(SIMPLENEWS_SPOOL_PENDING, $nid, $vid, $limit)) { + if ($spool_list = simplenews_get_spool(array(SIMPLENEWS_SPOOL_PENDING, SIMPLENEWS_SPOOL_IN_PROGRESS), $limit)) { // Prevent session information from being saved while sending. if ($original_session = drupal_save_session()) { drupal_save_session(FALSE); @@ -366,6 +366,12 @@ function simplenews_save_spool($spool) { ->execute(); } +function simplenews_get_expiration_time() { + $timeout = variable_get('simplenews_spool_progress_expiration', 3600); + $expiration_time = REQUEST_TIME - $timeout; + return $expiration_time; +} + /** * Retrieve data from mail spool * @@ -383,26 +389,68 @@ function simplenews_save_spool($spool) { * $spool['time'] * @todo Convert output to array of objects. */ -function simplenews_get_spool($status, $nid = NULL, $vid = NULL, $limit = 0) { - $spool = array(); +function simplenews_get_spool($status, $limit = NULL) { + $messages = array(); + $clauses = array(); + $params = array(); + + if (!is_array($status)) { + $status = array($status); + } + + foreach ($status as $s) { + if ($s == SIMPLENEWS_SPOOL_IN_PROGRESS) { + // Select messages which are allocated by another process, but whose + // maximum send time has expired. + $clauses[] = '(s.status = :status AND s.timestamp < :expiration)'; + $params[':status'] = $s; + $params[':expiration'] = simplenews_get_expiration_time(); + } + else { + $clauses[] = 's.status = :status'; + $params[':status'] = $s; + } + } $query = db_select('simplenews_mail_spool', 's') ->fields('s') - ->condition('s.status', $status) + ->where(implode(' OR ', $clauses), $params) ->orderBy('s.timestamp', 'ASC'); - if ($limit) { - $query->range(0, $limit); - } - foreach ($query->execute() as $row) { - if (strlen($row->data)) { - $row->data = unserialize($row->data); + + /* BEGIN CRITICAL SECTION */ + // The semaphore ensures that multiple processes get different message ID's, + // so that duplicate messages are not sent. + + if (lock_acquire('simplenews_acquire_mail')) { + // Get message id's + // Allocate messages + if (is_numeric($limit)) { + $query->range(0, $limit); + } + foreach ($query->execute() as $message) { + if (strlen($message->data)) { + $message->data = unserialize($message->data); + } + else { + $mail = (object)array('mail' => $message->mail); + $message->data = simplenews_get_subscription($mail); + } + $messages[$message->msid] = $message; } - else { - $row->data = simplenews_get_subscription((object)array('mail' => $row->mail)); + if (count($messages) > 0) { + // Set the state and the timestamp of the messages + simplenews_update_spool( + array_keys($messages), + array('status' => SIMPLENEWS_SPOOL_IN_PROGRESS) + ); } - $spool[$row->msid] = $row; + + lock_release('simplenews_acquire_mail'); } - return $spool; + + /* END CRITICAL SECTION */ + + return $messages; } /** @@ -437,11 +485,22 @@ function simplenews_update_spool($msids, $data) { * * @return array Mail message array */ -function simplenews_count_spool($nid, $vid, $status = SIMPLENEWS_SPOOL_PENDING) { +function simplenews_count_spool($nid, $vid, $status = array(SIMPLENEWS_SPOOL_PENDING, SIMPLENEWS_SPOOL_IN_PROGRESS)) { + $clauses = array(); + $params = array(); + + if (!is_array($status)) { + $status = array($status); + } + + foreach ($status as $s) { + $clauses[] = "s.status = $s"; + } + $query = db_select('simplenews_mail_spool') ->condition('nid', $nid) ->condition('vid', $vid) - ->condition('status', $status); + ->where(implode(' OR ', $clauses)); return $query->countQuery()->execute()->fetchField(); } diff --git a/simplenews.module b/simplenews.module index abf2a6c3b9f5057548028e6ac7ba71a940d32c5e..a1e686e970d381e970a7ef24f3c3f098596fb947 100644 --- a/simplenews.module +++ b/simplenews.module @@ -58,6 +58,7 @@ define('SIMPLENEWS_STATUS_SEND_READY', 2); define('SIMPLENEWS_SPOOL_HOLD', 0); define('SIMPLENEWS_SPOOL_PENDING', 1); define('SIMPLENEWS_SPOOL_DONE', 2); +define('SIMPLENEWS_SPOOL_IN_PROGRESS', 3); /** * AFTER EACH 100 NEWSLETTERS -- 1.7.4.1