Channel updates, documentation, and tests.

This commit is contained in:
Jerry D'Antonio 2015-09-30 22:21:37 -04:00
parent 6a76967058
commit 09e482d507
44 changed files with 1135 additions and 359 deletions

View File

@ -57,7 +57,7 @@ msg = messages.take
puts msg
```
By default, channels are *unbuffered*, meaning that they have a size of zero and only accept puts and takes when both a putting and a taking thread are available. If a `put` is started when there is no taker thread the call will block. As soon as another thread calls `take` the exchange will occur and both calls will return on their respective threads. Similarly, is a `take` is started when there is no putting thread the call will block until another thread calls `put`.
By default, channels are *unbuffered*, meaning that they have a capacity of zero and only accept puts and takes when both a putting and a taking thread are available. If a `put` is started when there is no taker thread the call will block. As soon as another thread calls `take` the exchange will occur and both calls will return on their respective threads. Similarly, is a `take` is started when there is no putting thread the call will block until another thread calls `put`.
The following, slightly more complex example, concurrently sums two different halves of a list then combines the results. It uses an unbuffered channel to pass the results from the two goroutines back to the main thread. The main thread blocks on the two `take` calls until the worker goroutines are done. This example also uses the convenience aliases {#<<} and {#~}. Since channels in Go are part of the language, channel operations are performed using special channel operators rather than functions. These operators help clearly indicate that channel operations are being performed. The operator overloads `<<` for `put` and `~` for `take` help reinforce this idea in Ruby.
@ -80,12 +80,12 @@ puts [x, y, x+y].join(' ')
## Channel Buffering
One common channel variation is a *buffered* channel. A buffered channel has a finite number of slots in the buffer which can be filled. Putting threads can put values into the channel even if there is no taking threads, up to the point where the buffer is filled. Once a buffer becomes full the normal blocking behavior resumes. A buffered channel is created by giving a `:size` option on channel creation:
One common channel variation is a *buffered* channel. A buffered channel has a finite number of slots in the buffer which can be filled. Putting threads can put values into the channel even if there is no taking threads, up to the point where the buffer is filled. Once a buffer becomes full the normal blocking behavior resumes. A buffered channel is created by giving a `:capacity` option on channel creation:
The following example creates a buffered channel with two slots. It then makes two `put` calls, adding values to the channel. These calls do not block because the buffer has room. Were a third `put` call to be made before an `take` calls, the third `put` would block.
```ruby
ch = Concurrent::Channel.new(size: 2)
ch = Concurrent::Channel.new(capacity: 2)
ch << 1
ch << 2
@ -95,7 +95,7 @@ puts ~ch
## Channel Synchronization
The main purpose of channels is to synchronize operations across goroutines. One common pattern for this is to created a `size: 1` buffered channel which is used to signal that work is complete. The following example calls a `worker` function on a goroutine and passes it a "done" channel. The main thread then calls `take` on the "done" channel and blocks until signaled.
The main purpose of channels is to synchronize operations across goroutines. One common pattern for this is to created a `capacity: 1` buffered channel which is used to signal that work is complete. The following example calls a `worker` function on a goroutine and passes it a "done" channel. The main thread then calls `take` on the "done" channel and blocks until signaled.
```ruby
def worker(done_channel)
@ -106,7 +106,7 @@ def worker(done_channel)
done_channel << true
end
done = Concurrent::Channel.new(size: 1)
done = Concurrent::Channel.new(capacity: 1)
Concurrent::Channel.go{ worker(done) }
~done # block until signaled
@ -192,7 +192,7 @@ def fibonacci(n, c)
c.close
end
chan = Concurrent::Channel.new(size: 10)
chan = Concurrent::Channel.new(capacity: 10)
Concurrent::Channel.go { fibonacci(chan.capacity, c) }
chan.each { |i| puts i }
```

View File

@ -5,16 +5,15 @@ require 'concurrent-edge'
Channel = Concurrent::Channel
## A Tour of Go: Buffered Channels
# https://tour.golang.org/concurrency/3
# https://tour.golang.org/concurrency/3
ch = Channel.new(size: 2)
ch = Channel.new(capacity: 2)
ch << 1
ch << 2
puts ~ch
puts ~ch
expected = <<-STDOUT
__END__
1
2
STDOUT

View File

@ -5,7 +5,7 @@ require 'concurrent-edge'
Channel = Concurrent::Channel
## A Tour of Go: Channels
# https://tour.golang.org/concurrency/2
# https://tour.golang.org/concurrency/2
def sum(a, c)
sum = a.reduce(0, &:+)
@ -22,6 +22,5 @@ x, y = ~c, ~c # `~` is an alias for `take` or `receive`
puts [x, y, x+y].join(' ')
expected = <<-STDOUT
__END__
-5 17 12
STDOUT

View File

@ -24,7 +24,7 @@ loop do
end
end
expected = <<-STDOUT
__END__
.
.
tick.
@ -41,4 +41,3 @@ tick.
.
tick.
BOOM!
STDOUT

View File

@ -65,7 +65,6 @@ end
puts same(new_tree(1), new_tree(1))
puts same(new_tree(1), new_tree(2))
expected = <<-STDOUT
__END__
true
false
STDOUT

View File

@ -5,7 +5,7 @@ require 'concurrent-edge'
Channel = Concurrent::Channel
## A Tour of Go: Range and Close
# https://tour.golang.org/concurrency/4
# https://tour.golang.org/concurrency/4
def fibonacci(n, c)
x, y = 0, 1
@ -16,11 +16,11 @@ def fibonacci(n, c)
c.close
end
c = Channel.new(size: 10)
c = Channel.new(capacity: 10)
Channel.go { fibonacci(c.capacity, c) }
c.each { |i| puts i }
expected = <<-STDOUT
__END__
0
1
1
@ -31,4 +31,3 @@ expected = <<-STDOUT
13
21
34
STDOUT

View File

@ -30,7 +30,7 @@ end
fibonacci(c, quit)
expected = <<-STDOUT
__END__
0
1
1
@ -42,4 +42,3 @@ expected = <<-STDOUT
21
34
quit
STDOUT

View File

@ -7,7 +7,7 @@ Channel = Concurrent::Channel
## Go by Example: Channel Buffering
# https://gobyexample.com/channel-buffering
messages = Channel.new(size: 2) # buffered
messages = Channel.new(capacity: 2) # buffered
messages.put 'buffered'
messages.put 'channel'
@ -15,7 +15,6 @@ messages.put 'channel'
puts messages.take
puts messages.take
expected = <<-STDOUT
__END__
buffered
channel
STDOUT

View File

@ -19,14 +19,13 @@ def pong(pings, pongs)
pongs << msg
end
pings = Channel.new(size: 1) # buffered
pongs = Channel.new(size: 1) # buffered
pings = Channel.new(capacity: 1) # buffered
pongs = Channel.new(capacity: 1) # buffered
ping(pings, 'passed message')
pong(pings, pongs)
puts ~pongs
expected = <<-STDOUT
__END__
passed message
STDOUT

View File

@ -15,12 +15,11 @@ def worker(done_channel)
done_channel << true # alias for `#put`
end
done = Channel.new(size: 1) # buffered
done = Channel.new(capacity: 1) # buffered
Channel.go{ worker(done) }
~done # alias for `#take`
expected = <<-STDOUT
__END__
working...
done
STDOUT

