diff --git a/includes/simplenews.mail.inc b/includes/simplenews.mail.inc index 00a1f13..6c4af5b 100644 --- a/includes/simplenews.mail.inc +++ b/includes/simplenews.mail.inc @@ -274,7 +274,7 @@ function simplenews_mail_spool($nid = NULL, $vid = NULL, $limit = NULL) { // Send pending messages from database cache // A limited number of mails is retrieved from the spool $limit = isset($limit) ? $limit : variable_get('simplenews_throttle', 20); - if ($messages = simplenews_get_spool(SIMPLENEWS_SPOOL_PENDING, $nid, $vid, $limit)) { + if ($messages = simplenews_get_spool(array(SIMPLENEWS_SPOOL_PENDING, SIMPLENEWS_SPOOL_IN_PROGRESS), $limit)) { $count_fail = $count_success = 0; // switch to anonymous user - needed in foreground spool sends, adopted from D7 drupal_cron_run(). @@ -364,6 +364,17 @@ function simplenews_save_spool($message) { $status, $time); } +/* + * Returns the expiration time for IN_PROGRESS status. + * + * @return int Time the message allocation expires and is resent + */ +function simplenews_get_expiration_time() { + $timeout = variable_get('simplenews_spool_progress_expiration', 3600); + $expiration_time = time() - $timeout; + return $expiration_time; +} + /** * Retrieve data from mail spool * @@ -381,22 +392,71 @@ function simplenews_save_spool($message) { * $message['status'] * $message['time'] */ -function simplenews_get_spool($status, $nid = NULL, $vid = NULL, $limit = 999999) { + +/** + * This function allocates messages to be sent in current run. + * + * Drupal acquire_lock quarantees that no concurrency issue happend. + * If message status is SIMPLENEWS_SPOOL_IN_PROGRESS but the maximum send time has expired the message id will be returned as a message which is not allocated to another process + * MessageIDs to be sent in current run are returned. + * + * @param string $status Status of data to be retrieved (0 = hold, 1 = pending, 2 = send, 3 = in progress) + * @param integer $limit The maximum number of mails to load from the spool + * + * @return array array with msids + */ +function simplenews_get_spool($status, $limit = NULL) { $messages = array(); + $clauses = array(); + $params = array(); + + if (!is_array($status)) { + $status = array($status); + } + + foreach ($status as $s) { + if ($s == SIMPLENEWS_SPOOL_IN_PROGRESS) { + // Select messages which are allocated by another process, but where the maximum send time has expired. + $clauses[] = '(s.status = %d AND s.timestamp < %d)'; + $params[] = $s; + $params[] = simplenews_get_expiration_time(); + } + else { + $clauses[] = 's.status = %d'; + $params[] = $s; + } + } - $result = db_query_range(" - SELECT * + $query = "SELECT * FROM {simplenews_mail_spool} s - WHERE s.status = %d - ORDER BY s.timestamp ASC", - $status, 0, $limit); - while ($data = db_fetch_array($result)) { - $message = array(); - foreach ($data as $key => $value) { - $message[$key] = $value; + WHERE " . implode(' OR ', $clauses) . " + ORDER BY s.timestamp ASC"; + + /* BEGIN CRITICAL SECTION */ + // The semaphore ensures that multiple processes get different message ID's. So there could not occur any duplicate messages. + + if (lock_acquire('simplenews_acquire_mail')) { + // Get message id's + // Allocate messages + if (is_numeric($limit)) { + $result = db_query_range($query, $params, 0, $limit); + } + else { + $result = db_query($query, $params); + } + while ($message = db_fetch_array($result)) { + $messages[$message['msid']] = $message; } - $messages[$data['msid']] = $message; + if (count($messages) > 0) { + // Set the state and the timestamp of the messages + simplenews_update_spool(array_keys($messages), array('status' => SIMPLENEWS_SPOOL_IN_PROGRESS)); + } + + lock_release('simplenews_acquire_mail'); } + + /* END CRITICAL SECTION */ + return $messages; } @@ -409,7 +469,7 @@ function simplenews_get_spool($status, $nid = NULL, $vid = NULL, $limit = 999999 * Mail spool id of record to be updated * @param array $result * Array containing email sent result - * 'status' => (0 = hold, 1 = pending, 2 = send) + * 'status' => (0 = hold, 1 = pending, 2 = send, 3 = in progress) * 'error' => error id (optional; defaults to '') */ function simplenews_update_spool($msids, $result) { @@ -433,18 +493,30 @@ function simplenews_update_spool($msids, $result) { * @param integer $vid newsletter revision id * @param string $status email sent status * - * @return array Mail message array + * @return integer count of mail spool elements wich owns the attributes passed in as params */ -function simplenews_count_spool($nid, $vid, $status = SIMPLENEWS_SPOOL_PENDING) { - return db_result(db_query(" - SELECT COUNT(*) - FROM {simplenews_mail_spool} - WHERE nid = %d - AND vid = %d - AND status = %d", - $nid, - $vid, - $status)); +function simplenews_count_spool($nid, $vid, $status = array(SIMPLENEWS_SPOOL_PENDING, SIMPLENEWS_SPOOL_IN_PROGRESS)) { + $clauses = array(); + $params = array(); + + if (!is_array($status)) { + $status = array($status); + } + + foreach ($status as $s) { + $clauses[] = 's.status = %d'; + $params[] = $s; + } + $params[] = $nid; + $params[] = $vid; + + $query = "SELECT COUNT(nid) + FROM {simplenews_mail_spool} s + WHERE (" . implode(' OR ', $clauses) . ") + AND nid = %d + AND vid = %d"; + + return db_result(db_query($query, $params)); } /** diff --git a/simplenews.module b/simplenews.module index ad3d4e3..0493e27 100644 --- a/simplenews.module +++ b/simplenews.module @@ -50,6 +50,7 @@ define('SIMPLENEWS_STATUS_SEND_READY', 2); define('SIMPLENEWS_SPOOL_HOLD', 0); define('SIMPLENEWS_SPOOL_PENDING', 1); define('SIMPLENEWS_SPOOL_DONE', 2); +define('SIMPLENEWS_SPOOL_IN_PROGRESS', 3); /** * AFTER EACH 100 NEWSLETTERS