1
0
Fork 0
mirror of https://github.com/puma/puma.git synced 2022-11-09 13:48:40 -05:00

Refactor come parts between normal and cluster mode

This commit is contained in:
Evan Phoenix 2012-08-02 16:03:52 -06:00
parent dad69f5cf9
commit 508b235f54
6 changed files with 339 additions and 263 deletions

View file

@ -0,0 +1,23 @@
require 'openssl'
module OpenSSL
module SSL
if RUBY_VERSION < "1.9"
class SSLServer
def accept_nonblock
sock = @svr.accept_nonblock
begin
ssl = OpenSSL::SSL::SSLSocket.new(sock, @ctx)
ssl.sync_close = true
ssl.accept if @start_immediately
ssl
rescue SSLError => ex
sock.close
raise ex
end
end
end
end
end
end

234
lib/puma/binder.rb Normal file
View file

@ -0,0 +1,234 @@
require 'puma/const'
module Puma
class Binder
include Puma::Const
def initialize(events)
@events = events
@listeners = []
@inherited_fds = {}
@unix_paths = []
@proto_env = {
"rack.version".freeze => Rack::VERSION,
"rack.errors".freeze => events.stderr,
"rack.multithread".freeze => true,
"rack.multiprocess".freeze => false,
"rack.run_once".freeze => true,
"SCRIPT_NAME".freeze => ENV['SCRIPT_NAME'] || "",
# Rack blows up if this is an empty string, and Rack::Lint
# blows up if it's nil. So 'text/plain' seems like the most
# sensible default value.
"CONTENT_TYPE".freeze => "text/plain",
"QUERY_STRING".freeze => "",
SERVER_PROTOCOL => HTTP_11,
SERVER_SOFTWARE => PUMA_VERSION,
GATEWAY_INTERFACE => CGI_VER
}
@envs = {}
@ios = []
end
attr_reader :listeners, :ios
def env(sock)
@envs.fetch(sock, @proto_env)
end
def close
@ios.each { |i| i.close }
@unix_paths.each { |i| File.unlink i }
end
def import_from_env
remove = []
ENV.each do |k,v|
if k =~ /PUMA_INHERIT_\d+/
fd, url = v.split(":", 2)
@inherited_fds[url] = fd.to_i
remove << k
end
end
remove.each do |k|
ENV.delete k
end
end
def parse(binds, logger)
binds.each do |str|
uri = URI.parse str
case uri.scheme
when "tcp"
if fd = @inherited_fds.delete(str)
logger.log "* Inherited #{str}"
io = inherit_tcp_listener uri.host, uri.port, fd
else
logger.log "* Listening on #{str}"
io = add_tcp_listener uri.host, uri.port
end
@listeners << [str, io]
when "unix"
if fd = @inherited_fds.delete(str)
logger.log "* Inherited #{str}"
io = inherit_unix_listener uri.path, fd
else
logger.log "* Listening on #{str}"
path = "#{uri.host}#{uri.path}"
umask = nil
if uri.query
params = Rack::Utils.parse_query uri.query
if u = params['umask']
# Use Integer() to respect the 0 prefix as octal
umask = Integer(u)
end
end
io = add_unix_listener path, umask
end
@listeners << [str, io]
when "ssl"
params = Rack::Utils.parse_query uri.query
require 'openssl'
ctx = OpenSSL::SSL::SSLContext.new
unless params['key']
logger.error "Please specify the SSL key via 'key='"
end
ctx.key = OpenSSL::PKey::RSA.new File.read(params['key'])
unless params['cert']
logger.error "Please specify the SSL cert via 'cert='"
end
ctx.cert = OpenSSL::X509::Certificate.new File.read(params['cert'])
ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE
if fd = @inherited_fds.delete(str)
logger.log "* Inherited #{str}"
io = inherited_ssl_listener fd, ctx
else
logger.log "* Listening on #{str}"
io = add_ssl_listener uri.host, uri.port, ctx
end
@listeners << [str, io]
else
logger.error "Invalid URI: #{str}"
end
end
# If we inherited fds but didn't use them (because of a
# configuration change), then be sure to close them.
@inherited_fds.each do |str, fd|
logger.log "* Closing unused inherited connection: #{str}"
begin
IO.for_fd(fd).close
rescue SystemCallError
end
# We have to unlink a unix socket path that's not being used
uri = URI.parse str
if uri.scheme == "unix"
path = "#{uri.host}#{uri.path}"
File.unlink path
end
end
end
# Tell the server to listen on host +host+, port +port+.
# If +optimize_for_latency+ is true (the default) then clients connecting
# will be optimized for latency over throughput.
#
# +backlog+ indicates how many unaccepted connections the kernel should
# allow to accumulate before returning connection refused.
#
def add_tcp_listener(host, port, optimize_for_latency=true, backlog=1024)
s = TCPServer.new(host, port)
if optimize_for_latency
s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
s.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
s.listen backlog
@ios << s
s
end
def inherit_tcp_listener(host, port, fd)
if fd.kind_of? TCPServer
s = fd
else
s = TCPServer.for_fd(fd)
end
@ios << s
s
end
def add_ssl_listener(host, port, ctx,
optimize_for_latency=true, backlog=1024)
s = TCPServer.new(host, port)
if optimize_for_latency
s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
s.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
s.listen backlog
ssl = OpenSSL::SSL::SSLServer.new(s, ctx)
env = @proto_env.dup
env[HTTPS_KEY] = HTTPS
@envs[ssl] = env
@ios << ssl
s
end
def inherited_ssl_listener(fd, ctx)
s = TCPServer.for_fd(fd)
@ios << OpenSSL::SSL::SSLServer.new(s, ctx)
s
end
# Tell the server to listen on +path+ as a UNIX domain socket.
#
def add_unix_listener(path, umask=nil)
@unix_paths << path
# Let anyone connect by default
umask ||= 0
begin
old_mask = File.umask(umask)
s = UNIXServer.new(path)
@ios << s
ensure
File.umask old_mask
end
s
end
def inherit_unix_listener(path, fd)
@unix_paths << path
s = UNIXServer.for_fd fd
@ios << s
s
end
end
end

