Usage examples of new Futures

This commit is contained in:
Petr Chalupa 2015-05-18 21:21:13 +02:00
parent 4519f945cf
commit 0f88dc110c
5 changed files with 446 additions and 16 deletions

View File

@ -7,6 +7,29 @@ time = 10 * scale
warmup = 2 * scale
warmup *= 10 if Concurrent.on_jruby?
Benchmark.ips(time, warmup) do |x|
x.report('graph-old') do
head = Concurrent::Promise.execute { 1 }
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
branch3 = head.then(&:succ).then(&:succ).then(&:succ)
Concurrent::Promise.zip(branch1, branch2, branch3).then { |(a, b, c)| a + b + c }.value!
end
x.report('graph-new') do
head = Concurrent.future { 1 }
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
branch3 = head.then(&:succ).then(&:succ).then(&:succ)
Concurrent.zip(branch1, branch2, branch3).then { |(a, b, c)| a + b + c }.value!
end
x.compare!
end
Benchmark.ips(time, warmup) do |x|
x.report('status-old') { f = Concurrent::Promise.execute { nil }; 10.times { f.complete? } }
x.report('status-new') { f = Concurrent.future { nil }; 10.times { f.completed? } }
x.compare!
end
Benchmark.ips(time, warmup) do |x|
of = Concurrent::Promise.execute { 1 }
@ -16,22 +39,6 @@ Benchmark.ips(time, warmup) do |x|
x.compare!
end
Benchmark.ips(time, warmup) do |x|
x.report('graph-old') do
head = Concurrent::Promise.execute { 1 }
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
Concurrent::Promise.zip(branch1, branch2).then { |(a, b)| a + b }.value!
end
x.report('graph-new') do
head = Concurrent.future { 1 }
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
(branch1 + branch2).then { |(a, b)| a + b }.value!
end
x.compare!
end
Benchmark.ips(time, warmup) do |x|
x.report('immediate-old') { Concurrent::Promise.execute { nil }.value! }
x.report('immediate-new') { Concurrent.future { nil }.value! }

158
examples/edge_futures.in.rb Normal file
View File

@ -0,0 +1,158 @@
### Simple asynchronous task

future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately
future.completed?
# block until evaluated
future.value
future.completed?

### Failing asynchronous task

future = Concurrent.future { raise 'Boom' }
# #value returns nil on a failed future; #value! re-raises the failure reason
future.value
future.value! rescue $!
future.reason
# re-raising
raise future rescue $!

### Chaining

head = Concurrent.future { 1 } #
branch1 = head.then(&:succ) #
branch2 = head.then(&:succ).then(&:succ) #
branch1.zip(branch2).value
(branch1 & branch2).then { |(a, b)| a + b }.value
# pick only first completed
(branch1 | branch2).value

### Error handling

# a .rescue recovers only failures raised earlier in the chain
Concurrent.future { Object.new }.then(&:succ).then(&:succ).rescue { |e| e.class }.value # error propagates
Concurrent.future { Object.new }.then(&:succ).rescue { 1 }.then(&:succ).value
Concurrent.future { 1 }.then(&:succ).rescue { |e| e.message }.then(&:succ).value
### Delay

# will not evaluate until asked by #value or other method requiring completion.
# NOTE: previously this assigned `scheduledfuture` while the lines below read
# `future`, so the example showed the stale future from the previous section
# (the generated output printed `future.value # => nil`); the variable is now
# named consistently.
future = Concurrent.delay { 'lazy' }
sleep 0.1 #
future.completed?
future.value
# propagates through the chain allowing whole or partial lazy chains
head = Concurrent.delay { 1 }
branch1 = head.then(&:succ)
branch2 = head.delay.then(&:succ)
join = branch1 & branch2
sleep 0.1 # nothing will complete
[head, branch1, branch2, join].map(&:completed?)
# asking for branch1's value forces head and branch1 to evaluate
branch1.value
sleep 0.1 # forces only head to complete, branch 2 stays incomplete
[head, branch1, branch2, join].map(&:completed?)
join.value
### Flattening

