require 'concurrent-edge'

# This slightly more complicated, commented example aims to
# demonstrate some of the capabilities of concurrent-ruby's new abstractions.
#
# It is a concurrent processing pipeline which on one side has several web crawlers.
# They search the web for data and fill a buffer.
# On the other side there are data processors which pop the data from the buffer.
# They process the data and store the results in a DB
# which has a limited concurrency level.
# Some of the parts like Web and DB are just stubs.
# Each part logs and increments counters to keep some stats about the pipeline.
# A periodic readout of the stats into the log is also scheduled.
#
# Schema of the pipeline:
#
# web-crawlers -> buffer -> data-processing -> DB
#  \____________________________\_____\___> logging

# TODO (pitr-ch 10-Mar-2019): replace with a better, more realistic example using
# * actors for limited concurrency with state - local DB connection
# * throttled futures for REST API - limiting server load

# The central logger is defined first.
# It has state (the logger instance), therefore an actor is used.
# It is better to define the communication protocol of the logging actor exactly;
# it will only understand these messages.
Log      = Concurrent::ImmutableStruct.new :severity, :message
SetLevel = Concurrent::ImmutableStruct.new :level

require 'logger'
require 'stringio'
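
# A quick illustrative check of the message protocol defined above
# (these locals exist only for demonstration): ImmutableStruct behaves like
# Ruby's Struct but without setters, so messages are safe to share across threads.
message = Log[Logger::INFO, 'hello']
message.severity # => 1, i.e. Logger::INFO
message.message  # => "hello"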

# Including actor constants so this scope understands ANY etc.
include Concurrent::ErlangActor::EnvironmentConstants
# The logger does not need a dedicated thread, let's use a pool.
LOGGING = Concurrent::ErlangActor.spawn Logger::FATAL,
                                        type: :on_pool,
                                        name: 'logger' do |level|
  # a Logger instance with nicer formatting is created
  @logger           = Logger.new($captured_out)
  @logger.level     = level
  @logger.formatter = lambda do |severity, datetime, progname, msg|
    formatted_message = case msg
                        when String
                          msg
                        when Exception
                          format "%s (%s)\n%s",
                                 msg.message, msg.class, (msg.backtrace || []).join("\n")
                        else
                          msg.inspect
                        end
    format "[%s] %5s -- %s: %s\n",
           datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
           severity,
           progname,
           formatted_message
  end

  # definition of the logging actor behaviour
  receive(
      # log messages
      on(Log) { |message| @logger.log message.severity, message.message },
      # change the level
      on(SetLevel) { |message| @logger.level = message.level },
      # It is good practice to read and log bad messages,
      # otherwise they would accumulate in the inbox.
      on(ANY) { |message| @logger.error bad_message: message },
      # The logger has static behaviour, therefore keep: true can be used
      # and the actor will handle every received message as defined above.
      keep: true)
end

# test that the logger works as expected
LOGGING.tell Log[Logger::FATAL, :tornado]
LOGGING.tell Log[Logger::INFO, :wind]
LOGGING.tell SetLevel[Logger::DEBUG]
LOGGING.tell Log[Logger::INFO, :breeze]

sleep 0.05 # the logging is asynchronous, we need to wait a bit until it is written
get_captured_output

# the logging could be wrapped in a method
def log(severity, message)
  LOGGING.tell Log[severity, message]
  true
end

include Logger::Severity
log INFO, 'alive'
sleep 0.05
get_captured_output


# The stub which will represent the web
module Web
  @counter = Concurrent::AtomicFixnum.new

  def self.search
    sleep 0.01
    @counter.increment.to_s(16)
  end
end #
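
# The counter above is a Concurrent::AtomicFixnum; a quick illustrative check
# of its behaviour (increment is atomic, so no lock is needed around it):
counter = Concurrent::AtomicFixnum.new
counter.increment # => 1
counter.increment # => 2
counter.value     # => 2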

# The cancellation which will be used to cancel the whole processing pipeline.
@cancellation, origin = Concurrent::Cancellation.new
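# Cancellation.new returns a pair: a shareable token to poll and an origin
# future; resolving the origin (done at the end of this example) flips the
# token to cancelled everywhere it was passed. An illustrative check:
@cancellation.canceled? # => false, nothing is cancelled yet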

# Buffer for work
buffer_capacity = 10
@buffer         = Concurrent::Promises::Channel.new buffer_capacity
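
# A small sketch of the channel semantics the crawlers rely on below, using a
# throwaway channel (push is expected to return a falsey value on timeout):
scratch = Concurrent::Promises::Channel.new 1
scratch.push :a, 0.01 # there is capacity, returns a truthy value
scratch.push :b, 0.01 # the buffer is full, times out, returns a falsey value
scratch.pop           # => :a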

web_crawler_count = 4

# Track the amount of data provided by each crawler
crawler_data_counter = Array.new(web_crawler_count) do
  # this is accessed by multiple threads so it should be a thread-safe counter
  Concurrent::AtomicFixnum.new
end #
# The array is frozen which makes it immutable,
# therefore safe to use when concurrently accessed.
# Otherwise, if it were being modified, it would have to be a Concurrent::Array
# to make it safe.
crawler_data_counter.freeze

# The web crawlers are defined directly with threads to keep the start of the
# example simple.
# They search the web, and as soon as they find something they push
# the data into the buffer.
# The push will block if the buffer is full,
# regulating how fast the work is being found.
# This is called backpressure.
crawlers = Array.new web_crawler_count do |i|
  Thread.new do
    while true
      # crawl the web until cancelled
      break if @cancellation.canceled?
      # will block and slow down the crawler if the buffer is full
      data = Web.search
      until @buffer.push data, 0.1
        # It is good practice to use timeouts on all blocking operations.
        # If the pipeline is cancelled and the data-processors finish
        # before taking data from the buffer, a crawler could get stuck on this push.
        break if @cancellation.canceled?
      end
      # the data was pushed, increment the crawler's counter
      crawler_data_counter[i].increment
      log DEBUG, "crawler #{i} found #{data}"
    end
  end
end.freeze

# So far only the crawlers looking for data are defined,
# pushing the data into the buffer.
# The data-processing definition follows.
# Threads are not used directly again; instead the data processing
# is defined using Futures.
# Even though that makes the definition more complicated,
# it has a big advantage: the data processors will not require a Thread each
# but will share and run on a thread pool.
# That removes an important limitation on the total number of threads a process
# can have, which can be an issue in larger systems.
# This example would be fine using Threads,
# however it would then not demonstrate the more advanced usage.

# The data processing stores results in a DB,
# therefore the stub definition of a database precedes the data processing.
module DB
  @data = Concurrent::Map.new

  # increment a counter for char
  def self.add(char, count)
    @data.compute char do |old|
      (old || 0) + count
    end
    true
  end

  # return the stored data as a Hash
  def self.data
    @data.each_pair.reduce({}) { |h, (k, v)| h.update k => v }
  end
end
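
# An illustrative look at Concurrent::Map#compute, which the stub relies on:
# the block receives the old value (nil for a missing key) and its result is
# stored atomically with respect to other updates of the same key.
m = Concurrent::Map.new
m.compute(:a) { |old| (old || 0) + 2 } # => 2
m.compute(:a) { |old| (old || 0) + 3 } # => 5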

# Let's assume that instead of having this simple DB
# we have a limited number of connections
# and therefore there is a limit on
# how many threads can communicate with the DB at the same time.
# The throttle is created to limit the number of concurrent accesses to the DB.
@db_throttle = Concurrent::Throttle.new 4
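
# A hedged sketch of what the throttle provides: tasks submitted through the
# throttled view of an executor run at most 4 at a time, the rest wait their turn.
throttled = @db_throttle.on(:io)
futures   = Array.new(8) { Concurrent::Promises.future_on(throttled) { sleep 0.01; :ok } }
futures.each(&:wait) # the 8 tasks complete in waves of at most 4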

# The data-processing definition follows.
data_processing_count = 20 # this could actually be thousands if required

# track the amount of data received by each data processor
@data_processing_counters = Array.new data_processing_count do
  Concurrent::AtomicFixnum.new
end.freeze

def data_processing(i)
  # pop_op returns a future which is fulfilled with a message from the buffer
  # when a message is available
  @buffer.pop_op.then_on(:fast) do |data|
    # then we process the message on the :fast pool since this part is not blocking
    log DEBUG, "data-processor #{i} got #{data}"
    @data_processing_counters[i].increment
    sleep 0.1 # simulate it actually doing something which takes some time
    # find the most frequent char
    data.chars.
        group_by { |v| v }.
        map { |ch, arr| [ch, arr.size] }.
        max_by { |ch, size| size }
  end.then_on(@db_throttle.on(:io)) do |char, count|
    # the DB access has to be limited, therefore the db_throttle is used
    # DBs use IO, therefore this part is executed on the global thread pool for :io
    DB.add char, count
  end.then_on(:fast) do |_|
    # The last section executes back on the :fast executor.
    # It checks whether the processing was cancelled;
    # if not, it calls itself recursively,
    # which in combination with #run turns this into infinite data processing
    # (until cancelled).
    # #run will keep flattening to the inner future as long as the value is a future.
    if @cancellation.canceled?
      # return something other than a future and #run will stop executing
      :done
    else
      # continue running with a future returned by data_processing
      data_processing i
    end
  end
end #
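
# A tiny illustration of the #run flattening mentioned above: when a future's
# value is itself a future, #run keeps following it until a plain value appears.
# (countdown is a hypothetical helper, local to this illustration.)
countdown = lambda do |n|
  Concurrent::Promises.future(n) { |v| v.zero? ? :done : countdown.call(v - 1) }
end
countdown.call(3).run.value! # => :done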

# create the data processors
data_processors = Array.new data_processing_count do |i|
  data_processing(i).run
end

# Some statistics are collected in crawler_data_counter
# and @data_processing_counters.
# Schedule a periodic readout to the log.
def readout(crawler_data_counter)
  # schedule the readout in 0.4 sec or on cancellation, whichever comes first
  (@cancellation.origin | Concurrent::Promises.schedule(0.4)).then do
    log INFO,
        "\ncrawlers found: #{crawler_data_counter.map(&:value).join(', ')}\n" +
            "data processors consumed: #{@data_processing_counters.map(&:value).join(', ')}"
  end.then do
    # reschedule if not cancelled
    readout crawler_data_counter unless @cancellation.canceled?
  end
end
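
# The | combinator used above creates a future resolved by whichever side
# resolves first; a minimal illustrative check:
slow = Concurrent::Promises.schedule(1)    { :slow }
fast = Concurrent::Promises.schedule(0.01) { :fast }
(slow | fast).value! # => :fast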

# start the periodic readouts
readouts = readout(crawler_data_counter).run

sleep 2 # let the whole processing pipeline work
# cancel everything
origin.resolve

# wait for everything to stop
crawlers.each(&:join)
data_processors.each(&:wait!)[0..10]
readouts.wait!

# terminate the logger
Concurrent::ErlangActor.terminate LOGGING, :cancelled
LOGGING.terminated.wait

# inspect the collected char frequencies
DB.data

# see the logger output
get_captured_output