View file

@ -4,6 +4,7 @@ require 'uri'
require 'puma/server'
require 'puma/const'
require 'puma/configuration'
require 'puma/binder'
require 'rack/commonlogger'
require 'rack/utils'
@ -33,26 +34,12 @@ module Puma
@restart = false
@listeners = []
setup_options
generate_restart_data
@inherited_fds = {}
remove = []
ENV.each do |k,v|
if k =~ /PUMA_INHERIT_\d+/
fd, url = v.split(":", 2)
@inherited_fds[url] = fd.to_i
remove << k
end
end
remove.each do |k|
ENV.delete k
end
@binder = Binder.new(@events)
@binder.import_from_env
end
def restart_on_stop!
@ -101,7 +88,7 @@ module Puma
end
if IS_JRUBY
@listeners.each_with_index do |(str,io),i|
@binder.listeners.each_with_index do |(str,io),i|
io.close
# We have to unlink a unix socket path that's not being used
@ -115,7 +102,7 @@ module Puma
require 'puma/jruby_restart'
JRubyRestart.chdir_exec(@restart_dir, Gem.ruby, *@restart_argv)
else
@listeners.each_with_index do |(l,io),i|
@binder.listeners.each_with_index do |(l,io),i|
ENV["PUMA_INHERIT_#{i}"] = "#{io.to_i}:#{l}"
end
@ -143,7 +130,7 @@ module Puma
end
def debug(str)
if @debug
if @options[:debug]
@events.log "- #{str}"
end
end
@ -155,6 +142,7 @@ module Puma
:min_threads => 0,
:max_threads => 16,
:quiet => false,
:debug => false,
:binds => []
}
@ -184,6 +172,10 @@ module Puma
@options[:quiet] = true
end
o.on "--debug", "Log lowlevel debugging information" do
@options[:debug] = true
end
o.on "-S", "--state PATH", "Where to store the state details" do |arg|
@options[:state] = arg
end
@ -312,6 +304,7 @@ module Puma
max_t = @options[:max_threads]
server = Puma::Server.new app, @events
server.binder = @binder
server.min_threads = min_t
server.max_threads = max_t
@ -319,91 +312,7 @@ module Puma
log "* Min threads: #{min_t}, max threads: #{max_t}"
log "* Environment: #{ENV['RACK_ENV']}"
@options[:binds].each do |str|
uri = URI.parse str
case uri.scheme
when "tcp"
if fd = @inherited_fds.delete(str)
log "* Inherited #{str}"
io = server.inherit_tcp_listener uri.host, uri.port, fd
else
log "* Listening on #{str}"
io = server.add_tcp_listener uri.host, uri.port
end
@listeners << [str, io]
when "unix"
if fd = @inherited_fds.delete(str)
log "* Inherited #{str}"
io = server.inherit_unix_listener uri.path, fd
else
log "* Listening on #{str}"
path = "#{uri.host}#{uri.path}"
umask = nil
if uri.query
params = Rack::Utils.parse_query uri.query
if u = params['umask']
# Use Integer() to respect the 0 prefix as octal
umask = Integer(u)
end
end
io = server.add_unix_listener path, umask
end
@listeners << [str, io]
when "ssl"
params = Rack::Utils.parse_query uri.query
require 'openssl'
ctx = OpenSSL::SSL::SSLContext.new
unless params['key']
error "Please specify the SSL key via 'key='"
end
ctx.key = OpenSSL::PKey::RSA.new File.read(params['key'])
unless params['cert']
error "Please specify the SSL cert via 'cert='"
end
ctx.cert = OpenSSL::X509::Certificate.new File.read(params['cert'])
ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE
if fd = @inherited_fds.delete(str)
log "* Inherited #{str}"
io = server.inherited_ssl_listener fd, ctx
else
log "* Listening on #{str}"
io = server.add_ssl_listener uri.host, uri.port, ctx
end
@listeners << [str, io]
else
error "Invalid URI: #{str}"
end
end
# If we inherited fds but didn't use them (because of a
# configuration change), then be sure to close them.
@inherited_fds.each do |str, fd|
log "* Closing unused inherited connection: #{str}"
begin
IO.for_fd(fd).close
rescue SystemCallError
end
# We have to unlink a unix socket path that's not being used
uri = URI.parse str
if uri.scheme == "unix"
path = "#{uri.host}#{uri.path}"
File.unlink path
end
end
@binder.parse @options[:binds], self
@server = server