# #flat waits for the inner future instead of yielding a future of a future
Concurrent.future { Concurrent.future { 1+1 } }.flat.value # waits for inner future
# more complicated example
Concurrent.future { Concurrent.future { Concurrent.future { 1 + 1 } } }.
    flat(1).
    then { |f| f.then(&:succ) }.
    flat(1).value

### Schedule

scheduled = Concurrent.schedule(0.1) { 1 }
scheduled.completed?
scheduled.value # available after 0.1sec
# and in chain
scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ)
# will not be scheduled until value is requested
sleep 0.1 #
scheduled.value # returns after another 0.1sec
### Completable Future and Event

future = Concurrent.future
event = Concurrent.event
# will be blocked until completed
t1 = Thread.new { future.value } #
t2 = Thread.new { event.wait } #
future.success 1
# completing a second time raises (MultipleAssignmentError); rescue captures it
future.success 1 rescue $!
# #try_success returns false instead of raising when already completed
future.try_success 2
event.complete
[t1, t2].each &:join #

### Callbacks

queue = Queue.new
future = Concurrent.delay { 1 + 1 }
future.on_success { queue << 1 } # evaluated asynchronously
future.on_success! { queue << 2 } # evaluated on completing thread
queue.empty?
future.value
# the on_success! callback runs on the completing thread, so 2 is queued
# before the asynchronously-dispatched 1
queue.pop
queue.pop

### Thread-pools

# an executor name picks the pool each block runs on
Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait
### Interoperability with actors

# an ad-hoc actor whose behaviour is the lambda returned from the block
actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
  -> v { v ** 2 }
end

# #then_ask sends the future's value to the actor and continues with the reply
Concurrent.
    future { 2 }.
    then_ask(actor).
    then { |v| v + 2 }.
    value

actor.ask(2).then(&:succ).value

### Common use-cases Examples

# simple background processing
Concurrent.future { do_stuff }

# parallel background processing
jobs = 10.times.map { |i| Concurrent.future { i } } #
Concurrent.zip(*jobs).value

# periodic task: runs do_stuff after 1s, then reschedules itself
# NOTE(review): report_error is not defined in these examples — assumed to be
# provided by the embedding application
def schedule_job
  Concurrent.schedule(1) { do_stuff }.
      rescue { |e| report_error e }.
      then { schedule_job }
end
schedule_job

View File

