mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Spike of cluster mode
This commit is contained in:
parent
6a90d64889
commit
dad69f5cf9
6 changed files with 203 additions and 6 deletions
8
bin/puma
8
bin/puma
|
@ -4,7 +4,13 @@
|
|||
#
|
||||
|
||||
require 'puma/cli'
|
||||
require 'puma/cluster_cli'
|
||||
|
||||
cli = Puma::CLI.new ARGV
|
||||
if ARGV[0] == "cluster"
|
||||
ARGV.shift
|
||||
cli = Puma::ClusterCLI.new ARGV
|
||||
else
|
||||
cli = Puma::CLI.new ARGV
|
||||
end
|
||||
|
||||
cli.run
|
||||
|
|
|
@ -21,6 +21,7 @@ module Puma
|
|||
# this object will report status on.
|
||||
#
|
||||
def initialize(argv, stdout=STDOUT, stderr=STDERR)
|
||||
@debug = false
|
||||
@argv = argv
|
||||
@stdout = stdout
|
||||
@stderr = stderr
|
||||
|
@ -141,6 +142,12 @@ module Puma
|
|||
@events.error str
|
||||
end
|
||||
|
||||
def debug(str)
|
||||
if @debug
|
||||
@events.log "- #{str}"
|
||||
end
|
||||
end
|
||||
|
||||
# Build the OptionParser object to handle the available options.
|
||||
#
|
||||
def setup_options
|
||||
|
@ -152,7 +159,7 @@ module Puma
|
|||
}
|
||||
|
||||
@parser = OptionParser.new do |o|
|
||||
o.on "-b", "--bind URI", "URI to bind to (tcp:// and unix:// only)" do |arg|
|
||||
o.on "-b", "--bind URI", "URI to bind to (tcp://, unix://, ssl://)" do |arg|
|
||||
@options[:binds] << arg
|
||||
end
|
||||
|
||||
|
|
168
lib/puma/cluster_cli.rb
Normal file
168
lib/puma/cluster_cli.rb
Normal file
|
@ -0,0 +1,168 @@
|
|||
require 'puma/cli'
|
||||
require 'posix/spawn'
|
||||
|
||||
module Puma
|
||||
class ClusterCLI < CLI
|
||||
def setup_options
|
||||
@options = {
|
||||
:workers => 2,
|
||||
:min_threads => 0,
|
||||
:max_threads => 16,
|
||||
:quiet => false,
|
||||
:binds => []
|
||||
}
|
||||
|
||||
@parser = OptionParser.new do |o|
|
||||
o.on "-w", "--workers COUNT",
|
||||
"How many worker processes to create" do |arg|
|
||||
@options[:workers] = arg.to_i
|
||||
end
|
||||
|
||||
o.on "-b", "--bind URI",
|
||||
"URI to bind to (tcp://, unix://, ssl://)" do |arg|
|
||||
@options[:binds] << arg
|
||||
end
|
||||
|
||||
o.on '-t', '--threads INT', "min:max threads to use (default 0:16)" do |arg|
|
||||
min, max = arg.split(":")
|
||||
if max
|
||||
@options[:min_threads] = min.to_i
|
||||
@options[:max_threads] = max.to_i
|
||||
else
|
||||
@options[:min_threads] = 0
|
||||
@options[:max_threads] = arg.to_i
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
class PidEvents < Events
|
||||
def log(str)
|
||||
super "[#{$$}] #{str}"
|
||||
end
|
||||
|
||||
def write(str)
|
||||
super "[#{$$}] #{str}"
|
||||
end
|
||||
|
||||
def error(str)
|
||||
super "[#{$$}] #{str}"
|
||||
end
|
||||
end
|
||||
|
||||
def worker
|
||||
Signal.trap "SIGINT", "IGNORE"
|
||||
|
||||
min_t = @options[:min_threads]
|
||||
max_t = @options[:max_threads]
|
||||
|
||||
server = Puma::Server.new @config.app, @events
|
||||
server.min_threads = min_t
|
||||
server.max_threads = max_t
|
||||
|
||||
@ios.each do |fd, uri|
|
||||
server.inherit_tcp_listener uri.host, uri.port, fd
|
||||
end
|
||||
|
||||
Signal.trap "SIGTERM" do
|
||||
server.stop
|
||||
end
|
||||
|
||||
server.run.join
|
||||
end
|
||||
|
||||
def stop_workers
|
||||
@workers.each { |x| x.term }
|
||||
end
|
||||
|
||||
class Worker
|
||||
def initialize(pid)
|
||||
@pid = pid
|
||||
end
|
||||
|
||||
attr_reader :pid
|
||||
|
||||
def term
|
||||
begin
|
||||
Process.kill "TERM", @pid
|
||||
rescue Errno::ESRCH
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def spawn_workers
|
||||
diff = @options[:workers] - @workers.size
|
||||
|
||||
diff.times do
|
||||
pid = fork { worker }
|
||||
debug "Spawned worker: #{pid}"
|
||||
@workers << Worker.new(pid)
|
||||
end
|
||||
end
|
||||
|
||||
def check_workers
|
||||
while true
|
||||
pid = Process.waitpid(-1, Process::WNOHANG)
|
||||
break unless pid
|
||||
|
||||
@workers.delete_if { |w| w.pid == pid }
|
||||
end
|
||||
|
||||
spawn_workers
|
||||
end
|
||||
|
||||
def run
|
||||
@debug = true
|
||||
|
||||
@workers = []
|
||||
@events = PidEvents.new STDOUT, STDERR
|
||||
|
||||
@options[:logger] = @events
|
||||
|
||||
parse_options
|
||||
|
||||
set_rack_environment
|
||||
|
||||
@ios = []
|
||||
|
||||
@options[:binds].each do |str|
|
||||
uri = URI.parse str
|
||||
case uri.scheme
|
||||
when "tcp"
|
||||
s = TCPServer.new(uri.host, uri.port)
|
||||
s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
||||
s.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
|
||||
s.listen 1024
|
||||
|
||||
@ios << [s, uri]
|
||||
else
|
||||
raise "bad bind uri - #{str}"
|
||||
end
|
||||
end
|
||||
|
||||
log "Puma #{Puma::Const::PUMA_VERSION} starting in cluster mode..."
|
||||
log "* Process workers: #{@options[:workers]}"
|
||||
log "* Min threads: #{@options[:min_threads]}, max threads: #{@options[:max_threads]}"
|
||||
log "* Environment: #{ENV['RACK_ENV']}"
|
||||
|
||||
read, write = IO.pipe
|
||||
|
||||
Signal.trap "SIGCHLD" do
|
||||
write.write "!"
|
||||
end
|
||||
|
||||
spawn_workers
|
||||
|
||||
begin
|
||||
while true
|
||||
IO.select([read], nil, nil, 5)
|
||||
check_workers
|
||||
end
|
||||
rescue Interrupt
|
||||
stop_workers
|
||||
p Process.waitall
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -68,7 +68,8 @@ module Puma
|
|||
end
|
||||
|
||||
unless @options[:quiet]
|
||||
app = Rack::CommonLogger.new(app, STDOUT)
|
||||
logger = @options[:logger] || STDOUT
|
||||
app = Rack::CommonLogger.new(app, logger)
|
||||
end
|
||||
|
||||
return app
|
||||
|
|
|
@ -26,6 +26,10 @@ module Puma
|
|||
@stdout.puts str
|
||||
end
|
||||
|
||||
def write(str)
|
||||
@stdout.write str
|
||||
end
|
||||
|
||||
# Write +str+ to +@stderr+
|
||||
#
|
||||
def error(str)
|
||||
|
|
|
@ -123,7 +123,12 @@ module Puma
|
|||
end
|
||||
|
||||
def inherit_tcp_listener(host, port, fd)
|
||||
s = TCPServer.for_fd(fd)
|
||||
if fd.kind_of? TCPServer
|
||||
s = fd
|
||||
else
|
||||
s = TCPServer.for_fd(fd)
|
||||
end
|
||||
|
||||
@ios << s
|
||||
s
|
||||
end
|
||||
|
@ -247,8 +252,14 @@ module Puma
|
|||
if sock == check
|
||||
break if handle_check
|
||||
else
|
||||
c = Client.new sock.accept, @envs.fetch(sock, @proto_env)
|
||||
@thread_pool << c
|
||||
begin
|
||||
if io = sock.accept_nonblock
|
||||
c = Client.new io, @envs.fetch(sock, @proto_env)
|
||||
@thread_pool << c
|
||||
end
|
||||
rescue SystemCallError => e
|
||||
p e
|
||||
end
|
||||
end
|
||||
end
|
||||
rescue Errno::ECONNABORTED
|
||||
|
|
Loading…
Reference in a new issue