View file

@ -1,40 +1,21 @@
require 'puma/cli'
require 'puma/binder'
require 'posix/spawn'
module Puma
class ClusterCLI < CLI
def setup_options
@options = {
:workers => 2,
:min_threads => 0,
:max_threads => 16,
:quiet => false,
:binds => []
}
super
@parser = OptionParser.new do |o|
o.on "-w", "--workers COUNT",
"How many worker processes to create" do |arg|
@options[:workers] = arg.to_i
end
o.on "-b", "--bind URI",
"URI to bind to (tcp://, unix://, ssl://)" do |arg|
@options[:binds] << arg
end
o.on '-t', '--threads INT', "min:max threads to use (default 0:16)" do |arg|
min, max = arg.split(":")
if max
@options[:min_threads] = min.to_i
@options[:max_threads] = max.to_i
else
@options[:min_threads] = 0
@options[:max_threads] = arg.to_i
end
end
@options[:workers] = 2
@parser.on "-w", "--workers COUNT",
"How many worker processes to create" do |arg|
@options[:workers] = arg.to_i
end
@parser.banner = "puma cluster <options> <rackup file>"
end
class PidEvents < Events
@ -54,16 +35,21 @@ module Puma
def worker
Signal.trap "SIGINT", "IGNORE"
@suicide_pipe.close
Thread.new do
IO.select [@check_pipe]
log "! Detected parent died, dieing"
exit! 1
end
min_t = @options[:min_threads]
max_t = @options[:max_threads]
server = Puma::Server.new @config.app, @events
server.min_threads = min_t
server.max_threads = max_t
@ios.each do |fd, uri|
server.inherit_tcp_listener uri.host, uri.port, fd
end
server.binder = @binder
Signal.trap "SIGTERM" do
server.stop
@ -73,7 +59,16 @@ module Puma
end
def stop_workers
log "- Gracefully shutting down workers..."
@workers.each { |x| x.term }
begin
Process.waitall
rescue Interrupt
log "! Cancelled waiting for workers"
else
log "- Goodbye!"
end
end
class Worker
@ -113,9 +108,8 @@ module Puma
end
def run
@debug = true
@workers = []
@events = PidEvents.new STDOUT, STDERR
@options[:logger] = @events
@ -124,36 +118,33 @@ module Puma
set_rack_environment
@ios = []
@options[:binds].each do |str|
uri = URI.parse str
case uri.scheme
when "tcp"
s = TCPServer.new(uri.host, uri.port)
s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
s.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
s.listen 1024
@ios << [s, uri]
else
raise "bad bind uri - #{str}"
end
end
write_pid
write_state
log "Puma #{Puma::Const::PUMA_VERSION} starting in cluster mode..."
log "* Process workers: #{@options[:workers]}"
log "* Min threads: #{@options[:min_threads]}, max threads: #{@options[:max_threads]}"
log "* Environment: #{ENV['RACK_ENV']}"
@binder.parse @options[:binds], self
read, write = IO.pipe
Signal.trap "SIGCHLD" do
write.write "!"
end
# Used by the workers to detect if the master process dies.
# If select says that @check_pipe is ready, it's because the
# master has exited and @suicide_pipe has been automatically
# closed.
#
@check_pipe, @suicide_pipe = IO.pipe
spawn_workers
log "Use Ctrl-C to stop"
begin
while true
IO.select([read], nil, nil, 5)
@ -161,7 +152,8 @@ module Puma
end
rescue Interrupt
stop_workers
p Process.waitall
ensure
delete_pidfile
end
end
end

11
lib/puma/delegation.rb Normal file
View file

@ -0,0 +1,11 @@
module Puma
module Delegation
def forward(what, who)
module_eval <<-CODE
def #{what}(*args, &blk)
#{who}.#{what}(*args, &blk)
end
CODE
end
end
end

View file

@ -8,6 +8,9 @@ require 'puma/null_io'
require 'puma/compat'
require 'puma/reactor'
require 'puma/client'
require 'puma/binder'
require 'puma/delegation'
require 'puma/accept_nonblock'
require 'puma/puma_http11'
@ -19,6 +22,7 @@ module Puma
class Server
include Puma::Const
extend Puma::Delegation
attr_reader :thread
attr_reader :events
@ -42,7 +46,6 @@ module Puma
@events = events
@check, @notify = IO.pipe
@ios = [@check]
@status = :stop
@ -56,32 +59,17 @@ module Puma
@persistent_timeout = PERSISTENT_TIMEOUT
@persistent_check, @persistent_wakeup = IO.pipe
@unix_paths = []
@proto_env = {
"rack.version".freeze => Rack::VERSION,
"rack.errors".freeze => events.stderr,
"rack.multithread".freeze => true,
"rack.multiprocess".freeze => false,
"rack.run_once".freeze => true,
"SCRIPT_NAME".freeze => ENV['SCRIPT_NAME'] || "",
# Rack blows up if this is an empty string, and Rack::Lint
# blows up if it's nil. So 'text/plain' seems like the most
# sensible default value.
"CONTENT_TYPE".freeze => "text/plain",
"QUERY_STRING".freeze => "",
SERVER_PROTOCOL => HTTP_11,
SERVER_SOFTWARE => PUMA_VERSION,
GATEWAY_INTERFACE => CGI_VER
}
@envs = {}
@binder = Binder.new(events)
ENV['RACK_ENV'] ||= "development"
end
attr_accessor :binder
forward :add_tcp_listener, :@binder
forward :add_ssl_listener, :@binder
forward :add_unix_listener, :@binder
# On Linux, use TCP_CORK to better control how the TCP stack
# packetizes our stream. This improves both latency and throughput.
#
@ -104,86 +92,6 @@ module Puma
end
end
# Tell the server to listen on host +host+, port +port+.
# If +optimize_for_latency+ is true (the default) then clients connecting
# will be optimized for latency over throughput.
#
# +backlog+ indicates how many unaccepted connections the kernel should
# allow to accumulate before returning connection refused.
#
def add_tcp_listener(host, port, optimize_for_latency=true, backlog=1024)
s = TCPServer.new(host, port)
if optimize_for_latency
s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
s.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
s.listen backlog
@ios << s
s
end
def inherit_tcp_listener(host, port, fd)
if fd.kind_of? TCPServer
s = fd
else
s = TCPServer.for_fd(fd)
end
@ios << s
s
end
def add_ssl_listener(host, port, ctx, optimize_for_latency=true, backlog=1024)
s = TCPServer.new(host, port)
if optimize_for_latency
s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
s.setsockopt(Socket::SOL_SOCKET,Socket::SO_REUSEADDR, true)
s.listen backlog
ssl = OpenSSL::SSL::SSLServer.new(s, ctx)
env = @proto_env.dup
env[HTTPS_KEY] = HTTPS
@envs[ssl] = env
@ios << ssl
s
end
def inherited_ssl_listener(fd, ctx)
s = TCPServer.for_fd(fd)
@ios << OpenSSL::SSL::SSLServer.new(s, ctx)
s
end
# Tell the server to listen on +path+ as a UNIX domain socket.
#
def add_unix_listener(path, umask=nil)
@unix_paths << path
# Let anyone connect by default
umask ||= 0
begin
old_mask = File.umask(umask)
s = UNIXServer.new(path)
@ios << s
ensure
File.umask old_mask
end
s
end
def inherit_unix_listener(path, fd)
@unix_paths << path
s = UNIXServer.for_fd fd
@ios << s
s
end
def backlog
@thread_pool and @thread_pool.backlog
end
@ -242,7 +150,7 @@ module Puma
def handle_servers
begin
check = @check
sockets = @ios
sockets = [check] + @binder.ios
pool = @thread_pool
while @status == :run
@ -254,11 +162,10 @@ module Puma
else
begin
if io = sock.accept_nonblock
c = Client.new io, @envs.fetch(sock, @proto_env)
c = Client.new io, @binder.env(sock)
@thread_pool << c
end
rescue SystemCallError => e
p e
end
end
end
@ -273,8 +180,8 @@ module Puma
graceful_shutdown if @status == :stop
ensure
unless @status == :restart
@ios.each { |i| i.close }
@unix_paths.each { |i| File.unlink i }
@check.close
@binder.close
end
end
end