@ -0,0 +1,185 @@
# GENERATED FILE — produced from examples/edge_futures.in.rb by
# examples/format.rb (results appended as `# => ...`). Edit the .in.rb
# source and re-run format.rb instead of editing this file by hand.
### Simple asynchronous task
future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately
# => <#Concurrent::Edge::Future:0x7fa08385da60 pending blocks:[]>
future.completed? # => false
# block until evaluated
future.value # => 2
future.completed? # => true
### Failing asynchronous task
future = Concurrent.future { raise 'Boom' }
# => <#Concurrent::Edge::Future:0x7fa083834638 failed blocks:[]>
future.value # => nil
future.value! rescue $! # => #<RuntimeError: Boom>
future.reason # => #<RuntimeError: Boom>
# re-raising
raise future rescue $! # => #<RuntimeError: Boom>
### Chaining
head = Concurrent.future { 1 }
branch1 = head.then(&:succ)
branch2 = head.then(&:succ).then(&:succ)
branch1.zip(branch2).value # => [2, 3]
(branch1 & branch2).then { |(a, b)| a + b }.value
# => 5
# pick only first completed
(branch1 | branch2).value # => 2
### Error handling
Concurrent.future { Object.new }.then(&:succ).then(&:succ).rescue { |e| e.class }.value # error propagates
# => NoMethodError
Concurrent.future { Object.new }.then(&:succ).rescue { 1 }.then(&:succ).value
# => 2
Concurrent.future { 1 }.then(&:succ).rescue { |e| e.message }.then(&:succ).value
# => 3
### Delay
# will not evaluate until asked by #value or other method requiring completion
scheduledfuture = Concurrent.delay { 'lazy' }
# => <#Concurrent::Edge::Future:0x7fa0831917b8 pending blocks:[]>
sleep 0.1
future.completed? # => true
future.value # => nil
# propagates trough chain allowing whole or partial lazy chains
head = Concurrent.delay { 1 }
# => <#Concurrent::Edge::Future:0x7fa083172ef8 pending blocks:[]>
branch1 = head.then(&:succ)
# => <#Concurrent::Edge::Future:0x7fa083171c88 pending blocks:[]>
branch2 = head.delay.then(&:succ)
# => <#Concurrent::Edge::Future:0x7fa08294f528 pending blocks:[]>
join = branch1 & branch2
# => <#Concurrent::Edge::Future:0x7fa08294e218 pending blocks:[]>
sleep 0.1 # nothing will complete # => 0
[head, branch1, branch2, join].map(&:completed?) # => [false, false, false, false]
branch1.value # => 2
sleep 0.1 # forces only head to complete, branch 2 stays incomplete
# => 0
[head, branch1, branch2, join].map(&:completed?) # => [true, true, false, false]
join.value # => [2, 2]
### Flatting
Concurrent.future { Concurrent.future { 1+1 } }.flat.value # waits for inner future
# => 2
# more complicated example
Concurrent.future { Concurrent.future { Concurrent.future { 1 + 1 } } }.
flat(1).
then { |f| f.then(&:succ) }.
flat(1).value # => 3
### Schedule
scheduled = Concurrent.schedule(0.1) { 1 }
# => <#Concurrent::Edge::Future:0x7fa08224edf0 pending blocks:[]>
scheduled.completed? # => false
scheduled.value # available after 0.1sec # => 1
# and in chain
scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ)
# => <#Concurrent::Edge::Future:0x7fa0831f3d50 pending blocks:[]>
# will not be scheduled until value is requested
sleep 0.1
scheduled.value # returns after another 0.1sec # => 2
### Completable Future and Event
future = Concurrent.future
# => <#Concurrent::Edge::CompletableFuture:0x7fa0831e8090 pending blocks:[]>
event = Concurrent.event
# => <#Concurrent::Edge::CompletableEvent:0x7fa0831dae68 pending blocks:[]>
# will be blocked until completed
t1 = Thread.new { future.value }
t2 = Thread.new { event.wait }
future.success 1
# => <#Concurrent::Edge::CompletableFuture:0x7fa0831e8090 success blocks:[]>
future.success 1 rescue $!
# => #<Concurrent::MultipleAssignmentError: multiple assignment>
future.try_success 2 # => false
event.complete
# => <#Concurrent::Edge::CompletableEvent:0x7fa0831dae68 completed blocks:[]>
[t1, t2].each &:join
### Callbacks
queue = Queue.new # => #<Thread::Queue:0x007fa0831bac30>
future = Concurrent.delay { 1 + 1 }
# => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]>
future.on_success { queue << 1 } # evaluated asynchronously
# => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]>
future.on_success! { queue << 2 } # evaluated on completing thread
# => <#Concurrent::Edge::Future:0x7fa0831b96c8 pending blocks:[]>
queue.empty? # => true
future.value # => 2
queue.pop # => 2
queue.pop # => 1
### Thread-pools
Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait
# => <#Concurrent::Edge::Future:0x7fa08318b070 success blocks:[]>
### Interoperability with actors
actor = Concurrent::Actor::Utils::AdHoc.spawn :square do
-> v { v ** 2 }
end
# => #<Concurrent::Actor::Reference /square (Concurrent::Actor::Utils::AdHoc)>
Concurrent.
future { 2 }.
then_ask(actor).
then { |v| v + 2 }.
value # => 6
actor.ask(2).then(&:succ).value # => 5
### Common use-cases Examples
# simple background processing
Concurrent.future { do_stuff }
# => <#Concurrent::Edge::Future:0x7fa0839ee8e8 pending blocks:[]>
# parallel background processing
jobs = 10.times.map { |i| Concurrent.future { i } }
Concurrent.zip(*jobs).value # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# periodic task
def schedule_job
Concurrent.schedule(1) { do_stuff }.
rescue { |e| report_error e }.
then { schedule_job }
end # => :schedule_job
schedule_job
# => <#Concurrent::Edge::Future:0x7fa082904f78 pending blocks:[]>

