1
0
Fork 0
mirror of https://github.com/capistrano/capistrano synced 2023-03-27 23:21:18 -04:00

Add parallel() helper for executing multiple different commands in parallel

This commit is contained in:
Jamis Buck 2008-08-20 15:00:31 -06:00
parent d5b2042329
commit c70fcd595e
3 changed files with 116 additions and 19 deletions

View file

@ -1,5 +1,7 @@
== (unreleased)
* Added parallel() helper for executing multiple different commands in parallel [Jamis Buck]
* Make sure a task only uses the last on_rollback block, once, on rollback [Jamis Buck]
* Add :shared_children variable to customize which subdirectories are created by deploy:setup [Jonathan Share]

View file

@ -8,10 +8,74 @@ module Capistrano
class Command
include Processable
attr_reader :command, :sessions, :options
class Tree
attr_reader :branches
def self.process(command, sessions, options={}, &block)
new(command, sessions, options, &block).process!
class Branch
attr_accessor :command, :callback
def initialize(command, callback)
@command = command.strip.gsub(/\r?\n/, "\\\n")
@callback = callback || Capistrano::Configuration.default_io_proc
@skip = false
end
def skip?
@skip
end
def skip!
@skip = true
end
def match(server)
true
end
def to_s
command.inspect
end
end
class PatternBranch < Branch
attr_accessor :pattern
def initialize(pattern, command, callback)
@pattern = pattern
super(command, callback)
end
def match(server)
pattern === server.host
end
def to_s
"#{pattern.inspect} :: #{command.inspect}"
end
end
def initialize
@branches = []
yield self if block_given?
end
def if(pattern, command, &block)
branches << PatternBranch.new(pattern, command, block)
end
def else(command, &block)
branches << Branch.new(command, block)
end
def branch_for(server)
branches.detect { |branch| branch.match(server) }
end
end
attr_reader :tree, :sessions, :options
def self.process(tree, sessions, options={})
new(tree, sessions, options).process!
end
# Instantiates a new command object. The +command+ must be a string
@ -23,11 +87,10 @@ module Capistrano
# * +data+: (optional), a string to be sent to the command via it's stdin
# * +env+: (optional), a string or hash to be interpreted as environment
# variables that should be defined for this command invocation.
def initialize(command, sessions, options={}, &block)
@command = command.strip.gsub(/\r?\n/, "\\\n")
def initialize(tree, sessions, options={})
@tree = tree
@sessions = sessions
@options = options
@callback = block
@channels = open_channels
end
@ -67,17 +130,20 @@ module Capistrano
def open_channels
sessions.map do |session|
session.open_channel do |channel|
server = session.xserver
server = session.xserver
branch = tree.branch_for(server)
next if branch.skip?
session.open_channel do |channel|
channel[:server] = server
channel[:host] = server.host
channel[:options] = options
channel[:branch] = branch
request_pty_if_necessary(channel) do |ch, success|
if success
logger.trace "executing command", ch[:server] if logger
cmd = replace_placeholders(command, ch)
cmd = replace_placeholders(channel[:branch].command, ch)
if options[:shell] == false
shell = nil
@ -101,11 +167,11 @@ module Capistrano
end
channel.on_data do |ch, data|
@callback[ch, :out, data] if @callback
ch[:branch].callback[ch, :out, data]
end
channel.on_extended_data do |ch, type, data|
@callback[ch, :err, data] if @callback
ch[:branch].callback[ch, :err, data]
end
channel.on_request("exit-status") do |ch, data|
@ -116,7 +182,7 @@ module Capistrano
ch[:closed] = true
end
end
end
end.compact
end
def request_pty_if_necessary(channel)

View file

@ -26,6 +26,12 @@ module Capistrano
set :default_run_options, {}
end
def parallel(options={})
raise ArgumentError, "parallel() requires a block" unless block_given?
tree = Command::Tree.new { |t| yield t }
run_tree(tree)
end
# Invokes the given command. If a +via+ key is given, it will be used
# to determine what method to use to invoke the command. It defaults
# to :run, but may be :sudo, or any other method that conforms to the
@ -44,19 +50,33 @@ module Capistrano
# stdout), and the data that was received.
def run(cmd, options={}, &block)
block ||= self.class.default_io_proc
logger.debug "executing #{cmd.strip.inspect}"
tree = Command::Tree.new { |t| t.else(cmd, block) }
run_tree(tree, options)
end
return if dry_run || (debug && continue_execution(cmd) == false)
def run_tree(tree, options={})
if tree.branches.length == 1
logger.debug "executing #{tree.branches.first}"
else
logger.debug "executing multiple commands in parallel"
tree.branches.each do |branch|
logger.trace "-> #{branch}"
end
end
return if dry_run || (debug && continue_execution(tree) == false)
options = add_default_command_options(options)
if cmd.include?(sudo)
block = sudo_behavior_callback(block)
tree.branches.each do |branch|
if branch.command.include?(sudo)
branch.callback = sudo_behavior_callback(branch.callback)
end
end
execute_on_servers(options) do |servers|
targets = servers.map { |s| sessions[s] }
Command.process(cmd, targets, options.merge(:logger => logger), &block)
Command.process(tree, targets, options.merge(:logger => logger))
end
end
@ -145,8 +165,17 @@ module Capistrano
fetch(:sudo_prompt, "sudo password: ")
end
def continue_execution(cmd)
case Capistrano::CLI.debug_prompt(cmd)
def continue_execution(tree)
if tree.branches.length == 1
continue_execution_for_branch(tree.branches.first)
else
tree.branches.each { |branch| branch.skip! unless continue_execution_for_branch(branch) }
tree.branches.any? { |branch| !branch.skip? }
end
end
def continue_execution_for_branch(branch)
case Capistrano::CLI.debug_prompt(branch)
when "y"
true
when "n"