View File

@ -5,7 +5,7 @@ require 'concurrent-edge'
Channel = Concurrent::Channel
## Go by Example: Unbuffered Channel
# https://gobyexample.com/channels
# https://gobyexample.com/channels
messages = Channel.new # unbuffered
@ -16,6 +16,5 @@ end
msg = messages.take
puts msg
expected = <<-STDOUT
__END__
ping
STDOUT

View File

@ -5,10 +5,10 @@ require 'concurrent-edge'
Channel = Concurrent::Channel
## Go by Example: Closing Channels
# https://gobyexample.com/closing-channels
# https://gobyexample.com/closing-channels
validator = ->(v){ v.is_a? Numeric }
jobs = Channel.new(buffer: :buffered, size: 5,
jobs = Channel.new(buffer: :buffered, capacity: 5,
validator: validator)
done = Channel.new(buffer: :unbuffered)
@ -34,7 +34,7 @@ jobs.close
print "sent all jobs\n"
~done
expected = <<-STDOUT
__END__
sent job 1
received job 1
sent job 2
@ -43,4 +43,3 @@ sent job 3
received job 3
sent all jobs
received all jobs
STDOUT

View File

@ -5,7 +5,7 @@ require 'concurrent-edge'
Channel = Concurrent::Channel
## Go by Example: Non-Blocking Channel Operations
# https://gobyexample.com/non-blocking-channel-operations
# https://gobyexample.com/non-blocking-channel-operations
messages = Channel.new # unbuffered
signals = Channel.new # unbuffered
@ -27,8 +27,7 @@ Channel.select do |s|
s.default { print "no activity\n" }
end
expected = <<-STDOUT
__END__
no message received
no message sent
no activity
STDOUT

View File

@ -5,9 +5,9 @@ require 'concurrent-edge'
Channel = Concurrent::Channel
## Go by Example: Range over Channels
# https://gobyexample.com/range-over-channels
# https://gobyexample.com/range-over-channels
queue = Channel.new(size: 2) # buffered
queue = Channel.new(capacity: 2) # buffered
queue << 'one'
queue << 'two'
queue.close
@ -16,31 +16,6 @@ queue.each do |elem|
print "#{elem}\n"
end
expected = <<-STDOUT
__END__
one
two
STDOUT
def blocking_variant
queue = Channel.new(size: 2)
queue << 'one'
queue << 'two'
Channel.go do
sleep(1)
queue.close
end
queue.each do |elem|
print "#{elem}\n"
end
end
def sorting
count = 10
queue = Channel.new(size: count)
count.times { queue << rand(100) }
queue.close
puts queue.sort
end

View File

@ -9,7 +9,7 @@ Channel = Concurrent::Channel
## Go by Example: Rate Limiting
# https://gobyexample.com/tickers
requests = Channel.new(buffer: :buffered, size: 5)
requests = Channel.new(buffer: :buffered, capacity: 5)
(1..5).each do |i|
requests << i
end
@ -22,7 +22,7 @@ requests.each do |req|
end
print "\n"
bursty_limiter = Channel.new(buffer: :buffered, size: 3)
bursty_limiter = Channel.new(buffer: :buffered, capacity: 3)
(1..3).each do
bursty_limiter << Channel::Tick.new
end
@ -33,7 +33,7 @@ Channel.go do
end
end
bursty_requests = Channel.new(buffer: :buffered, size: 5)
bursty_requests = Channel.new(buffer: :buffered, capacity: 5)
(1..5).each do |i|
bursty_requests << i
end
@ -44,7 +44,7 @@ bursty_requests.each do |req|
print "request #{req} #{Channel::Tick.new}\n"
end
expected = <<-STDOUT
__END__
request 1 2012-10-19 00:38:18.687438 +0000 UTC
request 2 2012-10-19 00:38:18.887471 +0000 UTC
request 3 2012-10-19 00:38:19.087238 +0000 UTC
@ -56,4 +56,3 @@ request 2 2012-10-19 00:38:20.487645 +0000 UTC
request 3 2012-10-19 00:38:20.487676 +0000 UTC
request 4 2012-10-19 00:38:20.687483 +0000 UTC
request 5 2012-10-19 00:38:20.887542 +0000 UTC
STDOUT

View File

@ -27,7 +27,6 @@ end
end
end
expected = <<-STDOUT
__END__
received one
received two
STDOUT

View File

@ -18,9 +18,8 @@ sleep(1.6)
ticker.stop
print "Ticker stopped\n"
expected = <<-STDOUT
__END__
Tick at 2012-09-23 11:29:56.487625 -0700 PDT
Tick at 2012-09-23 11:29:56.988063 -0700 PDT
Tick at 2012-09-23 11:29:57.488076 -0700 PDT
Ticker stopped
STDOUT

View File

@ -7,7 +7,7 @@ Channel = Concurrent::Channel
## Go by Example: Timeouts
# https://gobyexample.com/timeouts
c1 = Channel.new(size: 1) # buffered
c1 = Channel.new(capacity: 1) # buffered
Channel.go do
sleep(2)
c1 << 'result 1'
@ -18,7 +18,7 @@ Channel.select do |s|
s.after(1) { print "timeout 1\n" }
end
c2 = Channel.new(size: 1) # buffered
c2 = Channel.new(capacity: 1) # buffered
Channel.go do
sleep(2)
c2 << 'result 2'
@ -29,7 +29,6 @@ Channel.select do |s|
s.after(3) { print "timeout 2\n" }
end
expected = <<-STDOUT
__END__
timeout 1
result 2
STDOUT

View File

@ -21,7 +21,6 @@ end
stop2 = timer2.stop
print "Timer 2 stopped\n" if stop2
expected = <<-STDOUT
__END__
Timer 1 expired
Timer 2 stopped
STDOUT

View File

@ -15,8 +15,8 @@ def worker(id, jobs, results)
end
end
jobs = Channel.new(buffer: :buffered, size: 100)
results = Channel.new(buffer: :buffered, size: 100)
jobs = Channel.new(buffer: :buffered, capacity: 100)
results = Channel.new(buffer: :buffered, capacity: 100)
(1..3).each do |w|
Channel.go { worker(w, jobs, results) }
@ -31,7 +31,7 @@ jobs.close
~results
end
expected = <<-STDOUT
__END__
worker 1 processing job 1
worker 2 processing job 2
worker 3 processing job 3
@ -41,4 +41,3 @@ worker 3 processing job 6
worker 1 processing job 7
worker 2 processing job 8
worker 3 processing job 9
STDOUT

28
examples/who.rb Executable file
View File

@ -0,0 +1,28 @@
#!/usr/bin/env ruby
require 'net/http'
require 'json'
# http://www.schneems.com/blogs/2015-09-30-reverse-rubygems/
gem_name = "concurrent-ruby"
def rubygems_get(gem_name: "", endpoint: "")
path = File.join("/api/v1/gems/", gem_name, endpoint).chomp("/") + ".json"
JSON.parse(Net::HTTP.get("rubygems.org", path))
end
results = rubygems_get(gem_name: gem_name, endpoint: "reverse_dependencies")
weighted_results = {}
results.each do |name|
begin
weighted_results[name] = rubygems_get(gem_name: name)["downloads"]
rescue => e
puts "#{name} #{e.message}"
end
end
weighted_results.sort {|(k1, v1), (k2, v2)| v2 <=> v1 }.first(50).each_with_index do |(k, v), i|
puts "#{i}) #{k}: #{v}"
end

View File

@ -48,22 +48,22 @@ module Concurrent
return
end
size = opts[:size]
capacity = opts[:capacity] || opts[:size]
buffer = opts[:buffer]
if size && buffer == :unbuffered
raise ArgumentError.new('unbuffered channels cannot have a size')
elsif size.nil? && buffer.nil?
if capacity && buffer == :unbuffered
raise ArgumentError.new('unbuffered channels cannot have a capacity')
elsif capacity.nil? && buffer.nil?
self.buffer = BUFFER_TYPES[:unbuffered].new
elsif size == 0 && buffer == :buffered
elsif capacity == 0 && buffer == :buffered
self.buffer = BUFFER_TYPES[:unbuffered].new
elsif buffer == :unbuffered
self.buffer = BUFFER_TYPES[:unbuffered].new
elsif size.nil? || size < 1
raise ArgumentError.new('size must be at least 1 for this buffer type')
elsif capacity.nil? || capacity < 1
raise ArgumentError.new('capacity must be at least 1 for this buffer type')
else
buffer ||= :buffered
self.buffer = BUFFER_TYPES[buffer].new(size)
self.buffer = BUFFER_TYPES[buffer].new(capacity)
end
self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
@ -238,7 +238,13 @@ module Concurrent
private
attr_accessor :buffer, :validator
def validator() @validator; end
def validator=(value) @validator = value; end
def buffer() @buffer; end
def buffer=(value) @buffer = value; end
def validate(value, allow_nil, raise_error)
if !allow_nil && value.nil?

View File

@ -18,11 +18,6 @@ module Concurrent
# used as a channel buffer should extend this class.
class Base < Synchronization::LockableObject
# @!macro [attach] channel_buffer_size_reader
#
# The number of items currently in the buffer.
attr_reader :size
# @!macro [attach] channel_buffer_capacity_reader
#
# The maximum number of values which can be {#put} onto the buffer
@ -53,6 +48,13 @@ module Concurrent
true
end
# @!macro [attach] channel_buffer_size_reader
#
# The number of items currently in the buffer.
def size
synchronize { ns_size }
end
# @!macro [attach] channel_buffer_empty_question
#
# Predicate indicating if the buffer is empty.
@ -61,7 +63,7 @@ module Concurrent
#
# @raise [NotImplementedError] until overridden in a subclass.
def empty?
raise NotImplementedError
synchronize { ns_empty? }
end
# @!macro [attach] channel_buffer_full_question
@ -72,7 +74,7 @@ module Concurrent
#
# @raise [NotImplementedError] until overridden in a subclass.
def full?
raise NotImplementedError
synchronize { ns_full? }
end
# @!macro [attach] channel_buffer_put
@ -126,19 +128,16 @@ module Concurrent
# @!macro [attach] channel_buffer_next
#
# Take the next item from the buffer and also return a boolean
# indicating if subsequent items can be taken. Used for iterating
# Take the next "item" from the buffer and also return a boolean
# indicating if "more" items can be taken. Used for iterating
# over a buffer until it is closed and empty.
#
# If the buffer is open but no items remain the calling thread will
# block until an item is available. The second of the two return
# values, a boolean, will always be `true` when the buffer is open.
# When the buffer is closed but more items remain the second return
# value will also be `true`. When the buffer is closed and the last
# item is taken the second return value will be `false`. When the
# buffer is both closed and empty the first return value will be
# `NO_VALUE` and the second return value will be `false`.
# be `false` when the buffer is both closed and empty.
# values, "more" (a boolean), will always be `true` when the buffer is
# open. The "more" value will be `false` when the channel has been
# closed and all values have already been received. When "more" is
# false the returned item will be `NO_VALUE`.
#
# Note that when multiple threads access the same channel a race
# condition can occur when using this method. A call to `next` from
@ -194,12 +193,34 @@ module Concurrent
private
attr_accessor :buffer
attr_writer :closed, :capacity, :size
def buffer() @buffer; end
def buffer=(value) @buffer = value; end
def closed=(value) @closed = value; end
def capacity=(value) @capacity = value; end
def size=(value) @size = value; end
def ns_initialize(*args)
end
# @!macro channel_buffer_size_reader
def ns_size
raise NotImplementedError
end
# @!macro channel_buffer_empty_question
def ns_empty?
raise NotImplementedError
end
# @!macro channel_buffer_full_question
def ns_full?
raise NotImplementedError
end
# @!macro channel_buffer_closed_question
def ns_closed?
@closed

View File

@ -10,19 +10,6 @@ module Concurrent
# an item is removed from the buffer, creating spare capacity.
class Buffered < Base
# @!macro channel_buffer_empty_question
def empty?
synchronize { ns_empty? }
end
# @!macro channel_buffer_full_question
#
# Will return `true` once the number of items in the buffer reaches
# the {#size} value specified during initialization.
def full?
synchronize { ns_full? }
end
# @!macro channel_buffer_put
#
# New items can be put onto the buffer until the number of items in
@ -72,8 +59,7 @@ module Concurrent
return NO_VALUE, false
elsif !ns_empty?
item = buffer.shift
more = !ns_empty? || !ns_closed?
return item, more
return item, true
end
end
Thread.pass
@ -104,14 +90,19 @@ module Concurrent
self.buffer = []
end
# @!macro channel_buffer_size_reader
def ns_size
buffer.size
end
# @!macro channel_buffer_empty_question
def ns_empty?
buffer.length == 0
ns_size == 0
end
# @!macro channel_buffer_full_question
def ns_full?
buffer.length == capacity
ns_size == capacity
end
# @!macro channel_buffer_put

View File

@ -8,12 +8,6 @@ module Concurrent
class Ticker < Base
def size() 1; end
def empty?() false; end
def full?() true; end
def put(item)
false
end
@ -58,8 +52,15 @@ module Concurrent
def ns_initialize(interval)
@interval = interval.to_f
@next_tick = Concurrent.monotonic_time + interval
self.capacity = 1
end
def ns_size() 0; end
def ns_empty?() false; end
def ns_full?() true; end
def do_poll
if ns_closed?
return nil, false

View File

@ -8,16 +8,6 @@ module Concurrent
class Timer < Base
def size() 1; end
def empty?
synchronized { @empty }
end
def full?
!empty?
end
def put(item)
false
end
@ -27,13 +17,23 @@ module Concurrent
end
def take
self.next.first
loop do
result, tick = do_poll
if result == :closed
return NO_VALUE
elsif result == :tick
return tick
end
Thread.pass
end
end
def next
loop do
status, tick = do_poll
if status == :tick
if status == :closed
return NO_VALUE, false
elsif status == :tick
return tick, false
# AFAIK a Go timer will block forever if stopped
#elsif status == :closed
@ -52,16 +52,22 @@ module Concurrent
def ns_initialize(delay)
@tick = Concurrent.monotonic_time + delay.to_f
@closed = false
@empty = false
self.capacity = 1
end
def ns_size() 0; end
def ns_empty?() false; end
def ns_full?() true; end
def do_poll
synchronize do
return :closed, false if ns_closed?
if Concurrent.monotonic_time > @tick
if ns_closed?
return :closed, false
elsif Concurrent.monotonic_time > @tick
# only one listener gets notified
self.closed = true
return :tick, Concurrent::Channel::Tick.new(@tick)
else
return :wait, true

View File

@ -16,19 +16,21 @@ module Concurrent
class Unbuffered < Base
# @!macro channel_buffer_size_reader
#
# Always returns zero (0).
def size() 0; end
def size
synchronize do
putting.empty? ? 0 : 1
end
end
# @!macro channel_buffer_empty_question
#
# Always returns `true`.
def empty?() true; end
def empty?
size == 0
end
# @!macro channel_buffer_full_question
#
# Always returns `false`.
def full?() false; end
def full?
!empty?
end
# @!macro channel_buffer_put
#
@ -131,19 +133,21 @@ module Concurrent
# @see {#take}
def next
item = take
more = synchronize { !putting.empty? }
more = (item != NO_VALUE)
return item, more
end
private
attr_accessor :putting, :taking
def putting() @putting; end
def taking() @taking; end
# @!macro channel_buffer_initialize
def ns_initialize
# one will always be empty
self.putting = []
self.taking = []
@putting = []
@taking = []
self.closed = false
self.capacity = 1
end

View File

@ -54,6 +54,7 @@ module Concurrent
end
def execute
raise Channel::Error.new('no clauses given') if @clauses.empty?
loop do
done = @clauses.each do |clause|
result = clause.execute
@ -63,7 +64,11 @@ module Concurrent
Thread.pass
end
rescue => ex
@error_handler.call(ex) if @error_handler
if @error_handler
@error_handler.call(ex)
else
raise ex
end
end
end
end

View File

@ -304,7 +304,7 @@ module Concurrent
# @return [Promise] the new promise
def then(rescuer = nil, &block)
raise ArgumentError.new('rescuers and block are both missing') if rescuer.nil? && !block_given?
block = Proc.new { |result| result } if block.nil?
block = Proc.new { |result| result } unless block_given?
child = Promise.new(
parent: self,
executor: @executor,

View File

@ -56,7 +56,7 @@ module Concurrent
end
describe 'reset' do
it 'should release all waiting threads' do
it 'should release all waiting threads', buggy: true do
start_latch = CountDownLatch.new(1)
continue_latch = CountDownLatch.new(1)

View File

@ -1,5 +1,9 @@
shared_examples :channel_buffer do
specify do
expect(subject).to respond_to(:blocking?)
end
context '#capacity' do
specify { expect(subject.capacity).to be >= 0 }
end

View File

@ -0,0 +1,74 @@
require_relative 'buffered_shared'
module Concurrent::Channel::Buffer
describe Base do
subject { described_class.new }
specify do
expect(subject.capacity).to eq 0
end
specify do
expect(subject).to be_blocking
end
specify do
expect {
subject.size
}.to raise_error(NotImplementedError)
end
specify do
expect {
subject.empty?
}.to raise_error(NotImplementedError)
end
specify do
expect {
subject.full?
}.to raise_error(NotImplementedError)
end
specify do
expect {
subject.put(42)
}.to raise_error(NotImplementedError)
end
specify do
expect {
subject.offer(42)
}.to raise_error(NotImplementedError)
end
specify do
expect {
subject.take
}.to raise_error(NotImplementedError)
end
specify do
expect {
subject.poll
}.to raise_error(NotImplementedError)
end
specify do
expect {
subject.next
}.to raise_error(NotImplementedError)
end
specify do
expect(subject).to_not be_closed
end
specify do
subject.close
expect(subject).to be_closed
end
end
end

View File

@ -4,8 +4,7 @@ shared_examples :channel_buffered_buffer do
it_behaves_like :channel_buffer
context 'initialization' do
context '#initialize' do
it 'raises an exception if size <= 0' do
expect {
described_class.new(0)
@ -14,15 +13,33 @@ shared_examples :channel_buffered_buffer do
end
context '#capacity' do
it 'returns the maximum capacity of the buffer' do
subject = described_class.new(10)
expect(subject.capacity).to eq 10
end
end
context '#empty?' do
context '#size' do
it 'is 0 when first created' do
expect(subject.size).to eq 0
end
it 'returns the number of items in the buffer' do
fill = subject.capacity / 2
fill.times { subject.put(:foo) }
expect(subject.size).to eq fill
end
it 'is 0 when there are taking threads but no putting threads' do
t = Thread.new { subject.take }
t.join(0.1)
expect(subject.size).to eq 0
t.kill # cleanup
end
end
context '#empty?' do
it 'returns true when empty' do
subject = described_class.new(10)
expect(subject).to be_empty
@ -42,7 +59,6 @@ shared_examples :channel_buffered_buffer do
end
context '#offer' do
it 'enqueues the item immediately when not full and not closed' do
subject.offer(:foo)
expect(subject.take).to eq :foo
@ -128,19 +144,32 @@ shared_examples :channel_buffered_buffer do
expect(more3).to be true
end
it 'returns <item> false when closed and last item' do
subject.offer(:foo)
subject.offer(:bar)
subject.offer(:baz)
it 'returns <item>, true when closed and last item' do
capacity = subject.capacity
expect(capacity).to be >= 1
capacity.times { subject.put(:foo) }
subject.close
_, more1 = subject.next
_, more2 = subject.next
_, more3 = subject.next
capacity.times do
item, more = subject.next
expect(item).to eq :foo
expect(more).to be true
end
end
expect(more1).to be true
expect(more2).to be true
expect(more3).to be false
it 'returns NO_VALUE, false when closed and no items remain' do
capacity = subject.capacity
expect(capacity).to be >= 1
capacity.times { subject.put(:foo) }
subject.close
capacity.times { subject.next }
item, more = subject.next
expect(item).to eq Concurrent::Channel::Buffer::NO_VALUE
expect(more).to be false
end
end
end

View File

@ -4,22 +4,47 @@ module Concurrent::Channel::Buffer
describe Buffered do
specify { expect(subject).to be_blocking }
let(:capacity) { 10 }
subject { described_class.new(capacity) }
subject { described_class.new(10) }
it_behaves_like :channel_buffered_buffer
context '#full?' do
specify do
expect(subject).to be_blocking
end
it 'returns true when full' do
context '#full?' do
it 'returns true when at max capacity' do
subject = described_class.new(1)
subject.put(:foo)
expect(subject).to be_full
end
end
context '#offer' do
context '#put' do
it 'blocks when at capacity until a thread is ready to take' do
subject = described_class.new(1)
subject.put(13)
bucket = Concurrent::AtomicReference.new(nil)
t = Thread.new do
subject.put(42)
bucket.value = 42
end
t.join(0.1)
before = bucket.value
subject.take
t.join(0.1)
after = bucket.value
expect(before).to be nil
expect(after).to eq 42
expect(t.status).to be false
end
end
context '#offer' do
it 'returns false immediately when full' do
subject = described_class.new(1)
subject.put(:foo)

View File

@ -4,11 +4,14 @@ module Concurrent::Channel::Buffer
describe Dropping do
specify { expect(subject).to_not be_blocking }
subject { described_class.new(10) }
it_behaves_like :channel_buffered_buffer
specify do
expect(subject).to_not be_blocking
end
context '#put' do
it 'does not block when full' do

View File

@ -4,11 +4,14 @@ module Concurrent::Channel::Buffer
describe Sliding do
specify { expect(subject).to_not be_blocking }
subject { described_class.new(10) }
it_behaves_like :channel_buffered_buffer
specify do
expect(subject).to_not be_blocking
end
context '#put' do
it 'does not block when full' do

View File

@ -0,0 +1,51 @@
require_relative 'timing_buffer_shared'
module Concurrent::Channel::Buffer
describe Ticker do
subject { described_class.new(1) }
it_behaves_like :channel_timing_buffer
context '#take' do
it 'triggers until closed' do
subject = described_class.new(0.1)
expected = 3
actual = 0
expected.times { actual += 1 if subject.take.is_a? Concurrent::Channel::Tick }
expect(actual).to eq expected
end
end
context '#poll' do
it 'triggers until closed' do
subject = described_class.new(0.1)
expected = 3
actual = 0
expected.times do
until subject.poll.is_a?(Concurrent::Channel::Tick)
actual += 1
end
end
end
end
context '#next' do
it 'returns more until closed' do
subject = described_class.new(0.1)
_, more = subject.next
expect(more).to be true
end
it 'triggers until closed' do
subject = described_class.new(0.1)
expected = 3
actual = 0
expected.times { actual += 1 if subject.next.first.is_a? Concurrent::Channel::Tick }
expect(actual).to eq expected
end
end
end
end

View File

@ -1,39 +1,51 @@
require_relative 'timing_buffer_shared'
module Concurrent::Channel::Buffer
describe Timer do
subject { described_class.new(0) }
specify { expect(subject).to be_blocking }
specify { expect(subject.size).to eq 1 }
context '#empty?' do
pending
end
context '#full?' do
pending
end
context '#put' do
pending
end
context '#offer' do
pending
end
it_behaves_like :channel_timing_buffer
context '#take' do
pending
end
context '#next' do
pending
it 'closes automatically on first take' do
subject = described_class.new(0.1)
expect(subject.take).to be_truthy
expect(subject).to be_closed
end
end
context '#poll' do
pending
it 'closes automatically on first take' do
subject = described_class.new(0.1)
loop do
break if subject.poll != NO_VALUE
end
expect(subject).to be_closed
end
end
context '#next' do
it 'closes automatically on first take' do
subject = described_class.new(0.1)
loop do
value, _ = subject.next
break if value != NO_VALUE
end
expect(subject).to be_closed
end
it 'returns false for more on first take' do
subject = described_class.new(0.1)
more = true
loop do
value, more = subject.next
break if value != NO_VALUE
end
expect(more).to be false
end
end
end
end

View File

@ -0,0 +1,169 @@
require_relative 'base_shared'
shared_examples :channel_timing_buffer do
specify do
expect(subject).to be_blocking
end
context '#capacity' do
specify do
expect(subject.capacity).to eq 1
end
end
context '#size' do
specify do
expect(subject.size).to eq 0
end
end
context '#empty?' do
specify do
expect(subject).to_not be_empty
end
end
context '#full?' do
specify do
expect(subject).to be_full
end
end
context '#put' do
specify do
expect(subject.put(:foo)).to be false
end
end
context '#offer' do
specify do
expect(subject.offer(:foo)).to be false
end
end
context '#take' do
it 'blocks when the timer is not ready' do
actual = Concurrent::AtomicBoolean.new(false)
subject = described_class.new(10)
t = Thread.new do
subject.take
actual.make_true
end
t.join(0.1)
actual = actual.value
t.kill # clean up
expect(actual).to be false
end
it 'returns a Tick' do
subject = described_class.new(0.1)
expect(subject.take).to be_a Concurrent::Channel::Tick
end
it 'returns NO_VALUE when closed' do
subject.close
expect(subject.take).to eq Concurrent::Channel::Buffer::NO_VALUE
end
it 'triggers after the specified time interval' do
start = Concurrent::Channel::Tick.new.monotonic
subject = described_class.new(0.1)
actual = subject.take.monotonic
expect(actual - start).to be >= 0.1
end
end
context '#poll' do
it 'returns NO_VALUE when the timer is not ready' do
subject = described_class.new(0.1)
expect(subject.poll).to eq Concurrent::Channel::Buffer::NO_VALUE
end
it 'returns a Tick' do
subject = described_class.new(0.1)
sleep(0.2)
expect(subject.poll).to be_a Concurrent::Channel::Tick
end
it 'returns NO_VALUE when closed' do
subject.close
expect(subject.poll).to eq Concurrent::Channel::Buffer::NO_VALUE
end
it 'triggers after the specified time interval' do
start = Concurrent::Channel::Tick.new.monotonic
subject = described_class.new(0.1)
sleep(0.2)
actual = subject.poll.monotonic
expect(actual - start).to be >= 0.1
end
end
context '#next' do
it 'blocks when the timer is not ready' do
actual = Concurrent::AtomicBoolean.new(false)
subject = described_class.new(10)
t = Thread.new do
subject.next
actual.make_true
end
t.join(0.1)
actual = actual.value
t.kill # clean up
expect(actual).to be false
end
it 'returns a Tick when open' do
subject = described_class.new(0.1)
value, _ = subject.next
expect(value).to be_a Concurrent::Channel::Tick
end
it 'returns NO_VALUE and false when closed' do
subject.close
value, more = subject.next
expect(value).to eq Concurrent::Channel::Buffer::NO_VALUE
expect(more).to be false
end
it 'triggers after the specified time interval' do
start = Concurrent::Channel::Tick.new.monotonic
subject = described_class.new(0.1)
actual, _ = subject.next
expect(actual.monotonic - start).to be >= 0.1
end
end
context '#close' do
it 'sets #closed? to false' do
subject.close
expect(subject).to be_closed
end
it 'returns true when not previously closed' do
expect(subject.close).to be true
end
it 'returns false when already closed' do
subject.close
expect(subject.close).to be false
end
end
context '#closed?' do
it 'returns true when new' do
expect(subject).to_not be_closed
end
it 'returns false after #close' do
subject.close
expect(subject).to be_closed
end
end
end

View File

@ -4,13 +4,80 @@ module Concurrent::Channel::Buffer
describe Unbuffered do
specify { expect(subject).to be_blocking }
subject { described_class.new }
it_behaves_like :channel_buffer
specify do
expect(subject).to be_blocking
end
specify do
expect(subject.capacity).to eq 1
end
context '#size' do
it 'is 0 when first created' do
expect(subject.size).to eq 0
end
it 'is 1 when a putting thread is waiting' do
t = Thread.new { subject.put(:foo) }
t.join(0.1)
expect(subject.size).to eq 1
t.kill # cleanup
end
it 'is 0 when there are taking threads but no putting threads' do
t = Thread.new { subject.take }
t.join(0.1)
expect(subject.size).to eq 0
t.kill # cleanup
end
end
context '#empty?' do
it 'is true when there are no putting threads' do
expect(subject).to be_empty
end
it 'is false when there are waiting putting threads' do
t = Thread.new { subject.put(:foo) }
t.join(0.1)
expect(subject).to_not be_empty
t.kill # cleanup
end
end
context '#full?' do
it 'is false when there are no putting threads' do
expect(subject).to_not be_full
end
it 'is false when there are waiting putting threads' do
t = Thread.new { subject.put(:foo) }
t.join(0.1)
expect(subject).to be_full
t.kill # cleanup
end
end
context '#put' do
it 'does not enqueue the item when closed' do
subject.close
subject.put(:foo)
expect(subject).to be_empty
end
it 'returns false when closed' do
subject.close
expect(subject.put(:foo)).to be false
end
it 'blocks until a thread is ready to take' do
subject # initialize on this thread
bucket = Concurrent::AtomicReference.new(nil)
@ -30,64 +97,18 @@ module Concurrent::Channel::Buffer
expect(after).to eq 42
expect(t.status).to be false
end
end
context '#take' do
it 'blocks until not empty the returns the first item' do
subject # initialize on this thread
bucket = Concurrent::AtomicReference.new(nil)
it 'delivers when closed after put starts' do
t = Thread.new do
bucket.value = subject.take
subject.put(:foo)
end
t.join(0.1)
subject.close
before = bucket.value
subject.put(42)
t.join(0.1)
after = bucket.value
item = subject.take
t.kill #clean up
expect(before).to be nil
expect(after).to eq 42
expect(t.status).to be false
end
end
context '#next' do
it 'blocks when no putting and returns <item>, true when one arrives' do
subject # initialize on this thread
bucket = Concurrent::AtomicReference.new([])
t = Thread.new do
bucket.value = subject.next
end
t.join(0.1)
before = bucket.value
subject.put(42)
t.join(0.1)
after = bucket.value
expect(before).to eq []
expect(after.first).to eq 42
expect(after.last).to be false
expect(t.status).to be false
end
it 'returns <item>, true when there are multiple putting' do
subject # initialize on this thread
threads = 2.times.collect do
Thread.new do
subject.put(42)
end
end
threads.each {|t| t.join(0.1)}
item, more = subject.next
subject.poll # clear the channel
expect(item).to eq 42
expect(more).to be true
expect(item).to eq :foo
end
end
@ -124,5 +145,105 @@ module Concurrent::Channel::Buffer
expect(after).to eq 42
end
end
context '#take' do
it 'returns false immediately when a put in in progress' do
subject # initialize on this thread
t = Thread.new do
subject.put(:foo) # block the thread
end
t.join(0.1)
ok = subject.offer(:bar)
subject.poll # release the blocked thread
expect(ok).to be false
end
it 'gives the item to a waiting taker and returns true' do
subject # initialize on this thread
bucket = Concurrent::AtomicReference.new(nil)
t = Thread.new do
bucket.value = subject.take
end
t.join(0.1)
before = bucket.value
ok = subject.offer(42)
t.join(0.1)
after = bucket.value
expect(ok).to be true
expect(before).to be nil
expect(after).to eq 42
end
end
context '#next' do
it 'blocks when no putting and returns <item>, true when one arrives' do
subject # initialize on this thread
bucket = Concurrent::AtomicReference.new([])
t = Thread.new do
bucket.value = subject.next
end
t.join(0.1)
before = bucket.value
subject.put(42)
t.join(0.1)
after = bucket.value
expect(before).to eq []
expect(after.first).to eq 42
expect(after.last).to be true
expect(t.status).to be false
end
it 'returns <item>, true when there are multiple putting' do
subject # initialize on this thread
threads = 2.times.collect do
Thread.new do
subject.put(42)
end
end
threads.each {|t| t.join(0.1)}
item, more = subject.next
subject.poll # clear the channel
expect(item).to eq 42
expect(more).to be true
end
it 'returns <item>, true when closed and last item' do
t = Thread.new do
subject.put(:foo)
end
t.join(0.1)
subject.close
item, more = subject.next
t.kill #clean up
expect(item).to eq :foo
expect(more).to be true
end
it 'returns NO_VALUE, false when closed and no items remain' do
t = Thread.new do
subject.put(:foo)
end
subject.close
subject.next
item, more = subject.next
t.kill #clean up
expect(item).to eq Concurrent::Channel::Buffer::NO_VALUE
expect(more).to be false
end
end
end
end

View File

@ -26,6 +26,10 @@ module Concurrent
expect(subject.utc.to_f).to eq subject.epoch
end
specify do
expect(subject.to_s).to match /\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6} \+\d{4} UTC/
end
context 'comparison' do
it 'correctly compares to a Numeric (monotonic)' do

View File

@ -10,7 +10,7 @@ module Concurrent
}.to raise_error(ArgumentError)
end
it 'is :unbuffered when neither :buffer nore :size is given' do
it 'is :unbuffered when neither :buffer nore :capacity is given' do
expect(Channel::Buffer::Unbuffered).to receive(:new).with(no_args).and_call_original
Channel.new
end
@ -20,184 +20,313 @@ module Concurrent
Channel.new(buffer: :unbuffered)
end
it 'is :unbuffered when :buffered and size: 0' do
it 'is :unbuffered when :buffered and capacity: 0' do
expect(Channel::Buffer::Unbuffered).to receive(:new).with(no_args).and_call_original
Channel.new(buffer: :buffered, size: 0)
Channel.new(buffer: :buffered, capacity: 0)
end
it 'raises an exception when both :unbuffered and :size are given' do
it 'raises an exception when both :unbuffered and :capacity are given' do
expect {
Channel.new(buffer: :unbuffered, size: 0)
Channel.new(buffer: :unbuffered, capacity: 0)
}.to raise_error(ArgumentError)
end
it 'is :buffered when :size > 0 and no :buffer given' do
it 'is :buffered when :capacity > 0 and no :buffer given' do
expect(Channel::Buffer::Buffered).to receive(:new).with(5).and_call_original
Channel.new(size: 5)
Channel.new(capacity: 5)
end
it 'is :buffered when :buffered given' do
expect(Channel::Buffer::Buffered).to receive(:new).with(5).and_call_original
Channel.new(buffer: :buffered, size: 5)
Channel.new(buffer: :buffered, capacity: 5)
end
it 'raises an exception when :buffered given without :size' do
it 'raises an exception when :buffered given without :capacity' do
expect {
Channel.new(buffer: :buffered)
}.to raise_error(ArgumentError)
end
it 'raises an exception when :buffered and :size < 0' do
it 'raises an exception when :buffered and :capacity < 0' do
expect {
Channel.new(buffer: :buffered, size: -1)
Channel.new(buffer: :buffered, capacity: -1)
}.to raise_error(ArgumentError)
end
it 'is :dropping when :dropping and :size > 0' do
it 'is :dropping when :dropping and :capacity > 0' do
expect(Channel::Buffer::Dropping).to receive(:new).with(5).and_call_original
Channel.new(buffer: :dropping, size: 5)
Channel.new(buffer: :dropping, capacity: 5)
end
it 'raises an exception when :dropping given without :size' do
it 'raises an exception when :dropping given without :capacity' do
expect {
Channel.new(buffer: :dropping)
}.to raise_error(ArgumentError)
end
it 'raises an exception when :dropping and :size < 1' do
it 'raises an exception when :dropping and :capacity < 1' do
expect {
Channel.new(buffer: :dropping, size: 0)
Channel.new(buffer: :dropping, capacity: 0)
}.to raise_error(ArgumentError)
end
it 'is :sliding when :sliding and :size > 0' do
it 'is :sliding when :sliding and :capacity > 0' do
expect(Channel::Buffer::Sliding).to receive(:new).with(5).and_call_original
Channel.new(buffer: :sliding, size: 5)
Channel.new(buffer: :sliding, capacity: 5)
end
it 'raises an exception when :sliding given without :size' do
it 'raises an exception when :sliding given without :capacity' do
expect {
Channel.new(buffer: :sliding)
}.to raise_error(ArgumentError)
end
it 'raises an exception when :sliding and :size < 1' do
it 'raises an exception when :sliding and :capacity < 1' do
expect {
Channel.new(buffer: :sliding, size: 0)
Channel.new(buffer: :sliding, capacity: 0)
}.to raise_error(ArgumentError)
end
it 'uses the given buffer' do
buffer = Channel::Buffer::Buffered.new(10)
subject = Channel.new(buffer)
expect(subject).to receive(:put).with(42)
subject.put(42)
end
end
context 'factories' do
specify do
expect(Channel::Buffer::Ticker).to receive(:new).with(10).and_call_original
Channel.ticker(10)
end
specify do
expect(Channel::Buffer::Timer).to receive(:new).with(10).and_call_original
Channel.timer(10)
end
end
context '#put' do
it 'enqueues the item when not full and not closed' do
subject = Channel.new(buffer: :buffered, size: 2)
subject.put(:foo)
internal_buffer = subject.instance_variable_get(:@buffer)
expect(internal_buffer).to_not be_empty
end
it 'returns true on success' do
subject = Channel.new(buffer: :buffered, size: 2)
subject = Channel.new(buffer: :buffered, capacity: 2)
expect(subject.put(:foo)).to be true
end
it 'returns false when closed' do
subject = Channel.new(buffer: :buffered, size: 2)
it 'returns false on failure' do
subject = Channel.new(buffer: :buffered, capacity: 2)
subject.close
expect(subject.put(:foo)).to be false
end
it 'rejects when the validator returns false' do
validator = ->(value) { false }
subject = Channel.new(capacity: 10, validator: validator)
expect(subject.put(42)).to be false
end
it 'rejects when the validator raises an exception' do
validator = ->(value) { raise StandardError }
subject = Channel.new(capacity: 10, validator: validator)
expect(subject.put(42)).to be false
end
it 'rejects nil' do
expect(subject.put(nil)).to be false
end
end
context 'put!' do
it 'returns true on success' do
subject = Channel.new(buffer: :buffered, capacity: 2)
expect(subject.put!(:foo)).to be true
end
it 'raises an exception on failure' do
subject = Channel.new(buffer: :buffered, size: 2)
subject = Channel.new(buffer: :buffered, capacity: 2)
subject.close
expect {
subject.put!(:foo)
}.to raise_error(Channel::Error)
end
it 'rejects when the validator returns false' do
validator = ->(value) { false }
subject = Channel.new(capacity: 10, validator: validator)
expect{
subject.put!(42)
}.to raise_error(Channel::ValidationError)
end
it 'rejects when the validator raises an exception' do
validator = ->(value) { raise StandardError }
subject = Channel.new(capacity: 10, validator: validator)
expect{
subject.put!(42)
}.to raise_error(StandardError)
end
it 'rejects nil' do
expect {
subject.put!(nil)
}.to raise_error(Channel::ValidationError)
end
end
context 'put?' do
it 'returns a just Maybe on success' do
subject = Channel.new(buffer: :buffered, size: 2)
subject = Channel.new(buffer: :buffered, capacity: 2)
result = subject.put?(:foo)
expect(result).to be_a Concurrent::Maybe
expect(result).to be_just
end
it 'returns a nothing Maybe on failure' do
subject = Channel.new(buffer: :buffered, size: 2)
subject = Channel.new(buffer: :buffered, capacity: 2)
subject.close
result = subject.put?(:foo)
expect(result).to be_a Concurrent::Maybe
expect(result).to be_nothing
end
it 'rejects when the validator returns false' do
validator = ->(value) { false }
subject = Channel.new(capacity: 10, validator: validator)
expect(subject.put?(42)).to be_nothing
end
it 'rejects when the validator raises an exception' do
validator = ->(value) { false }
subject = Channel.new(capacity: 10, validator: validator)
expect(subject.put?(42)).to be_nothing
end
it 'accepts nil' do
result = subject.put?(nil)
expect(result).to be_a Concurrent::Maybe
expect(result).to be_just
end
end
context '#offer' do
it 'enqueues the item when not full and not closed' do
subject = Channel.new(buffer: :buffered, size: 2)
subject.offer(:foo)
internal_buffer = subject.instance_variable_get(:@buffer)
expect(internal_buffer).to_not be_empty
end
it 'returns true on success' do
subject = Channel.new(buffer: :buffered, size: 2)
subject = Channel.new(buffer: :buffered, capacity: 2)
expect(subject.offer(:foo)).to be true
end
it 'returns false when closed' do
subject = Channel.new(buffer: :buffered, size: 2)
it 'returns false on failure' do
subject = Channel.new(buffer: :buffered, capacity: 2)
subject.close
expect(subject.offer(:foo)).to be false
end
it 'rejects when the validator returns false' do
validator = ->(value) { false }
subject = Channel.new(capacity: 10, validator: validator)
expect(subject.offer(42)).to be false
end
it 'rejects when the validator raises an exception' do
validator = ->(value) { raise StandardError }
subject = Channel.new(capacity: 10, validator: validator)
expect(subject.offer(42)).to be false
end
it 'rejects nil' do
expect(subject.offer(nil)).to be false
end
end
context 'offer!' do
it 'returns true on success' do
subject = Channel.new(buffer: :buffered, capacity: 2)
expect(subject.offer!(:foo)).to be true
end
it 'raises an exception on failure' do
subject = Channel.new(buffer: :buffered, size: 2)
subject = Channel.new(buffer: :buffered, capacity: 2)
subject.close
expect {
subject.offer!(:foo)
}.to raise_error(Channel::Error)
end
it 'rejects when the validator returns false' do
validator = ->(value) { false }
subject = Channel.new(capacity: 10, validator: validator)
expect{
subject.offer!(42)
}.to raise_error(Channel::ValidationError)
end
it 'rejects when the validator raises an exception' do
validator = ->(value) { raise StandardError }
subject = Channel.new(capacity: 10, validator: validator)
expect{
subject.offer!(42)
}.to raise_error(StandardError)
end
it 'rejects nil' do
expect {
subject.offer!(nil)
}.to raise_error(Channel::ValidationError)
end
end
context 'offer?' do
it 'returns a just Maybe on success' do
subject = Channel.new(buffer: :buffered, size: 2)
subject = Channel.new(buffer: :buffered, capacity: 2)
result = subject.offer?(:foo)
expect(result).to be_a Concurrent::Maybe
expect(result).to be_just
end
it 'returns a nothing Maybe on failure' do
subject = Channel.new(buffer: :buffered, size: 2)
subject = Channel.new(buffer: :buffered, capacity: 2)
subject.close
result = subject.offer?(:foo)
expect(result).to be_a Concurrent::Maybe
expect(result).to be_nothing
end
it 'rejects when the validator returns false' do
validator = ->(value) { false }
subject = Channel.new(capacity: 10, validator: validator)
expect(subject.offer?(42)).to be_nothing
end
it 'rejects when the validator raises an exception' do
validator = ->(value) { false }
subject = Channel.new(capacity: 10, validator: validator)
expect(subject.offer?(42)).to be_nothing
end
it 'accepts nil' do
subject = Channel.new(buffer: :buffered, capacity: 2)
result = subject.offer?(nil)
expect(result).to be_a Concurrent::Maybe
expect(result).to be_just
end
end
context '#take' do
subject { Channel.new(buffer: :buffered, size: 2) }
subject { Channel.new(buffer: :buffered, capacity: 2) }
it 'takes the next item when not empty' do
subject.put(:foo)
expect(subject.take).to eq :foo
end
it 'returns nil when empty and closed' do
it 'returns nil on failure' do
subject.close
expect(subject.take).to be nil
end
@ -205,7 +334,12 @@ module Concurrent
context '#take!' do
subject { Channel.new(buffer: :buffered, size: 2) }
subject { Channel.new(buffer: :buffered, capacity: 2) }
it 'takes the next item when not empty' do
subject.put(:foo)
expect(subject.take!).to eq :foo
end
it 'raises an exception on failure' do
subject.close
@ -217,7 +351,7 @@ module Concurrent
context '#take?' do
subject { Channel.new(buffer: :buffered, size: 2) }
subject { Channel.new(buffer: :buffered, capacity: 2) }
it 'returns a just Maybe on success' do
subject.put(:foo)
@ -237,7 +371,7 @@ module Concurrent
context '#next' do
subject { Channel.new(buffer: :buffered, size: 3) }
subject { Channel.new(buffer: :buffered, capacity: 3) }
it 'returns <item>, true when there is one item' do
subject.put(:foo)
@ -263,25 +397,38 @@ module Concurrent
expect(more).to be false
end
it 'returns <item>, false when closed and last item' do
subject.offer(:foo)
subject.offer(:bar)
subject.offer(:baz)
it 'returns <item>, true when closed and last item' do
capacity = subject.capacity
expect(capacity).to be >= 1
capacity.times { subject.put(:foo) }
subject.close
_, more1 = subject.next
_, more2 = subject.next
_, more3 = subject.next
capacity.times do
item, more = subject.next
expect(item).to eq :foo
expect(more).to be true
end
end
expect(more1).to be true
expect(more2).to be true
expect(more3).to be false
it 'returns nil, false when closed and no items remain' do
capacity = subject.capacity
expect(capacity).to be >= 1
capacity.times { subject.put(:foo) }
subject.close
capacity.times { subject.next }
item, more = subject.next
expect(item).to be_nil
expect(more).to be false
end
end
context '#next?' do
subject { Channel.new(buffer: :buffered, size: 2) }
subject { Channel.new(buffer: :buffered, capacity: 2) }
it 'returns a just Maybe and true when there is one item' do
subject.put(:foo)
@ -329,7 +476,7 @@ module Concurrent
expect(subject.poll).to be nil
end
it 'returns nil when closed' do
it 'returns nil on failure' do
subject.close
expect(subject.poll).to be nil
end
@ -337,13 +484,23 @@ module Concurrent
context '#poll!' do
it 'returns the next item immediately if available' do
subject # initialize on this thread
t = Thread.new do
subject.put(42)
end
t.join(0.1)
expect(subject.poll!).to eq 42
end
it 'raises an exception immediately if no item is available' do
expect {
subject.poll!
}.to raise_error(Channel::Error)
end
it 'raises an exception when closed' do
it 'raises an exception on failure' do
subject.close
expect {
subject.poll!
@ -372,7 +529,7 @@ module Concurrent
expect(result).to be_nothing
end
it 'returns a nothing Maybe when closed' do
it 'returns a nothing Maybe on failure' do
subject.close
result = subject.poll?
expect(result).to be_a Concurrent::Maybe
@ -381,15 +538,124 @@ module Concurrent
end
context '.each' do
pending
it 'raises and exception when no block is given' do
expect {
subject.each
}.to raise_error(ArgumentError)
end
it 'iterates until the channel is closed' do
expected = [13, 42, 2001]
subject = Channel.new(capacity: expected.length)
expected.each { |value| subject.put(value) }
subject.close
actual = []
subject.each { |value| actual << value }
expect(actual).to eq expected
end
end
context '.go' do
pending
context 'goroutines' do
let(:default_executor) { Channel.const_get(:GOROUTINES) }
context '.go' do
it 'raises an exception when no block is given' do
expect {
Channel.go
}.to raise_error(ArgumentError)
end
specify do
expect(default_executor).to receive(:post).with(1, 2, 3)
Channel.go(1, 2, 3) { nil }
end
end
context '.go_via' do
it 'raises an exception when no block is given' do
expect {
Channel.go_via
}.to raise_error(ArgumentError)
end
specify do
executor = ImmediateExecutor.new
expect(executor).to receive(:post).with(1, 2, 3)
Channel.go_via(executor, 1, 2, 3) { nil }
end
end
context '.go_loop' do
it 'raises an exception when no block is given' do
expect {
Channel.go_loop
}.to raise_error(ArgumentError)
end
it 'loops until the block returns false' do
actual = 0
expected = 3
latch = Concurrent::CountDownLatch.new(expected)
Channel.go_loop do
actual += 1
latch.count_down
actual < expected
end
latch.wait(3)
expect(actual).to eq expected
end
end
context '.go_loop_via' do
it 'raises an exception when no block is given' do
expect {
Channel.go_loop_via
}.to raise_error(ArgumentError)
end
it 'loops until the block returns false' do
actual = 0
expected = 3
executor = ImmediateExecutor.new
latch = Concurrent::CountDownLatch.new(expected)
Channel.go_loop_via(executor) do
actual += 1
latch.count_down
actual < expected
end
latch.wait(3)
expect(actual).to eq expected
end
end
end
context '.timer' do
pending
context 'select' do
it 'raises an exception when no block is given' do
expect {
Channel.select
}.to raise_error(ArgumentError)
end
it 'passes a selector to the block' do
actual = nil
Channel.select { |s| actual = s; s.error { } }
expect(actual).to be_a Channel::Selector
end
specify do
expect_any_instance_of(Channel::Selector).to receive(:execute)
Channel.select { |s| s.error { } }
end
end
end
end

View File

@ -19,7 +19,7 @@ describe Concurrent::Edge::LockFreeLinkedSet do
expect(subject.add 'test string1').to be true
end
context 'in a multi-threaded environment' do
context 'in a multi-threaded environment', buggy: true do
it 'adds the items to the set' do
to_insert = %w(one two three four five six)

View File

@ -1,10 +1,7 @@
$VERBOSE = nil # suppress our deprecation warnings
require 'concurrent'
require 'concurrent-edge'
Concurrent.use_stdlib_logger Logger::FATAL
unless Concurrent.on_jruby_9000?
# wwe can't use our helpers here because we need to load the gem _after_ simplecov
unless RUBY_ENGINE == 'jruby' && 0 == (JRUBY_VERSION =~ /^9\.0\.0\.0/)
if ENV['COVERAGE'] || ENV['CI'] || ENV['TRAVIS']
require 'simplecov'
require 'coveralls'
@ -21,18 +18,17 @@ unless Concurrent.on_jruby_9000?
SimpleCov.start do
project_name 'concurrent-ruby'
add_filter '/build-tests/'
add_filter '/coverage/'
add_filter '/doc/'
add_filter '/examples/'
add_filter '/pkg/'
add_filter '/spec/'
add_filter '/tasks/'
add_filter '/yard-template/'
add_filter '/yardoc/'
end
end
end
require 'concurrent'
require 'concurrent-edge'
Concurrent.use_stdlib_logger Logger::FATAL
# import all the support files
Dir[File.join(File.dirname(__FILE__), 'support/**/*.rb')].each { |f| require File.expand_path(f) }