mirror of
https://github.com/puma/puma.git
synced 2022-11-09 13:48:40 -05:00
Add experimental tcp mode (aka lopez express mode)
This commit is contained in:
parent
2ef57daf8b
commit
4ea35190ec
12 changed files with 210 additions and 3 deletions
|
@ -1,3 +1,8 @@
|
|||
=== 2.4.1 / 2013-08-07
|
||||
|
||||
* 1 experimental feature:
|
||||
* Support raw tcp servers (aka Lopez Express mode)
|
||||
|
||||
=== 2.4.0 / 2013-07-22
|
||||
|
||||
* 5 minor features:
|
||||
|
|
|
@ -51,6 +51,7 @@ lib/puma/reactor.rb
|
|||
lib/puma/runner.rb
|
||||
lib/puma/server.rb
|
||||
lib/puma/single.rb
|
||||
lib/puma/tcp_logger.rb
|
||||
lib/puma/thread_pool.rb
|
||||
lib/puma/util.rb
|
||||
lib/rack/handler/puma.rb
|
||||
|
|
|
@ -186,6 +186,10 @@ module Puma
|
|||
end
|
||||
end
|
||||
|
||||
o.on "--tcp-mode", "Run the app in raw TCP mode instead of HTTP mode" do
|
||||
@options[:mode] = :tcp
|
||||
end
|
||||
|
||||
o.on "-V", "--version", "Print the version information" do
|
||||
puts "puma version #{Puma::Const::VERSION}"
|
||||
exit 1
|
||||
|
|
|
@ -22,11 +22,15 @@ module Puma
|
|||
class Client
|
||||
include Puma::Const
|
||||
|
||||
def initialize(io, env)
|
||||
def initialize(io, env=nil)
|
||||
@io = io
|
||||
@to_io = io.to_io
|
||||
@proto_env = env
|
||||
@env = env.dup
|
||||
if !env
|
||||
@env = nil
|
||||
else
|
||||
@env = env.dup
|
||||
end
|
||||
|
||||
@parser = HttpParser.new
|
||||
@parsed_bytes = 0
|
||||
|
|
|
@ -17,6 +17,7 @@ module Puma
|
|||
|
||||
def initialize(options)
|
||||
@options = options
|
||||
@options[:mode] ||= :http
|
||||
@options[:binds] ||= []
|
||||
@options[:on_restart] ||= []
|
||||
@options[:worker_boot] ||= []
|
||||
|
@ -99,6 +100,13 @@ module Puma
|
|||
end
|
||||
end
|
||||
|
||||
if @options[:mode] == :tcp
|
||||
require 'puma/tcp_logger'
|
||||
|
||||
logger = @options[:logger] || STDOUT
|
||||
return TCPLogger.new(logger, app, @options[:quiet])
|
||||
end
|
||||
|
||||
if !@options[:quiet] and @options[:environment] == "development"
|
||||
logger = @options[:logger] || STDOUT
|
||||
app = Rack::CommonLogger.new(app, logger)
|
||||
|
@ -303,6 +311,11 @@ module Puma
|
|||
@options[:worker_directory] = dir.to_s
|
||||
end
|
||||
|
||||
# Run the app as a raw TCP app instead of an HTTP rack app
|
||||
def tcp_mode
|
||||
@options[:mode] = :tcp
|
||||
end
|
||||
|
||||
# *Cluster mode only* Preload the application before starting
|
||||
# the workers and setting up the listen ports. This conflicts
|
||||
# with using the phased restart feature, you can't use both.
|
||||
|
|
|
@ -28,7 +28,7 @@ module Puma
|
|||
# too taxing on performance.
|
||||
module Const
|
||||
|
||||
PUMA_VERSION = VERSION = "2.4.0".freeze
|
||||
PUMA_VERSION = VERSION = "2.4.1".freeze
|
||||
CODE_NAME = "Crunchy Munchy Lunchy"
|
||||
|
||||
FAST_TRACK_KA_TIMEOUT = 0.2
|
||||
|
|
|
@ -70,6 +70,10 @@ module Puma
|
|||
log "* Version #{Puma::Const::PUMA_VERSION}, codename: #{Puma::Const::CODE_NAME}"
|
||||
log "* Min threads: #{min_t}, max threads: #{max_t}"
|
||||
log "* Environment: #{ENV['RACK_ENV']}"
|
||||
|
||||
if @options[:mode] == :tcp
|
||||
log "* Mode: Lopez Express (tcp)"
|
||||
end
|
||||
end
|
||||
|
||||
def redirect_io
|
||||
|
@ -120,6 +124,10 @@ module Puma
|
|||
server.max_threads = max_t
|
||||
server.inherit_binder @cli.binder
|
||||
|
||||
if @options[:mode] == :tcp
|
||||
server.tcp_mode!
|
||||
end
|
||||
|
||||
unless development?
|
||||
server.leak_stack_on_error = false
|
||||
end
|
||||
|
|
|
@ -75,6 +75,8 @@ module Puma
|
|||
@options = options
|
||||
|
||||
ENV['RACK_ENV'] ||= "development"
|
||||
|
||||
@mode = :http
|
||||
end
|
||||
|
||||
attr_accessor :binder, :leak_stack_on_error
|
||||
|
@ -88,6 +90,10 @@ module Puma
|
|||
@own_binder = false
|
||||
end
|
||||
|
||||
def tcp_mode!
|
||||
@mode = :tcp
|
||||
end
|
||||
|
||||
# On Linux, use TCP_CORK to better control how the TCP stack
|
||||
# packetizes our stream. This improves both latency and throughput.
|
||||
#
|
||||
|
@ -121,6 +127,87 @@ module Puma
|
|||
@thread_pool and @thread_pool.spawned
|
||||
end
|
||||
|
||||
# Lopez Mode == raw tcp apps
|
||||
|
||||
def run_lopez_mode(background=true)
|
||||
@thread_pool = ThreadPool.new(@min_threads,
|
||||
@max_threads,
|
||||
Hash) do |client, tl|
|
||||
|
||||
io = client.to_io
|
||||
addr = io.peeraddr.last
|
||||
|
||||
if addr.empty?
|
||||
# Set unix socket addrs to localhost
|
||||
addr = "127.0.0.1:0"
|
||||
else
|
||||
addr = "#{addr}:#{io.peeraddr[1]}"
|
||||
end
|
||||
|
||||
env = { 'thread' => tl, REMOTE_ADDR => addr }
|
||||
|
||||
begin
|
||||
@app.call env, client.to_io
|
||||
rescue Object => e
|
||||
STDERR.puts "! Detected exception at toplevel: #{e.message} (#{e.class})"
|
||||
STDERR.puts e.backtrace
|
||||
end
|
||||
|
||||
client.close unless env['detach']
|
||||
end
|
||||
|
||||
if background
|
||||
@thread = Thread.new { handle_servers_lopez_mode }
|
||||
return @thread
|
||||
else
|
||||
handle_lopez_servers
|
||||
end
|
||||
end
|
||||
|
||||
def handle_servers_lopez_mode
|
||||
begin
|
||||
check = @check
|
||||
sockets = [check] + @binder.ios
|
||||
pool = @thread_pool
|
||||
|
||||
while @status == :run
|
||||
begin
|
||||
ios = IO.select sockets
|
||||
ios.first.each do |sock|
|
||||
if sock == check
|
||||
break if handle_check
|
||||
else
|
||||
begin
|
||||
if io = sock.accept_nonblock
|
||||
c = Client.new io, nil
|
||||
pool << c
|
||||
end
|
||||
rescue SystemCallError
|
||||
end
|
||||
end
|
||||
end
|
||||
rescue Errno::ECONNABORTED
|
||||
# client closed the socket even before accept
|
||||
client.close rescue nil
|
||||
rescue Object => e
|
||||
@events.unknown_error self, e, "Listen loop"
|
||||
end
|
||||
end
|
||||
|
||||
graceful_shutdown if @status == :stop || @status == :restart
|
||||
|
||||
rescue Exception => e
|
||||
STDERR.puts "Exception handling servers: #{e.message} (#{e.class})"
|
||||
STDERR.puts e.backtrace
|
||||
ensure
|
||||
@check.close
|
||||
@notify.close
|
||||
|
||||
if @status != :restart and @own_binder
|
||||
@binder.close
|
||||
end
|
||||
end
|
||||
end
|
||||
# Runs the server.
|
||||
#
|
||||
# If +background+ is true (the default) then a thread is spun
|
||||
|
@ -132,6 +219,10 @@ module Puma
|
|||
|
||||
@status = :run
|
||||
|
||||
if @mode == :tcp
|
||||
return run_lopez_mode(background)
|
||||
end
|
||||
|
||||
@thread_pool = ThreadPool.new(@min_threads,
|
||||
@max_threads,
|
||||
IOBuffer) do |client, buffer|
|
||||
|
|
32
lib/puma/tcp_logger.rb
Normal file
32
lib/puma/tcp_logger.rb
Normal file
|
@ -0,0 +1,32 @@
|
|||
module Puma
|
||||
class TCPLogger
|
||||
def initialize(logger, app, quiet=false)
|
||||
@logger = logger
|
||||
@app = app
|
||||
@quiet = quiet
|
||||
end
|
||||
|
||||
FORMAT = "%s - %s"
|
||||
|
||||
def log(who, str)
|
||||
now = Time.now.strftime("%d/%b/%Y %H:%M:%S")
|
||||
|
||||
@logger.puts "#{now} - #{who} - #{str}"
|
||||
end
|
||||
|
||||
def call(env, socket)
|
||||
who = env[Const::REMOTE_ADDR]
|
||||
log who, "connected" unless @quiet
|
||||
|
||||
env['log'] = lambda { |str| log(who, str) }
|
||||
|
||||
begin
|
||||
@app.call env, socket
|
||||
rescue Object => e
|
||||
log who, "exception: #{e.message} (#{e.class})"
|
||||
else
|
||||
log who, "disconnected" unless @quiet
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
2
test/config.rb
Normal file
2
test/config.rb
Normal file
|
@ -0,0 +1,2 @@
|
|||
pidfile "/tmp/jruby.pid"
|
||||
switch_user "daemon", "daemon"
|
5
test/hello-tcp.ru
Normal file
5
test/hello-tcp.ru
Normal file
|
@ -0,0 +1,5 @@
|
|||
run lambda { |env, socket|
|
||||
p :here
|
||||
socket.puts "Sockets for the low, low price of free!"
|
||||
socket.close
|
||||
}
|
42
test/test_tcp_rack.rb
Normal file
42
test/test_tcp_rack.rb
Normal file
|
@ -0,0 +1,42 @@
|
|||
require "rbconfig"
|
||||
require 'test/unit'
|
||||
require 'socket'
|
||||
require 'openssl'
|
||||
|
||||
require 'puma/minissl'
|
||||
require 'puma/server'
|
||||
|
||||
require 'net/https'
|
||||
|
||||
class TestTCPRack < Test::Unit::TestCase
|
||||
|
||||
def setup
|
||||
@port = 3212
|
||||
@host = "127.0.0.1"
|
||||
|
||||
@events = Puma::Events.new STDOUT, STDERR
|
||||
@server = Puma::Server.new nil, @events
|
||||
end
|
||||
|
||||
def teardown
|
||||
@server.stop(true)
|
||||
end
|
||||
|
||||
def test_passes_the_socket
|
||||
@server.tcp_mode!
|
||||
|
||||
body = "We sell hats for a discount!\n"
|
||||
|
||||
@server.app = proc do |env, socket|
|
||||
socket << body
|
||||
socket.close
|
||||
end
|
||||
|
||||
@server.add_tcp_listener @host, @port
|
||||
@server.run
|
||||
|
||||
sock = TCPSocket.new @host, @port
|
||||
|
||||
assert_equal body, sock.read
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue