Index: modules/system/system.install
===================================================================
--- modules/system/system.install	(revision 426)
+++ modules/system/system.install	(working copy)
@@ -3534,6 +3534,56 @@
 }
 
 /**
+ * Add Drupal 6.x search tables/updates.
+ */
+function system_update_1023() {
+  $ret = array();
+  switch ($GLOBALS['db_type']) {
+    case 'mysql':
+    case 'mysqli':
+      // Create the search_dataset.reindex column.
+      $ret[] = update_sql("ALTER TABLE {search_dataset} ADD reindex int unsigned NOT NULL default '0'");
+
+      // Drop the search_index.from fields which are no longer used.
+      $ret[] = update_sql("ALTER TABLE {search_index} DROP INDEX from_sid_type");
+      $ret[] = update_sql("ALTER TABLE {search_index} DROP fromsid");
+      $ret[] = update_sql("ALTER TABLE {search_index} DROP fromtype");
+
+      // Drop the search_dataset.sid_type index, so that it can be made unique.
+      $ret[] = update_sql("ALTER TABLE {search_dataset} DROP INDEX sid_type");
+
+      // Create the search_node_links Table.
+      $ret = update_sql("CREATE TABLE {search_node_links} (
+        sid int unsigned NOT NULL default '0',
+        type varchar(16) default NULL,
+        nid int unsigned NOT NULL default '0',
+        caption TEXT NOT NULL,
+        PRIMARY KEY sid_type_nid (sid, type, nid),
+        KEY nid (nid)
+      ) /*!40100 DEFAULT CHARACTER SET UTF8 */ ");
+
+      // with the change to search_dataset.reindex, the search queue is handled differently,
+      // and this is no longer needed
+      variable_del('node_cron_last');
+
+      // Everything needs to be reindexed.
+      $ret[] = update_sql("UPDATE {search_dataset} SET reindex = 1");
+
+      // Add a unique index for the search_index.
+      // Since it's possible that some existing sites have duplicates,
+      // create the index using the IGNORE keyword, which ignores duplicate errors.
+      // However, pgsql doesn't support it
+      $ret[] = update_sql("ALTER IGNORE TABLE {search_index} ADD UNIQUE KEY sid_word_type (sid, word, type)");
+      $ret[] = update_sql("ALTER IGNORE TABLE {search_dataset} ADD UNIQUE KEY sid_type (sid, type)");
+      break;
+    case 'pgsql':
+      // @TODO:
+      break;
+  }
+  return $ret;
+}
+
+/**
  * @} End of "defgroup updates-5.x-extra"
  */
 
Index: modules/node/node.module
===================================================================
--- modules/node/node.module	(revision 426)
+++ modules/node/node.module	(working copy)
@@ -815,15 +815,12 @@
       return t('Content');
 
     case 'reset':
-      variable_del('node_cron_last');
-      variable_del('node_cron_last_nid');
+      db_query("UPDATE {search_dataset} SET reindex = %d AND type = 'node'", time());
       return;
 
     case 'status':
-      $last = variable_get('node_cron_last', 0);
-      $last_nid = variable_get('node_cron_last_nid', 0);
       $total = db_result(db_query('SELECT COUNT(*) FROM {node} WHERE status = 1'));
-      $remaining = db_result(db_query('SELECT COUNT(*) FROM {node} n LEFT JOIN {node_comment_statistics} c ON n.nid = c.nid WHERE n.status = 1 AND ((GREATEST(n.created, n.changed, c.last_comment_timestamp) = %d AND n.nid > %d ) OR (n.created > %d OR n.changed > %d OR c.last_comment_timestamp > %d))', $last, $last_nid, $last, $last, $last));
+      $remaining = db_result(db_query("SELECT COUNT(*) FROM {node} n LEFT JOIN {search_dataset} d ON d.type = 'node' AND d.sid = n.nid WHERE d.sid IS NULL OR d.reindex <> 0"));
       return array('remaining' => $remaining, 'total' => $total);
 
     case 'admin':
@@ -895,7 +892,7 @@
         $ranking[] = '%d * POW(2, (GREATEST(n.created, n.changed, c.last_comment_timestamp) - %d) * 6.43e-8)';
         $arguments2[] = $weight;
         $arguments2[] = (int)variable_get('node_cron_last', 0);
-        $join2 .= ' INNER JOIN {node} n ON n.nid = i.sid LEFT JOIN {node_comment_statistics} c ON c.nid = i.sid';
+        $join2 .= ' LEFT JOIN {node_comment_statistics} c ON c.nid = i.sid';
         $stats_join = TRUE;
         $total += $weight;
       }
@@ -2488,34 +2485,16 @@
 }
 
 /**
- * shutdown function to make sure we always mark the last node processed.
- */
-function node_update_shutdown() {
-  global $last_change, $last_nid;
-
-  if ($last_change && $last_nid) {
-    variable_set('node_cron_last', $last_change);
-    variable_set('node_cron_last_nid', $last_nid);
-  }
-}
-
-/**
  * Implementation of hook_update_index().
  */
 function node_update_index() {
-  global $last_change, $last_nid;
-
-  register_shutdown_function('node_update_shutdown');
-
-  $last = variable_get('node_cron_last', 0);
-  $last_nid = variable_get('node_cron_last_nid', 0);
   $limit = (int)variable_get('search_cron_limit', 100);
 
   // Store the maximum possible comments per thread (used for ranking by reply count)
   variable_set('node_cron_comments_scale', 1.0 / max(1, db_result(db_query('SELECT MAX(comment_count) FROM {node_comment_statistics}'))));
   variable_set('node_cron_views_scale', 1.0 / max(1, db_result(db_query('SELECT MAX(totalcount) FROM {node_counter}'))));
 
-  $result = db_query_range('SELECT GREATEST(IF(c.last_comment_timestamp IS NULL, 0, c.last_comment_timestamp), n.changed) as last_change, n.nid FROM {node} n LEFT JOIN {node_comment_statistics} c ON n.nid = c.nid WHERE n.status = 1 AND ((GREATEST(n.changed, c.last_comment_timestamp) = %d AND n.nid > %d) OR (n.changed > %d OR c.last_comment_timestamp > %d)) ORDER BY GREATEST(n.changed, c.last_comment_timestamp) ASC, n.nid ASC', $last, $last_nid, $last, $last, $last, 0, $limit);
+  $result = db_query_range("SELECT n.nid FROM {node} n LEFT JOIN {search_dataset} d ON d.type = 'node' AND d.sid = n.nid WHERE d.sid IS NULL OR d.reindex <> 0 ORDER BY d.reindex ASC, n.nid ASC", 0, $limit);
 
   while ($node = db_fetch_object($result)) {
     $last_change = $node->last_change;
Index: modules/search/search.module
===================================================================
--- modules/search/search.module	(revision 426)
+++ modules/search/search.module	(working copy)
@@ -268,9 +268,11 @@
   }
   else {
     db_query("DELETE FROM {search_dataset} WHERE sid = %d AND type = '%s'", $sid, $type);
-    db_query("DELETE FROM {search_index} WHERE fromsid = %d AND fromtype = '%s'", $sid, $type);
-    // When re-indexing, keep link references
-    db_query("DELETE FROM {search_index} WHERE sid = %d AND type = '%s'". ($reindex ? " AND fromsid = 0" : ''), $sid, $type);
+    db_query("DELETE FROM {search_index} WHERE sid = %d AND type = '%s'", $sid, $type);
+    // Don't remove links if re-indexing.
+    if (!$reindex) {
+      db_query("DELETE FROM {search_node_links} WHERE sid = %d AND type = '%s'", $sid, $type);
+    }
   }
 }
 
@@ -566,18 +568,24 @@
               $word = (int)ltrim($word, '-0');
             }
 
+            // Links score mainly for the target.
             if ($link) {
               if (!isset($results[$linknid])) {
                 $results[$linknid] = array();
               }
-              $results[$linknid][$word] += $score * $focus;
+              $results[$linknid][] = $word;
+              // Reduce score of the link caption in the source.
+              $focus *= 0.2;
             }
-            else {
-              $results[0][$word] += $score * $focus;
-              // Focus is a decaying value in terms of the amount of unique words up to this point.
-              // From 100 words and more, it decays, to e.g. 0.5 at 500 words and 0.3 at 1000 words.
-              $focus = min(1, .01 + 3.5 / (2 + count($results[0]) * .015));
+            // Fall-through
+            if (!isset($results[0][$word])) {
+              $results[0][$word] = 0;
             }
+            $results[0][$word] += $score * $focus;
+
+            // Focus is a decaying value in terms of the amount of unique words up to this point.
+            // From 100 words and more, it decays, to e.g. 0.5 at 500 words and 0.3 at 1000 words.
+            $focus = min(1, .01 + 3.5 / (2 + count($results[0]) * .015));
           }
           $tagwords++;
           // Too many words inside a single tag probably mean a tag was accidentally left open.
@@ -594,7 +602,7 @@
   search_wipe($sid, $type, TRUE);
 
   // Insert cleaned up data into dataset
-  db_query("INSERT INTO {search_dataset} (sid, type, data) VALUES (%d, '%s', '%s')", $sid, $type, $accum);
+  db_query("INSERT INTO {search_dataset} (sid, type, data, reindex) VALUES (%d, '%s', '%s', %d)", $sid, $type, $accum, 0);
 
   // Insert results into search index
   foreach ($results[0] as $word => $score) {
@@ -603,16 +611,86 @@
   }
   unset($results[0]);
 
-  // Now insert links to nodes
+  // Get all previous links from this item.
+  $result = db_query("SELECT nid, caption FROM {search_node_links} WHERE sid = %d AND type = '%s'", $sid, $type);
+  $links = array();
+  while ($link = db_fetch_object($result)) {
+    $links[$link->nid] = $link->caption;
+  }
+
+  // Now store links to nodes.
   foreach ($results as $nid => $words) {
-    foreach ($words as $word => $score) {
-      db_query("INSERT INTO {search_index} (word, sid, type, fromsid, fromtype, score) VALUES ('%s', %d, '%s', %d, '%s', %f)", $word, $nid, 'node', $sid, $type, $score);
-      search_dirty($word);
+    $caption = implode(' ', $words);
+    if (isset($links[$nid])) {
+      if ($links[$nid] != $caption) {
+        // Update the existing link and mark the node for reindexing.
+        db_query("UPDATE {search_node_links} SET caption = '%s' WHERE sid = %d AND type = '%s' AND nid = %d", $caption, $sid, $type, $nid);
+        search_touch_node($nid);
+      }
+      // Unset the link to mark it as processed.
+      unset($links[$nid]);
     }
+    else {
+      // Insert the existing link and mark the node for reindexing.
+      db_query("INSERT INTO {search_node_links} (caption, sid, type, nid) VALUES ('%s', %d, '%s', %d)", $caption, $sid, $type, $nid);
+      search_touch_node($nid);
+    }
   }
+  // Any left-over links in $links no longer exist. Delete them and mark the nodes for reindexing.
+  foreach ($links as $nid) {
+    db_query("DELETE FROM {search_node_links} WHERE sid = %d AND type = '%s' AND nid = %d", $sid, $type, $nid);
+    search_touch_node($nid);
+  }
 }
 
 /**
+ * Change a node's changed timestamp to 'now' to force reindexing.
+ *
+ * @param $nid
+ *   The nid of the node that needs reindexing.
+ */
+function search_touch_node($nid) {
+  db_query("UPDATE {search_dataset} SET reindex = %d WHERE sid = %d AND type = 'node'", time(), $nid);
+}
+
+/**
+ * Implementation of hook_nodeapi().
+ */
+function search_nodeapi(&$node, $op, $teaser = NULL, $page = NULL) {
+  switch ($op) {
+    // Transplant links to a node into the target node.
+    case 'update index':
+      $result = db_query("SELECT caption FROM {search_node_links} WHERE nid = %d", $node->nid);
+      $output = array();
+      while ($link = db_fetch_object($result)) {
+        $output[] = $link->caption;
+      }
+      return '<a>('. implode(', ', $output) .')</a>';
+    // Reindex the node when it is updated.  The node is automatically indexed
+    // when it is added, simply by being added to the node table.
+    case 'update':
+      search_touch_node($node->nid);
+      break;
+  }
+}
+
+/**
+ * Implementation of hook_comment().
+ */
+function search_comment($a1, $op) {
+  switch ($op) {
+    // Reindex the node when comments are added or changed
+    case 'insert':
+    case 'update':
+    case 'delete':
+    case 'publish':
+    case 'unpublish':
+      search_touch_node(is_array($a1) ? $a1['nid'] : $a1->nid);
+      break;
+  }
+}
+
+/**
  * Extract a module-specific search option from a search query. e.g. 'type:book'
  */
 function search_query_extract($keys, $option) {
@@ -638,7 +716,22 @@
 /**
  * Parse a search query into SQL conditions.
  *
- * We build a query that matches the dataset bodies.
+ * We build two queries that matches the dataset bodies. @See do_search for
+ * more about these.
+ *
+ * @param $text
+ *   The search keys.
+ * @return
+ *   A list of six elements.
+ *    * A series of statements AND'd together which will be used to provide all
+ *      possible matches.
+ *    * Arguments for this query part.
+ *    * A series of exact word matches OR'd together.
+ *    * Arguments for this query part.
+ *    * A bool indicating whether this is a simple query or not. Negative
+ *      terms, presence of both AND / OR make this FALSE.
+ *    * A bool indicating the presence of a lowercase or. Maybe the user
+ *      wanted to use OR.
  */
 function search_parse_query($text) {
   $keys = array('positive' => array(), 'negative' => array());
@@ -652,12 +745,15 @@
 
   // Classify tokens
   $or = FALSE;
+  $or_warning = FALSE;
+  $simple = TRUE;
   foreach ($matches as $match) {
     $phrase = FALSE;
     // Strip off phrase quotes
     if ($match[2]{0} == '"') {
       $match[2] = substr($match[2], 1, -1);
       $phrase = TRUE;
+      $simple = FALSE;
     }
     // Simplify keyword according to indexing rules and external preprocessors
     $words = search_simplify($match[2]);
@@ -681,6 +777,9 @@
     }
     // Plain keyword
     else {
+      if ($match[2] == 'or') {
+        $or_warning = TRUE;
+      }
       if ($or) {
         // Add to last element (which is an array)
         $keys['positive'][count($keys['positive']) - 1] = array_merge($keys['positive'][count($keys['positive']) - 1], $words);
@@ -698,10 +797,13 @@
   $arguments = array();
   $arguments2 = array();
   $matches = 0;
+  $simple_and = FALSE;
+  $simple_or = FALSE;
   // Positive matches
   foreach ($keys['positive'] as $key) {
     // Group of ORed terms
     if (is_array($key) && count($key)) {
+      $simple_or = TRUE;
       $queryor = array();
       $any = FALSE;
       foreach ($key as $or) {
@@ -720,6 +822,7 @@
     }
     // Single ANDed term
     else {
+      $simple_and = TRUE;
       list($q, $count) = _search_parse_query($key, $arguments2);
       if ($q) {
         $query[] = $q;
@@ -729,12 +832,16 @@
       }
     }
   }
+  if ($simple_and && $simple_or) {
+    $simple = FALSE;
+  }
   // Negative matches
   foreach ($keys['negative'] as $key) {
     list($q) = _search_parse_query($key, $arguments2, TRUE);
     if ($q) {
       $query[] = $q;
       $arguments[] = $key;
+      $simple = FALSE;
     }
   }
   $query = implode(' AND ', $query);
@@ -742,7 +849,7 @@
   // Build word-index conditions for the first pass
   $query2 = substr(str_repeat("i.word = '%s' OR ", count($arguments2)), 0, -4);
 
-  return array($query, $arguments, $query2, $arguments2, $matches);
+  return array($query, $arguments, $query2, $arguments2, $matches, $simple, $or_warning);
 }
 
 /**
@@ -774,29 +881,13 @@
  * This function is normally only called by each module that support the
  * indexed search (and thus, implements hook_update_index()).
  *
- * Two queries are performed which can be extended by the caller.
+ * Results are retrieved in two logical passes. However, the two passes are
+ * joined together into a single query.  And in the case of most simple
+ * queries the second pass is not even used.
  *
- * The first query selects a set of possible matches based on the search index
- * and any extra given restrictions. This is the classic "OR" search.
+ * The first pass selects a set of all possible matches, which has the benefit
+ * of also providing the exact result set for simple "AND" or "OR" searches.
  *
- * SELECT i.type, i.sid, SUM(i.score*t.count) AS relevance
- * FROM {search_index} i
- * INNER JOIN {search_total} t ON i.word = t.word
- * $join1
- * WHERE $where1 AND (...)
- * GROUP BY i.type, i.sid
- *
- * The second query further refines this set by verifying advanced text
- * conditions (such as AND, negative or phrase matches), and orders the results
- * on a the column or expression 'score':
- *
- * SELECT i.type, i.sid, $select2
- * FROM temp_search_sids i
- * INNER JOIN {search_dataset} d ON i.sid = d.sid AND i.type = d.type
- * $join2
- * WHERE (...)
- * ORDER BY score DESC
- *
  * @param $keywords
  *   A search string as entered by the user.
  *
@@ -814,7 +905,7 @@
  * @param $arguments1
  *   (optional) Extra SQL arguments belonging to the first query.
  *
- * @param $select2
+ * @param $columns2
  *   (optional) Inserted into the SELECT pat of the second query. Must contain
  *   a column selected as 'score'.
  *   defaults to 'i.relevance AS score'
@@ -835,40 +926,45 @@
  *
  * @ingroup search
  */
-function do_search($keywords, $type, $join1 = '', $where1 = '1', $arguments1 = array(), $select2 = 'i.relevance AS score', $join2 = '', $arguments2 = array(), $sort_parameters = 'ORDER BY score DESC') {
+function do_search($keywords, $type, $join1 = '', $where1 = '1', $arguments1 = array(), $columns2 = 'i.relevance AS score', $join2 = '', $arguments2 = array(), $sort_parameters = 'ORDER BY score DESC') {
   $query = search_parse_query($keywords);
 
   if ($query[2] == '') {
     form_set_error('keys', t('You must include at least one positive keyword with @count characters or more.', array('@count' => variable_get('minimum_word_size', 3))));
   }
+  if ($query[6]) {
+    form_set_error('keys', t('Try uppercase "OR" to search for either of two terms.'));
+  }
   if ($query === NULL || $query[0] == '' || $query[2] == '') {
     return array();
   }
 
-  // First pass: select all possible matching sids, doing a simple index-based OR matching on the keywords.
-  // 'matches' is used to reject those items that cannot possibly match the query.
-  $conditions = $where1 .' AND ('. $query[2] .") AND i.type = '%s'";
-  $arguments = array_merge($arguments1, $query[3], array($type, $query[4]));
-  $result = db_query_temporary("SELECT i.type, i.sid, SUM(i.score * t.count) AS relevance, COUNT(*) AS matches FROM {search_index} i INNER JOIN {search_total} t ON i.word = t.word $join1 WHERE $conditions GROUP BY i.type, i.sid HAVING COUNT(*) >= %d", $arguments, 'temp_search_sids');
+  // Build query for keyword normalization.
+  $conditions = "$where1 AND ($query[2]) AND i.type = '%s'";
+  $arguments1 = array_merge($arguments1, $query[3], array($type));
+  $join = "INNER JOIN {search_total} t ON i.word = t.word $join1";
+  if (!$query[5]) {
+    $conditions .= " AND ($query[0])";
+    $arguments1 = array_merge($arguments1, $query[1]);
+    $join .= " INNER JOIN {search_dataset} d ON i.sid = d.sid AND i.type = d.type";
+  }
 
-  // Calculate maximum relevance, to normalize it
-  $normalize = db_result(db_query('SELECT MAX(relevance) FROM temp_search_sids'));
+  // Calculate maximum keyword relevance, to normalize it.
+  $select = "SELECT MAX(i.score * t.count) FROM {search_index} i $join WHERE $conditions GROUP BY i.type, i.sid HAVING COUNT(*) >= %d";
+  $arguments = array_merge($arguments1, array($query[4]));
+  $normalize = db_result(db_query($select, $arguments));
   if (!$normalize) {
     return array();
   }
-  $select2 = str_replace('i.relevance', '('. (1.0 / $normalize) .' * i.relevance)', $select2);
+  $columns2 = str_replace('i.relevance', '('. (1.0 / $normalize) .' * SUM(i.score * t.count))', $columns2);
 
-  // Second pass: only keep items that match the complicated keywords conditions (phrase search, negative keywords, ...)
-  $conditions = '('. $query[0] .')';
-  $arguments = array_merge($arguments2, $query[1]);
-  $result = db_query_temporary("SELECT i.type, i.sid, $select2 FROM temp_search_sids i INNER JOIN {search_dataset} d ON i.sid = d.sid AND i.type = d.type $join2 WHERE $conditions $sort_parameters", $arguments, 'temp_search_results');
-  if (($count = db_result(db_query('SELECT COUNT(*) FROM temp_search_results'))) == 0) {
-    return array();
-  }
-  $count_query = "SELECT $count";
+  // Build query to retrieve results.
+  $select = "SELECT i.type, i.sid, $columns2 FROM {search_index} i $join $join2 WHERE $conditions GROUP BY i.type, i.sid HAVING COUNT(*) >= %d";
+  $count_select =  "SELECT COUNT(*) FROM ($select) n1";
+  $arguments = array_merge($arguments2, $arguments1, array($query[4]));
 
   // Do actual search query
-  $result = pager_query("SELECT * FROM temp_search_results", 10, 0, $count_query);
+  $result = pager_query("$select $sort_parameters", 10, 0, $count_select, $arguments);
   $results = array();
   while ($item = db_fetch_object($result)) {
     $results[] = $item;
Index: modules/search/search.install
===================================================================
--- modules/search/search.install	(revision 426)
+++ modules/search/search.install	(working copy)
@@ -12,6 +12,7 @@
         sid int unsigned NOT NULL default '0',
         type varchar(16) default NULL,
         data longtext NOT NULL,
+        reindex int unsigned NOT NULL default '0',
         KEY sid_type (sid, type)
       ) /*!40100 DEFAULT CHARACTER SET UTF8 */ ");
 
@@ -19,11 +20,8 @@
         word varchar(50) NOT NULL default '',
         sid int unsigned NOT NULL default '0',
         type varchar(16) default NULL,
-        fromsid int unsigned NOT NULL default '0',
-        fromtype varchar(16) default NULL,
         score float default NULL,
         KEY sid_type (sid, type),
-        KEY from_sid_type (fromsid, fromtype),
         KEY word (word)
       ) /*!40100 DEFAULT CHARACTER SET UTF8 */ ");
 
@@ -32,8 +30,18 @@
         count float default NULL,
         PRIMARY KEY (word)
       ) /*!40100 DEFAULT CHARACTER SET UTF8 */ ");
+
+      db_query("CREATE TABLE {search_node_links} (
+        sid int unsigned NOT NULL default '0',
+        type varchar(16) default NULL,
+        nid int unsigned NOT NULL default '0',
+        caption TEXT NOT NULL,
+        PRIMARY KEY sid_type_nid (sid, type, nid),
+        KEY nid (nid)
+      ) /*!40100 DEFAULT CHARACTER SET UTF8 */ ");
       break;
     case 'pgsql':
+      // @TODO: update for #146466
       db_query("CREATE TABLE {search_dataset} (
         sid int_unsigned NOT NULL default '0',
         type varchar(16) default NULL,