75
examples/format.rb Normal file
View File

@ -0,0 +1,75 @@
require 'rubygems'
require 'bundler/setup'
require 'pry'
require 'pp'

# Renders the example scripts: evaluates each *.in.rb file chunk by chunk and
# writes a *.out.rb copy with the result of every expression appended as a
# `# => ...` comment (a line in the input ending with a bare `#` suppresses
# the result for that expression).
input_paths = if ARGV.empty?
                Dir.glob("#{File.dirname(__FILE__)}/*.in.rb")
              else
                ARGV
              end.map { |p| File.expand_path p }

input_paths.each_with_index do |input_path, i|
  # Evaluate each file in a forked child so the examples cannot leak state
  # (threads, actors, constants) into the next file.
  # NOTE(review): fork is unavailable on JRuby/Windows — confirm target platforms.
  pid = fork do
    require_relative 'init.rb'
    begin
      output_path = input_path.gsub /\.in\.rb$/, '.out.rb'
      input = File.readlines(input_path)

      # Accumulate lines until Pry recognises a complete Ruby expression, so
      # multi-line method chains evaluate as a single chunk.
      chunks = []
      line = ''
      while !input.empty?
        line += input.shift
        if Pry::Code.complete_expression? line
          chunks << line
          line = ''
        end
      end
      raise unless line.empty? # leftover text means an unterminated expression

      # Pair each chunk with the number of source lines it spans (min 1),
      # used below to keep eval's reported line numbers accurate.
      chunks.map! { |chunk| [chunk, [chunk.split($/).size, 1].max] }

      # One shared anonymous binding: definitions made by earlier chunks stay
      # visible to later chunks, mimicking a single top-to-bottom script run.
      environment = Module.new.send :binding
      evaluate = ->(code, line) do
        eval(code, environment, input_path, line)
      end

      indent = 50 # column where short `# => ...` annotations are aligned
      line_count = 1
      output = ''
      chunks.each do |chunk, lines|
        result = evaluate.(chunk, line_count)
        # Blank chunks and pure-comment chunks pass through unannotated.
        unless chunk.strip.empty? || chunk =~ /\A *#/
          pre_lines = chunk.lines.to_a
          last_line = pre_lines.pop
          output << pre_lines.join
          if last_line =~ /\#$/
            # Trailing bare '#' marker: drop it and omit the result comment.
            output << last_line.gsub(/\#$/, '')
          else
            if last_line.size < indent && result.inspect.size < indent
              # Both the code and its result are short: align in a column.
              output << "%-#{indent}s %s" % [last_line.chomp, "# => #{result.inspect}\n"]
            else
              output << last_line << " # => #{result.inspect}\n"
            end
          end
        else
          output << chunk
        end
        line_count += lines
      end

      puts "#{input_path}\n -> #{output_path}"
      #puts output
      File.write(output_path, output)
    rescue => ex
      # Report and continue; the parent only waits, it does not inspect status.
      puts "#{ex} (#{ex.class})\n#{ex.backtrace * "\n"}"
    end
  end
  Process.wait pid
end

5
examples/init.rb Normal file
View File

@ -0,0 +1,5 @@
require 'concurrent-edge'

# Stand-in workload referenced by the example scripts (background-processing
# and periodic-task examples); returns a constant symbol so the generated
# output stays deterministic.
def do_stuff
  :stuff
end