mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Introduce Streams as the domain language for the pubsub channels Channels redeliver messages from
This commit is contained in:
parent
c2e2a94306
commit
5c4f07d34e
5 changed files with 51 additions and 48 deletions
|
@ -1,7 +1,7 @@
|
|||
module ActionCable
|
||||
module Channel
|
||||
autoload :Callbacks, 'action_cable/channel/callbacks'
|
||||
autoload :Redis, 'action_cable/channel/redis'
|
||||
autoload :Streams, 'action_cable/channel/streams'
|
||||
autoload :Base, 'action_cable/channel/base'
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,7 +2,7 @@ module ActionCable
|
|||
module Channel
|
||||
class Base
|
||||
include Callbacks
|
||||
include Redis
|
||||
include Streams
|
||||
|
||||
on_subscribe :start_periodic_timers
|
||||
on_unsubscribe :stop_periodic_timers
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
module ActionCable
|
||||
module Channel
|
||||
module Redis
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do
|
||||
on_unsubscribe :unsubscribe_from_all_channels
|
||||
delegate :pubsub, to: :connection
|
||||
end
|
||||
|
||||
def subscribe_to(redis_channel, callback = nil)
|
||||
callback ||= default_subscription_callback(redis_channel)
|
||||
@_redis_channels ||= []
|
||||
@_redis_channels << [ redis_channel, callback ]
|
||||
|
||||
pubsub.subscribe(redis_channel, &callback)
|
||||
logger.info "#{channel_name} subscribed to broadcasts from #{redis_channel}"
|
||||
end
|
||||
|
||||
def unsubscribe_from_all_channels
|
||||
if @_redis_channels
|
||||
@_redis_channels.each do |redis_channel, callback|
|
||||
pubsub.unsubscribe_proc(redis_channel, callback)
|
||||
logger.info "#{channel_name} unsubscribed to broadcasts from #{redis_channel}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
protected
|
||||
def default_subscription_callback(channel)
|
||||
-> (message) do
|
||||
transmit ActiveSupport::JSON.decode(message), via: "broadcast from #{channel}"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
40
lib/action_cable/channel/streams.rb
Normal file
40
lib/action_cable/channel/streams.rb
Normal file
|
@ -0,0 +1,40 @@
|
|||
module ActionCable
|
||||
module Channel
|
||||
module Streams
|
||||
extend ActiveSupport::Concern
|
||||
|
||||
included do
|
||||
on_unsubscribe :stop_all_streams
|
||||
end
|
||||
|
||||
def stream_from(broadcasting, callback = nil)
|
||||
callback ||= default_stream_callback(broadcasting)
|
||||
|
||||
streams << [ broadcasting, callback ]
|
||||
pubsub.subscribe broadcasting, &callback
|
||||
|
||||
logger.info "#{channel_name} is streaming from #{broadcasting}"
|
||||
end
|
||||
|
||||
def stop_all_streams
|
||||
streams.each do |broadcasting, callback|
|
||||
pubsub.unsubscribe_proc broadcasting, callback
|
||||
logger.info "#{channel_name} stopped streaming from #{broadcasting}"
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
delegate :pubsub, to: :connection
|
||||
|
||||
def streams
|
||||
@_streams ||= []
|
||||
end
|
||||
|
||||
def default_stream_callback(broadcasting)
|
||||
-> (message) do
|
||||
transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,12 +1,12 @@
|
|||
module ActionCable
|
||||
module Server
|
||||
module Broadcasting
|
||||
def broadcast(channel, message)
|
||||
broadcaster_for(channel).broadcast(message)
|
||||
def broadcast(broadcasting, message)
|
||||
broadcaster_for(broadcasting).broadcast(message)
|
||||
end
|
||||
|
||||
def broadcaster_for(channel)
|
||||
Broadcaster.new(self, channel)
|
||||
def broadcaster_for(broadcasting)
|
||||
Broadcaster.new(self, broadcasting)
|
||||
end
|
||||
|
||||
def broadcasting_redis
|
||||
|
@ -15,19 +15,19 @@ module ActionCable
|
|||
|
||||
private
|
||||
class Broadcaster
|
||||
attr_reader :server, :channel
|
||||
attr_reader :server, :broadcasting
|
||||
|
||||
def initialize(server, channel)
|
||||
@server, @channel = server, channel
|
||||
def initialize(server, broadcasting)
|
||||
@server, @broadcasting = server, broadcasting
|
||||
end
|
||||
|
||||
def broadcast(message)
|
||||
server.logger.info "[ActionCable] Broadcasting to #{channel}: #{message}"
|
||||
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
|
||||
broadcast_without_logging(message)
|
||||
end
|
||||
|
||||
def broadcast_without_logging(message)
|
||||
server.broadcasting_redis.publish channel, message.to_json
|
||||
server.broadcasting_redis.publish broadcasting, message.to_json
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue