mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Handle more IO errors (especially, ECONNRESET)
Also, address the possibility of the listen thread dying and needing to be respawned. As a bonus, we now defer construction of the thread until we are first given something to monitor.
This commit is contained in:
parent
ce37de4a19
commit
786ed1b3ad
3 changed files with 79 additions and 22 deletions
|
@ -1,18 +1,17 @@
|
||||||
require 'nio'
|
require 'nio'
|
||||||
|
require 'thread'
|
||||||
|
|
||||||
module ActionCable
|
module ActionCable
|
||||||
module Connection
|
module Connection
|
||||||
class StreamEventLoop
|
class StreamEventLoop
|
||||||
def initialize
|
def initialize
|
||||||
@nio = NIO::Selector.new
|
@nio = @thread = nil
|
||||||
@map = {}
|
@map = {}
|
||||||
@stopping = false
|
@stopping = false
|
||||||
@todo = Queue.new
|
@todo = Queue.new
|
||||||
|
|
||||||
Thread.new do
|
@spawn_mutex = Mutex.new
|
||||||
Thread.current.abort_on_exception = true
|
spawn
|
||||||
run
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def attach(io, stream)
|
def attach(io, stream)
|
||||||
|
@ -20,34 +19,53 @@ module ActionCable
|
||||||
@map[io] = stream
|
@map[io] = stream
|
||||||
@nio.register(io, :r)
|
@nio.register(io, :r)
|
||||||
end
|
end
|
||||||
@nio.wakeup
|
wakeup
|
||||||
end
|
end
|
||||||
|
|
||||||
def detach(io, stream)
|
def detach(io, stream)
|
||||||
@todo << lambda do
|
@todo << lambda do
|
||||||
@nio.deregister(io)
|
@nio.deregister io
|
||||||
@map.delete io
|
@map.delete io
|
||||||
end
|
end
|
||||||
@nio.wakeup
|
wakeup
|
||||||
end
|
end
|
||||||
|
|
||||||
def stop
|
def stop
|
||||||
@stopping = true
|
@stopping = true
|
||||||
@nio.wakeup
|
wakeup if @nio
|
||||||
end
|
end
|
||||||
|
|
||||||
def run
|
private
|
||||||
loop do
|
def spawn
|
||||||
if @stopping
|
return if @thread && @thread.status
|
||||||
@nio.close
|
|
||||||
break
|
|
||||||
end
|
|
||||||
|
|
||||||
until @todo.empty?
|
@spawn_mutex.synchronize do
|
||||||
@todo.pop(true).call
|
return if @thread && @thread.status
|
||||||
end
|
|
||||||
|
@nio ||= NIO::Selector.new
|
||||||
|
@thread = Thread.new { run }
|
||||||
|
|
||||||
|
return true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def wakeup
|
||||||
|
spawn || @nio.wakeup
|
||||||
|
end
|
||||||
|
|
||||||
|
def run
|
||||||
|
loop do
|
||||||
|
if @stopping
|
||||||
|
@nio.close
|
||||||
|
break
|
||||||
|
end
|
||||||
|
|
||||||
|
until @todo.empty?
|
||||||
|
@todo.pop(true).call
|
||||||
|
end
|
||||||
|
|
||||||
|
next unless monitors = @nio.select
|
||||||
|
|
||||||
if monitors = @nio.select
|
|
||||||
monitors.each do |monitor|
|
monitors.each do |monitor|
|
||||||
io = monitor.io
|
io = monitor.io
|
||||||
stream = @map[io]
|
stream = @map[io]
|
||||||
|
@ -56,13 +74,21 @@ module ActionCable
|
||||||
stream.receive io.read_nonblock(4096)
|
stream.receive io.read_nonblock(4096)
|
||||||
rescue IO::WaitReadable
|
rescue IO::WaitReadable
|
||||||
next
|
next
|
||||||
rescue EOFError
|
rescue
|
||||||
stream.close
|
# We expect one of EOFError or Errno::ECONNRESET in
|
||||||
|
# normal operation (when the client goes away). But if
|
||||||
|
# anything else goes wrong, this is still the best way
|
||||||
|
# to handle it.
|
||||||
|
begin
|
||||||
|
stream.close
|
||||||
|
rescue
|
||||||
|
@nio.deregister io
|
||||||
|
@map.delete io
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -7,6 +7,11 @@ class EchoChannel < ActionCable::Channel::Base
|
||||||
transmit(dong: data['message'])
|
transmit(dong: data['message'])
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def delay(data)
|
||||||
|
sleep 1
|
||||||
|
transmit(dong: data['message'])
|
||||||
|
end
|
||||||
|
|
||||||
def bulk(data)
|
def bulk(data)
|
||||||
ActionCable.server.broadcast "global", wide: data['message']
|
ActionCable.server.broadcast "global", wide: data['message']
|
||||||
end
|
end
|
||||||
|
|
|
@ -181,6 +181,15 @@ class ClientTest < ActionCable::TestCase
|
||||||
@ws.close
|
@ws.close
|
||||||
@closed.wait(WAIT_WHEN_EXPECTING_EVENT)
|
@closed.wait(WAIT_WHEN_EXPECTING_EVENT)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def close!
|
||||||
|
sock = BasicSocket.for_fd(@ws.instance_variable_get(:@stream).detach)
|
||||||
|
|
||||||
|
# Force a TCP reset
|
||||||
|
sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, [1, 0].pack('ii'))
|
||||||
|
|
||||||
|
sock.close
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def faye_client(port)
|
def faye_client(port)
|
||||||
|
@ -235,4 +244,21 @@ class ClientTest < ActionCable::TestCase
|
||||||
clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
|
clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_disappearing_client
|
||||||
|
with_puma_server do |port|
|
||||||
|
c = faye_client(port)
|
||||||
|
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
|
||||||
|
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
||||||
|
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello')
|
||||||
|
c.close! # disappear before write
|
||||||
|
|
||||||
|
c = faye_client(port)
|
||||||
|
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
|
||||||
|
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
||||||
|
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
|
||||||
|
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
|
||||||
|
c.close! # disappear before read
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue