From 8af572dc8fc04aa963ee79481718b8c06c6512f0 Mon Sep 17 00:00:00 2001
From: Mark Sonnabaum <mark@sonnabaum.com>
Date: Sun, 6 Mar 2011 13:22:31 -0600
Subject: [PATCH] Changed remote commands to execute in parallel across multiple site aliases.

---
 includes/backend.inc |  117 ++++++++++++++++++++++++++++++--------------------
 includes/drush.inc   |   18 ++++---
 2 files changed, 80 insertions(+), 55 deletions(-)

diff --git includes/backend.inc includes/backend.inc
index e0c2059..62e2390 100644
--- includes/backend.inc
+++ includes/backend.inc
@@ -226,42 +226,60 @@ function _drush_backend_integrate($data) {
  */
 function _drush_proc_open($cmd, $data = NULL, $context = NULL) {
   $descriptorspec = array(
-     0 => array("pipe", "r"),  // stdin is a pipe that the child will read from
-     1 => array("pipe", "w"),  // stdout is a pipe that the child will write to
-     2 => array("pipe", "w")   // stderr is a pipe the child will write to
+    0 => array("pipe", "r"),  // stdin is a pipe that the child will read from
+    1 => array("pipe", "w"),  // stdout is a pipe that the child will write to
+    2 => array("pipe", "w")   // stderr is a pipe the child will write to
   );
-  $process = proc_open($cmd, $descriptorspec, $pipes, null, null, array('context' => $context));
-  if (is_resource($process)) {
-    if ($data) {
-      fwrite($pipes[0], json_encode($data)); // pass the data array in a JSON encoded string
-    }
 
-    $info = stream_get_meta_data($pipes[1]);
-    stream_set_blocking($pipes[1], TRUE);
-    stream_set_timeout($pipes[1], 1);
-    $string = '';
-    while (!feof($pipes[1]) && !$info['timed_out']) {
-      $string .= fgets($pipes[1], 4096);
-      $info = stream_get_meta_data($pipes[1]);
-      flush();
-    };
-
-    $info = stream_get_meta_data($pipes[2]);
-    stream_set_blocking($pipes[2], TRUE);
-    stream_set_timeout($pipes[2], 1);
-    while (!feof($pipes[2]) && !$info['timed_out']) {
-      $string .= fgets($pipes[2], 4096);
-      $info = stream_get_meta_data($pipes[2]);
-      flush();
-    };
-
-    fclose($pipes[0]);
-    fclose($pipes[1]);
-    fclose($pipes[2]);
-    $code = proc_close($process);
-    return array('cmd' => $cmd, 'output' => $string, 'code' => $code);
+  $open_processes = array();
+  $bucket = array();
+  foreach ($cmd as $site=> $c) {
+    $process = array();
+    $process['process'] = proc_open($c, $descriptorspec, $process['pipes'], null, null, array('context' => $context));
+
+    if (is_resource($process['process'])) {
+      if ($data) {
+        fwrite($process['pipes'][0], json_encode($data)); // pass the data array in a JSON encoded string
+      }
+
+      $process['info'] = stream_get_meta_data($process['pipes'][1]);
+      stream_set_blocking($process['pipes'][1], TRUE);
+      stream_set_timeout($process['pipes'][1], 1);
+      $bucket[$site]['cmd'] = $c;
+      $open_processes[$site] = $process;
+    }
   }
-  return FALSE;
+  while (sizeof($open_processes)) {
+    foreach ($open_processes as $site => &$current_process) {
+      if (isset($current_process['pipes'][1]) || isset($current_process['pipes'][2])) {
+        foreach (array(1, 2) as $pipe) {
+          if (isset($current_process['pipes'][$pipe])) {
+            $info = stream_get_meta_data($current_process['pipes'][$pipe]);
+            if (!feof($current_process['pipes'][$pipe]) && !$info['timed_out']) {
+              $output = fgets($current_process['pipes'][$pipe], 4096);
+              $bucket[$site][$pipe] .= $output;
+              $bucket[$site]['output'] .= $output;
+              flush();
+            }
+            else {
+              fclose($current_process['pipes'][$pipe]);
+              unset($current_process['pipes'][$pipe]);
+              // close the pipe , set a marker
+            }
+          }
+        }
+      }
+      else {
+        // if both pipes are closed for the process, remove it from active loop and add a new process to open.
+        $bucket[$site]['code'] = proc_close($current_process['process']);
+        unset($open_processes[$site]);
+      }
+    }
+  }
+  return $bucket;
+  // TODO: Handle bad proc handles
+  //}
+  //return FALSE;
 }
 
 /**
@@ -379,9 +397,12 @@ function drush_backend_invoke_args($command, $args, $data = array(), $method = '
  * @return
  *   A text string representing a fully escaped command.
  */
-function drush_backend_invoke_sitealias($site_record, $command, $args, $data = array(), $method = 'GET', $integrate = TRUE) {
-  $cmd = _drush_backend_generate_command_sitealias($site_record, $command, $args, $data, $method);
-  return _drush_backend_invoke($cmd, $data, array_key_exists('#integrate', $data) ? $data['#integrate'] : $integrate);
+function drush_backend_invoke_sitealias($site_records, $command, $args, $data = array(), $method = 'GET', $integrate = TRUE) {
+  $cmds = array();
+  foreach ($site_records as $site_name => $site_record) {
+    $cmds[$site_name] = _drush_backend_generate_command_sitealias($site_record, $command, $args, $data[$site_name], $method);
+  }
+  return _drush_backend_invoke($cmds, $data, array_key_exists('#integrate', $data) ? $data['#integrate'] : $integrate);
 }
 
 /**
@@ -405,7 +426,7 @@ function drush_backend_invoke_sitealias($site_record, $command, $args, $data = a
  *   If the command could not be completed successfully, FALSE.
  *   If the command was completed, this will return an associative array containing the data from drush_backend_output().
  */
-function _drush_backend_invoke($cmd, $data = null, $integrate = TRUE) {
+function _drush_backend_invoke($cmds, $data = null, $integrate = TRUE) {
   if (drush_get_context('DRUSH_SIMULATE') && !array_key_exists('#override-simulated', $data)) {
     drush_print(dt('Simulating backend invoke: !cmd', array('!cmd' => $cmd)));
     return FALSE;
@@ -416,21 +437,23 @@ function _drush_backend_invoke($cmd, $data = null, $integrate = TRUE) {
     drush_op_system($cmd);
   }
   else {
-    $proc = _drush_proc_open($cmd, $data);
-
-    if (($proc['code'] == DRUSH_APPLICATION_ERROR) && $integrate) {
-      drush_set_error('DRUSH_APPLICATION_ERROR', dt("The external command could not be executed due to an application error."));
-    }
-
-    if ($proc['output']) {
-      $values = drush_backend_parse_output($proc['output'], $integrate);
-      if (is_array($values)) {
-        return $values;
+    $procs = _drush_proc_open($cmds, $data);
+    $return = array();
+    foreach ($procs as $site => $proc) {
+      if (($proc['code'] == DRUSH_APPLICATION_ERROR) && $integrate) {
+        drush_set_error('DRUSH_APPLICATION_ERROR', dt("The external command could not be executed due to an application error."));
       }
-      else {
-        return drush_set_error('DRUSH_FRAMEWORK_ERROR', dt("The command could not be executed successfully (returned: !return, code: %code)", array("!return" => $proc['output'], "%code" =>  $proc['code'])));
+      if ($proc['output']) {
+        $values = drush_backend_parse_output($proc['output'], $integrate);
+        if (is_array($values)) {
+          $return[$site] = $values;
+        }
+        else {
+          $return[$site] = drush_set_error('DRUSH_FRAMEWORK_ERROR', dt("The command could not be executed successfully (returned: !return, code: %code)", array("!return" => $proc['output'], "%code" =>  $proc['code'])));
+        }
       }
     }
+    return $return;
   }
   return FALSE;
 }
diff --git includes/drush.inc includes/drush.inc
index 344a6da..685d524 100644
--- includes/drush.inc
+++ includes/drush.inc
@@ -799,8 +799,8 @@ function drush_remote_command() {
         }
       }
       $multi_options['reserve-margin'] = $max_name_length + strlen($label_separator);
-      foreach ($site_list as $alias_name => $alias_record) {
-        $values = drush_do_site_command($alias_record, $command, $args, $multi_options);
+      $site_values = drush_do_site_command($site_list, $command, $args, $multi_options);
+      foreach ($site_values as $alias_name => $values) {
         foreach (explode("\n", $values['output']) as $line) {
           if (empty($line)) {
             drush_print();
@@ -890,15 +890,17 @@ function drush_do_multiple_command($command, $source_record, $destination_record
  * drush_invoke_sitealias_args.  Please call the standard function
  * unless you need to set $integrate = TRUE.
  */
-function drush_do_site_command($site_record, $command, $args = array(), $data = array(), $integrate = FALSE) {
+function drush_do_site_command($site_records, $command, $args = array(), $data = array(), $integrate = FALSE) {
   $values = NULL;
-  if (!empty($site_record)) {
-    foreach ($site_record as $key => $value) {
-      if (!isset($data[$key]) && !in_array($key, drush_sitealias_site_selection_keys())) {
-        $data[$key] = $site_record[$key];
+  if (!empty($site_records)) {
+    foreach ($site_records as $site_name => $site_record) {
+      foreach ($site_record as $key => $value) {
+        if (!isset($data[$site_name][$key]) && !in_array($key, drush_sitealias_site_selection_keys())) {
+          $data[$site_name][$key] = $site_record[$key];
+        }
       }
     }
-    $values = drush_backend_invoke_sitealias($site_record, $command, $args, $data, 'GET', $integrate);
+    $values = drush_backend_invoke_sitealias($site_records, $command, $args, $data, 'GET', $integrate);
   }
   return $values;
 }
-- 
1.7.4.1

