diff --git a/bin/puma b/bin/puma index 9c67c0fc..1d0e63e3 100755 --- a/bin/puma +++ b/bin/puma @@ -4,7 +4,13 @@ # require 'puma/cli' +require 'puma/cluster_cli' -cli = Puma::CLI.new ARGV +if ARGV[0] == "cluster" + ARGV.shift + cli = Puma::ClusterCLI.new ARGV +else + cli = Puma::CLI.new ARGV +end cli.run diff --git a/lib/puma/cli.rb b/lib/puma/cli.rb index b8c3a591..6759f84a 100644 --- a/lib/puma/cli.rb +++ b/lib/puma/cli.rb @@ -21,6 +21,7 @@ module Puma # this object will report status on. # def initialize(argv, stdout=STDOUT, stderr=STDERR) + @debug = false @argv = argv @stdout = stdout @stderr = stderr @@ -141,6 +142,12 @@ module Puma @events.error str end + def debug(str) + if @debug + @events.log "- #{str}" + end + end + # Build the OptionParser object to handle the available options. # def setup_options @@ -152,7 +159,7 @@ module Puma } @parser = OptionParser.new do |o| - o.on "-b", "--bind URI", "URI to bind to (tcp:// and unix:// only)" do |arg| + o.on "-b", "--bind URI", "URI to bind to (tcp://, unix://, ssl://)" do |arg| @options[:binds] << arg end diff --git a/lib/puma/cluster_cli.rb b/lib/puma/cluster_cli.rb new file mode 100644 index 00000000..ef7cf2c5 --- /dev/null +++ b/lib/puma/cluster_cli.rb @@ -0,0 +1,168 @@ +require 'puma/cli' +require 'posix/spawn' + +module Puma + class ClusterCLI < CLI + def setup_options + @options = { + :workers => 2, + :min_threads => 0, + :max_threads => 16, + :quiet => false, + :binds => [] + } + + @parser = OptionParser.new do |o| + o.on "-w", "--workers COUNT", + "How many worker processes to create" do |arg| + @options[:workers] = arg.to_i + end + + o.on "-b", "--bind URI", + "URI to bind to (tcp://, unix://, ssl://)" do |arg| + @options[:binds] << arg + end + + o.on '-t', '--threads INT', "min:max threads to use (default 0:16)" do |arg| + min, max = arg.split(":") + if max + @options[:min_threads] = min.to_i + @options[:max_threads] = max.to_i + else + @options[:min_threads] = 0 + @options[:max_threads] = arg.to_i + end + end + + end + end + + class PidEvents < Events + def log(str) + super "[#{$$}] #{str}" + end + + def write(str) + super "[#{$$}] #{str}" + end + + def error(str) + super "[#{$$}] #{str}" + end + end + + def worker + Signal.trap "SIGINT", "IGNORE" + + min_t = @options[:min_threads] + max_t = @options[:max_threads] + + server = Puma::Server.new @config.app, @events + server.min_threads = min_t + server.max_threads = max_t + + @ios.each do |fd, uri| + server.inherit_tcp_listener uri.host, uri.port, fd + end + + Signal.trap "SIGTERM" do + server.stop + end + + server.run.join + end + + def stop_workers + @workers.each { |x| x.term } + end + + class Worker + def initialize(pid) + @pid = pid + end + + attr_reader :pid + + def term + begin + Process.kill "TERM", @pid + rescue Errno::ESRCH + end + end + end + + def spawn_workers + diff = @options[:workers] - @workers.size + + diff.times do + pid = fork { worker } + debug "Spawned worker: #{pid}" + @workers << Worker.new(pid) + end + end + + def check_workers + while true + pid = Process.waitpid(-1, Process::WNOHANG) + break unless pid + + @workers.delete_if { |w| w.pid == pid } + end + + spawn_workers + end + + def run + @debug = true + + @workers = [] + @events = PidEvents.new STDOUT, STDERR + + @options[:logger] = @events + + parse_options + + set_rack_environment + + @ios = [] + + @options[:binds].each do |str| + uri = URI.parse str + case uri.scheme + when "tcp" + s = TCPServer.new(uri.host, uri.port) + s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + s.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true) + s.listen 1024 + + @ios << [s, uri] + else + raise "bad bind uri - #{str}" + end + end + + log "Puma #{Puma::Const::PUMA_VERSION} starting in cluster mode..." + log "* Process workers: #{@options[:workers]}" + log "* Min threads: #{@options[:min_threads]}, max threads: #{@options[:max_threads]}" + log "* Environment: #{ENV['RACK_ENV']}" + + read, write = IO.pipe + + Signal.trap "SIGCHLD" do + write.write "!" + end + + spawn_workers + + begin + while true + IO.select([read], nil, nil, 5) + check_workers + end + rescue Interrupt + stop_workers + p Process.waitall + end + end + end +end diff --git a/lib/puma/configuration.rb b/lib/puma/configuration.rb index e6de574c..5e4fa4bb 100644 --- a/lib/puma/configuration.rb +++ b/lib/puma/configuration.rb @@ -68,7 +68,8 @@ module Puma end unless @options[:quiet] - app = Rack::CommonLogger.new(app, STDOUT) + logger = @options[:logger] || STDOUT + app = Rack::CommonLogger.new(app, logger) end return app diff --git a/lib/puma/events.rb b/lib/puma/events.rb index ca1a7783..3f892418 100644 --- a/lib/puma/events.rb +++ b/lib/puma/events.rb @@ -26,6 +26,10 @@ module Puma @stdout.puts str end + def write(str) + @stdout.write str + end + # Write +str+ to +@stderr+ # def error(str) diff --git a/lib/puma/server.rb b/lib/puma/server.rb index e87948b9..508c9ef1 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -123,7 +123,12 @@ module Puma end def inherit_tcp_listener(host, port, fd) - s = TCPServer.for_fd(fd) + if fd.kind_of? TCPServer + s = fd + else + s = TCPServer.for_fd(fd) + end + @ios << s s end @@ -247,8 +252,14 @@ module Puma if sock == check break if handle_check else - c = Client.new sock.accept, @envs.fetch(sock, @proto_env) - @thread_pool << c + begin + if io = sock.accept_nonblock + c = Client.new io, @envs.fetch(sock, @proto_env) + @thread_pool << c + end + rescue SystemCallError => e + p e + end end end rescue Errno::ECONNABORTED