From db568553d172e6ebdc3bc0aae3839d4a91299b42 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Wed, 14 Jan 2015 21:59:31 +0530 Subject: [PATCH] Action Cable take#1 --- Gemfile | 4 ++ README | 3 ++ action_cable.gemspec | 19 +++++++ lib/action_cable.rb | 12 +++++ lib/action_cable/channel.rb | 6 +++ lib/action_cable/channel/base.rb | 64 +++++++++++++++++++++++ lib/action_cable/channel/callbacks.rb | 32 ++++++++++++ lib/action_cable/server.rb | 73 +++++++++++++++++++++++++++ 8 files changed, 213 insertions(+) create mode 100644 Gemfile create mode 100644 README create mode 100644 action_cable.gemspec create mode 100644 lib/action_cable.rb create mode 100644 lib/action_cable/channel.rb create mode 100644 lib/action_cable/channel/base.rb create mode 100644 lib/action_cable/channel/callbacks.rb create mode 100644 lib/action_cable/server.rb diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000000..f4110035ed --- /dev/null +++ b/Gemfile @@ -0,0 +1,4 @@ +source 'http://rubygems.org' +gemspec + +gem 'cramp', github: "lifo/cramp" diff --git a/README b/README new file mode 100644 index 0000000000..4f350e625f --- /dev/null +++ b/README @@ -0,0 +1,3 @@ +# ActionCable + +Action Cable is a framework for realtime communication over websockets. \ No newline at end of file diff --git a/action_cable.gemspec b/action_cable.gemspec new file mode 100644 index 0000000000..63ba751e9d --- /dev/null +++ b/action_cable.gemspec @@ -0,0 +1,19 @@ +Gem::Specification.new do |s| + s.platform = Gem::Platform::RUBY + s.name = 'action_cable' + s.version = '0.0.1' + s.summary = 'Framework for websockets.' + s.description = 'Action Cable is a framework for realtime communication over websockets.' + + s.author = ['Pratik Naik'] + s.email = ['pratiknaik@gmail.com'] + s.homepage = 'http://basecamp.com' + + s.add_dependency('activesupport', '~> 4.2.0') + s.add_dependency('cramp', '~> 0.15.4') + + s.files = Dir['README', 'lib/**/*'] + s.has_rdoc = false + + s.require_path = 'lib' +end diff --git a/lib/action_cable.rb b/lib/action_cable.rb new file mode 100644 index 0000000000..7df2a8c5eb --- /dev/null +++ b/lib/action_cable.rb @@ -0,0 +1,12 @@ +require 'cramp' +require 'active_support' +require 'active_support/json' +require 'active_support/concern' +require 'active_support/core_ext/hash/indifferent_access' + +module ActionCable + VERSION = '0.0.1' + + autoload :Channel, 'action_cable/channel' + autoload :Server, 'action_cable/server' +end diff --git a/lib/action_cable/channel.rb b/lib/action_cable/channel.rb new file mode 100644 index 0000000000..a54302d30f --- /dev/null +++ b/lib/action_cable/channel.rb @@ -0,0 +1,6 @@ +module ActionCable + module Channel + autoload :Callbacks, 'action_cable/channel/callbacks' + autoload :Base, 'action_cable/channel/base' + end +end diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb new file mode 100644 index 0000000000..82c1a14b49 --- /dev/null +++ b/lib/action_cable/channel/base.rb @@ -0,0 +1,64 @@ +module ActionCable + module Channel + + class Base + include Callbacks + + on_subscribe :start_periodic_timers + on_unsubscribe :stop_periodic_timers + + attr_reader :params + + class << self + def matches?(identifier) + raise "Please implement #{name}#matches? method" + end + end + + def initialize(connection, channel_identifier, params = {}) + @connection = connection + @channel_identifier = channel_identifier + @_active_periodic_timers = [] + @params = params + + setup + end + + def receive(data) + raise "Not implemented" + end + + def subscribe + self.class.on_subscribe_callbacks.each do |callback| + EM.next_tick { send(callback) } + end + end + + def unsubscribe + self.class.on_unsubscribe.each do |callback| + EM.next_tick { send(callback) } + end + end + + protected + def setup + # Override in subclasses + end + + def publish(data) + @connection.publish(data.merge(identifier: @channel_identifier).to_json) + end + + def start_periodic_timers + self.class.periodic_timers.each do |method, options| + @_active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) { send(method) } + end + end + + def stop_periodic_timers + @_active_periodic_timers.each {|t| t.cancel } + end + end + + end +end \ No newline at end of file diff --git a/lib/action_cable/channel/callbacks.rb b/lib/action_cable/channel/callbacks.rb new file mode 100644 index 0000000000..cf0246a386 --- /dev/null +++ b/lib/action_cable/channel/callbacks.rb @@ -0,0 +1,32 @@ +module ActionCable + module Channel + + module Callbacks + extend ActiveSupport::Concern + + included do + class_attribute :on_subscribe_callbacks, :on_unsubscribe_callbacks, :periodic_timers, :instance_reader => false + + self.on_subscribe_callbacks = [] + self.on_unsubscribe_callbacks = [] + self.periodic_timers = [] + end + + module ClassMethods + def on_subscribe(*methods) + self.on_subscribe_callbacks += methods + end + + def on_unsubscribe(*methods) + self.on_unsubscribe_callbacks += methods + end + + def periodic_timer(method, every:) + self.periodic_timers += [ [ method, every: every ] ] + end + end + + end + + end +end \ No newline at end of file diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb new file mode 100644 index 0000000000..2d80e96265 --- /dev/null +++ b/lib/action_cable/server.rb @@ -0,0 +1,73 @@ +require 'set' + +module ActionCable + class Server < Cramp::Websocket + on_start :initialize_subscriptions + on_data :received_data + on_finish :cleanup_subscriptions + + class_attribute :registered_channels + self.registered_channels = Set.new + + class << self + def register_channels(*channel_classes) + registered_channels.merge(channel_classes) + end + end + + def initialize_subscriptions + @subscriptions = {} + end + + def received_data(data) + data = ActiveSupport::JSON.decode data + + case data['action'] + when 'subscribe' + subscribe_channel(data) + when 'unsubscribe' + unsubscribe_channel(data) + when 'message' + process_message(data) + end + end + + def cleanup_subscriptions + @subscriptions.each do |id, channel| + channel.unsubscribe + end + end + + def publish(data) + render data + end + + private + def subscribe_channel(data) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + if subscription = registered_channels.detect { |channel_klass| channel_klass.matches?(id_options) } + @subscriptions[id_key] = subscription.new(self, id_key, id_options) + @subscriptions[id_key].subscribe + else + # No channel found + end + end + + def process_message(message) + id_key = message['identifier'] + + if @subscriptions[id_key] + @subscriptions[id_key].receive(ActiveSupport::JSON.decode message['data']) + end + end + + def unsubscribe_channel(data) + id_key = data['identifier'] + @subscriptions[id_key].unsubscribe + @subscriptions.delete(id_key) + end + + end +end