Add popping and selection with a matcher and peek to channel
This commit is contained in:
parent
16bc2beff1
commit
bf74c67ba5
|
@ -16,8 +16,6 @@ module Concurrent
|
|||
# TODO (pitr-ch 14-Jan-2019): better documentation, do few examples from go
|
||||
# TODO (pitr-ch 12-Dec-2018): implement channel closing,
|
||||
# - as a child class? To also have a channel which cannot be closed.
|
||||
# TODO (pitr-ch 18-Dec-2018): It needs unpop to return non matched messages read by actor or rather atomic operations pop_first_matching
|
||||
# TODO (pitr-ch 15-Jan-2019): needs peek
|
||||
# TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation, at least getting a message when available should be lock free same goes for push with space available
|
||||
|
||||
# @!macro channel.warn.blocks
|
||||
|
@ -43,12 +41,23 @@ module Concurrent
|
|||
end
|
||||
end
|
||||
|
||||
NOTHING = Object.new
|
||||
private_constant :NOTHING
|
||||
|
||||
# An object which matches anything (with #===)
|
||||
ANY = Object.new.tap do |any|
|
||||
def any.===(other)
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
# Create channel.
|
||||
# @param [Integer, UNLIMITED_CAPACITY] capacity the maximum number of messages which can be stored in the channel.
|
||||
def initialize(capacity = UNLIMITED_CAPACITY)
|
||||
super()
|
||||
@Capacity = capacity
|
||||
@Mutex = Mutex.new
|
||||
@Capacity = capacity
|
||||
@Mutex = Mutex.new
|
||||
# TODO (pitr-ch 28-Jan-2019): consider linked lists or other data structures for following attributes, things are being deleted from the middle
|
||||
@Probes = []
|
||||
@Messages = []
|
||||
@PendingPush = []
|
||||
|
@ -114,90 +123,158 @@ module Concurrent
|
|||
result == pushed_op ? self : result
|
||||
end
|
||||
|
||||
# Pop a message from the channel if there is one available.
|
||||
# @param [Object] no_value returned when there is no message available
|
||||
# @return [Object, no_value] message or nil when there is no message
|
||||
# @!macro promises.channel.try_pop
|
||||
# Pop a message from the channel if there is one available.
|
||||
# @param [Object] no_value returned when there is no message available
|
||||
# @return [Object, no_value] message or nil when there is no message
|
||||
def try_pop(no_value = nil)
|
||||
message = try_pop_disambiguated
|
||||
message == NOTHING ? no_value : message
|
||||
try_pop_matching ANY, no_value
|
||||
end
|
||||
|
||||
# Returns a future witch will become fulfilled with a value from the channel when one is available.
|
||||
# @!macro chanel.operation_wait
|
||||
# @!macro promises.channel.try_pop
|
||||
# @!macro promises.channel.param.matcher
|
||||
# @param [#===] matcher only consider message which matches `matcher === a_message`
|
||||
def try_pop_matching(matcher, no_value = nil)
|
||||
@Mutex.synchronize do
|
||||
message = ns_shift_message matcher
|
||||
return message if message != NOTHING
|
||||
message = ns_consume_pending_push matcher
|
||||
return message != NOTHING ? message : no_value
|
||||
end
|
||||
end
|
||||
|
||||
# @!macro promises.channel.pop_op
|
||||
# Returns a future witch will become fulfilled with a value from the channel when one is available.
|
||||
# @!macro chanel.operation_wait
|
||||
#
|
||||
# @param [ResolvableFuture] probe the future which will be fulfilled with a channel value
|
||||
# @return [Future(Object)] the probe, its value will be the message when available.
|
||||
# @param [ResolvableFuture] probe the future which will be fulfilled with a channel value
|
||||
# @return [Future(Object)] the probe, its value will be the message when available.
|
||||
def pop_op(probe = Promises.resolvable_future)
|
||||
@Mutex.synchronize { ns_pop_op(probe, false) }
|
||||
@Mutex.synchronize { ns_pop_op(ANY, probe, false) }
|
||||
end
|
||||
|
||||
# Blocks current thread until a message is available in the channel for popping.
|
||||
# @!macro promises.channel.pop_op
|
||||
# @!macro promises.channel.param.matcher
|
||||
def pop_op_matching(matcher, probe = Promises.resolvable_future)
|
||||
@Mutex.synchronize { ns_pop_op(matcher, probe, false) }
|
||||
end
|
||||
|
||||
# @!macro promises.channel.pop
|
||||
# Blocks current thread until a message is available in the channel for popping.
|
||||
#
|
||||
# @!macro channel.warn.blocks
|
||||
# @!macro channel.param.timeout
|
||||
# @!macro promises.param.timeout_value
|
||||
# @return [Object, nil] message or nil when timed out
|
||||
# @!macro channel.warn.blocks
|
||||
# @!macro channel.param.timeout
|
||||
# @!macro promises.param.timeout_value
|
||||
# @return [Object, nil] message or nil when timed out
|
||||
def pop(timeout = nil, timeout_value = nil)
|
||||
pop_matching ANY, timeout, timeout_value
|
||||
end
|
||||
|
||||
# @!macro promises.channel.pop
|
||||
# @!macro promises.channel.param.matcher
|
||||
def pop_matching(matcher, timeout = nil, timeout_value = nil)
|
||||
# TODO (pitr-ch 27-Jan-2019): should it try to match pending pushes if it fails to match in the buffer? Maybe only if the size is zero. It could be surprising if it's used as a throttle it might be expected that it will not pop if buffer is full of messages which di not match, it might it expected it will block until the message is added to the buffer
|
||||
# that it returns even if the buffer is full. User might expect that it has to be in the buffer first.
|
||||
probe = @Mutex.synchronize do
|
||||
message = ns_shift_message
|
||||
message = ns_shift_message matcher
|
||||
if message == NOTHING
|
||||
message = ns_consume_pending_push
|
||||
message = ns_consume_pending_push matcher
|
||||
return message if message != NOTHING
|
||||
else
|
||||
new_message = ns_consume_pending_push
|
||||
new_message = ns_consume_pending_push ANY
|
||||
@Messages.push new_message unless new_message == NOTHING
|
||||
return message
|
||||
end
|
||||
|
||||
probe = Promises.resolvable_future
|
||||
@Probes.push false, probe
|
||||
@Probes.push probe, false, matcher
|
||||
probe
|
||||
end
|
||||
|
||||
probe.value!(timeout, timeout_value, [true, timeout_value, nil])
|
||||
end
|
||||
|
||||
# If message is available in the receiver or any of the provided channels
|
||||
# the channel message pair is returned. If there is no message nil is returned.
|
||||
# The returned channel is the origin of the message.
|
||||
# @!macro promises.channel.peek
|
||||
# Behaves as {#try_pop} but it does not remove the message from the channel
|
||||
# @param [Object] no_value returned when there is no message available
|
||||
# @return [Object, no_value] message or nil when there is no message
|
||||
def peek(no_value = nil)
|
||||
peek_matching ANY, no_value
|
||||
end
|
||||
|
||||
# @!macro promises.channel.peek
|
||||
# @!macro promises.channel.param.matcher
|
||||
def peek_matching(matcher, no_value = nil)
|
||||
@Mutex.synchronize do
|
||||
message = ns_shift_message matcher, false
|
||||
return message if message != NOTHING
|
||||
message = ns_consume_pending_push matcher, false
|
||||
return message != NOTHING ? message : no_value
|
||||
end
|
||||
end
|
||||
|
||||
# @!macro promises.channel.try_select
|
||||
# If message is available in the receiver or any of the provided channels
|
||||
# the channel message pair is returned. If there is no message nil is returned.
|
||||
# The returned channel is the origin of the message.
|
||||
#
|
||||
# @param [Channel, ::Array<Channel>] channels
|
||||
# @return [::Array(Channel, Object), nil]
|
||||
# pair [channel, message] if one of the channels is available for reading
|
||||
# @param [Channel, ::Array<Channel>] channels
|
||||
# @return [::Array(Channel, Object), nil]
|
||||
# pair [channel, message] if one of the channels is available for reading
|
||||
def try_select(channels)
|
||||
try_select_matching ANY, channels
|
||||
end
|
||||
|
||||
# @!macro promises.channel.try_select
|
||||
# @!macro promises.channel.param.matcher
|
||||
def try_select_matching(matcher, channels)
|
||||
message = nil
|
||||
channel = [self, *channels].find do |ch|
|
||||
message = ch.try_pop_disambiguated
|
||||
message = ch.try_pop_matching(matcher, NOTHING)
|
||||
message != NOTHING
|
||||
end
|
||||
channel ? [channel, message] : nil
|
||||
end
|
||||
|
||||
# When message is available in the receiver or any of the provided channels
|
||||
# the future is fulfilled with a channel message pair.
|
||||
# The returned channel is the origin of the message.
|
||||
# @!macro chanel.operation_wait
|
||||
# @!macro promises.channel.select_op
|
||||
# When message is available in the receiver or any of the provided channels
|
||||
# the future is fulfilled with a channel message pair.
|
||||
# The returned channel is the origin of the message.
|
||||
# @!macro chanel.operation_wait
|
||||
#
|
||||
# @param [Channel, ::Array<Channel>] channels
|
||||
# @param [ResolvableFuture] probe the future which will be fulfilled with the message
|
||||
# @return [ResolvableFuture(::Array(Channel, Object))] a future which is fulfilled with
|
||||
# pair [channel, message] when one of the channels is available for reading
|
||||
# @param [Channel, ::Array<Channel>] channels
|
||||
# @param [ResolvableFuture] probe the future which will be fulfilled with the message
|
||||
# @return [ResolvableFuture(::Array(Channel, Object))] a future which is fulfilled with
|
||||
# pair [channel, message] when one of the channels is available for reading
|
||||
def select_op(channels, probe = Promises.resolvable_future)
|
||||
[self, *channels].each { |ch| ch.partial_select_op probe }
|
||||
select_op_matching ANY, channels, probe
|
||||
end
|
||||
|
||||
# @!macro promises.channel.select_op
|
||||
# @!macro promises.channel.param.matcher
|
||||
def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
|
||||
[self, *channels].each { |ch| ch.partial_select_op matcher, probe }
|
||||
probe
|
||||
end
|
||||
|
||||
# As {#select_op} but does not return future,
|
||||
# it block current thread instead until there is a message available
|
||||
# in the receiver or in any of the channels.
|
||||
# @!macro promises.channel.select
|
||||
# As {#select_op} but does not return future,
|
||||
# it block current thread instead until there is a message available
|
||||
# in the receiver or in any of the channels.
|
||||
#
|
||||
# @!macro channel.warn.blocks
|
||||
# @param [Channel, ::Array<Channel>] channels
|
||||
# @!macro channel.param.timeout
|
||||
# @return [::Array(Channel, Object), nil] message or nil when timed out
|
||||
# @see #select_op
|
||||
# @!macro channel.warn.blocks
|
||||
# @param [Channel, ::Array<Channel>] channels
|
||||
# @!macro channel.param.timeout
|
||||
# @return [::Array(Channel, Object), nil] message or nil when timed out
|
||||
# @see #select_op
|
||||
def select(channels, timeout = nil)
|
||||
probe = select_op(channels)
|
||||
select_matching ANY, channels, timeout
|
||||
end
|
||||
|
||||
# @!macro promises.channel.select
|
||||
# @!macro promises.channel.param.matcher
|
||||
def select_matching(matcher, channels, timeout = nil)
|
||||
probe = select_op_matching(matcher, channels)
|
||||
probe.value!(timeout, nil, [true, nil, nil])
|
||||
end
|
||||
|
||||
|
@ -233,37 +310,44 @@ module Concurrent
|
|||
end
|
||||
|
||||
# @see #select
|
||||
# @return [Object, nil]
|
||||
# @return [::Array(Channel, Object), nil]
|
||||
def select(channels, timeout = nil)
|
||||
channels.first.select(channels[1..-1], timeout)
|
||||
end
|
||||
|
||||
# @see #try_select_matching
|
||||
# @return [::Array(Channel, Object)]
|
||||
def try_select_matching(matcher, channels)
|
||||
channels.first.try_select_matching(matcher, channels[1..-1])
|
||||
end
|
||||
|
||||
# @see #select_op_matching
|
||||
# @return [Future(::Array(Channel, Object))]
|
||||
def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
|
||||
channels.first.select_op_matching(matcher, channels[1..-1], probe)
|
||||
end
|
||||
|
||||
# @see #select_matching
|
||||
# @return [::Array(Channel, Object), nil]
|
||||
def select_matching(matcher, channels, timeout = nil)
|
||||
channels.first.select_matching(matcher, channels[1..-1], timeout)
|
||||
end
|
||||
end
|
||||
|
||||
# @!visibility private
|
||||
def partial_select_op(probe)
|
||||
@Mutex.synchronize { ns_pop_op(probe, true) }
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def try_pop_disambiguated
|
||||
@Mutex.synchronize do
|
||||
message = ns_shift_message
|
||||
return message if message != NOTHING
|
||||
return ns_consume_pending_push
|
||||
end
|
||||
|
||||
def partial_select_op(matcher, probe)
|
||||
@Mutex.synchronize { ns_pop_op(matcher, probe, true) }
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def ns_pop_op(probe, include_channel)
|
||||
message = ns_shift_message
|
||||
def ns_pop_op(matcher, probe, include_channel)
|
||||
message = ns_shift_message matcher
|
||||
|
||||
# got message from buffer
|
||||
if message != NOTHING
|
||||
if probe.fulfill(include_channel ? [self, message] : message, false)
|
||||
new_message = ns_consume_pending_push
|
||||
new_message = ns_consume_pending_push ANY
|
||||
@Messages.push new_message unless new_message == NOTHING
|
||||
else
|
||||
@Messages.unshift message
|
||||
|
@ -272,69 +356,66 @@ module Concurrent
|
|||
end
|
||||
|
||||
# no message in buffer, try to pair with a pending push
|
||||
i = 0
|
||||
while true
|
||||
message, pushed = @PendingPush.first 2
|
||||
message, pushed = @PendingPush[i, 2]
|
||||
break if pushed.nil?
|
||||
|
||||
value = include_channel ? [self, message] : message
|
||||
if Promises::Resolvable.atomic_resolution(probe => [true, value, nil], pushed => [true, self, nil])
|
||||
@PendingPush.shift 2
|
||||
return probe
|
||||
if matcher === message
|
||||
value = include_channel ? [self, message] : message
|
||||
if Promises::Resolvable.atomic_resolution(probe => [true, value, nil],
|
||||
pushed => [true, self, nil])
|
||||
@PendingPush[i, 2] = []
|
||||
return probe
|
||||
end
|
||||
|
||||
if probe.resolved?
|
||||
return probe
|
||||
end
|
||||
|
||||
# so pushed.resolved? has to be true, remove the push
|
||||
@PendingPush[i, 2] = []
|
||||
end
|
||||
|
||||
if pushed.resolved?
|
||||
@PendingPush.shift 2
|
||||
next
|
||||
end
|
||||
|
||||
if probe.resolved?
|
||||
return probe
|
||||
end
|
||||
|
||||
raise 'should not reach'
|
||||
i += 2
|
||||
end
|
||||
|
||||
# no push to pair with
|
||||
# TODO (pitr-ch 11-Jan-2019): clear up probes when timed out, use callback
|
||||
@Probes.push include_channel, probe if probe.pending?
|
||||
@Probes.push probe, include_channel, matcher if probe.pending?
|
||||
return probe
|
||||
end
|
||||
|
||||
def ns_consume_pending_push
|
||||
return NOTHING if @PendingPush.empty?
|
||||
def ns_consume_pending_push(matcher, remove = true)
|
||||
i = 0
|
||||
while true
|
||||
message, pushed = @PendingPush.shift 2
|
||||
message, pushed = @PendingPush[i, 2]
|
||||
return NOTHING unless pushed
|
||||
# can fail if timed-out, so try without error
|
||||
if pushed.fulfill(self, false)
|
||||
# pushed fulfilled so actually push the message
|
||||
return message
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def ns_peek_pending_push
|
||||
return NOTHING if @PendingPush.empty?
|
||||
while true
|
||||
message, pushed = @PendingPush.first 2
|
||||
return NOTHING unless pushed
|
||||
# can be timed-out
|
||||
if pushed.resolved?
|
||||
@PendingPush.shift 2
|
||||
# and repeat
|
||||
else
|
||||
return message
|
||||
if matcher === message
|
||||
resolved = pushed.resolved?
|
||||
@PendingPush[i, 2] = [] if remove || resolved
|
||||
# can fail if timed-out, so try without error
|
||||
if remove ? pushed.fulfill(self, false) : !resolved
|
||||
# pushed fulfilled so actually push the message
|
||||
return message
|
||||
end
|
||||
end
|
||||
|
||||
i += 2
|
||||
end
|
||||
end
|
||||
|
||||
def ns_try_push(message)
|
||||
i = 0
|
||||
while true
|
||||
include_channel, probe = @Probes.shift(2)
|
||||
probe, include_channel, matcher = @Probes[i, 3]
|
||||
break unless probe
|
||||
if probe.fulfill(include_channel ? [self, message] : message, false)
|
||||
if matcher === message && probe.fulfill(include_channel ? [self, message] : message, false)
|
||||
@Probes[i, 3] = []
|
||||
return true
|
||||
end
|
||||
i += 3
|
||||
end
|
||||
|
||||
if @Capacity > @Messages.size
|
||||
|
@ -345,18 +426,18 @@ module Concurrent
|
|||
end
|
||||
end
|
||||
|
||||
NOTHING = Object.new
|
||||
private_constant :NOTHING
|
||||
def ns_shift_message(matcher, remove = true)
|
||||
i = 0
|
||||
while true
|
||||
message = @Messages.fetch(i, NOTHING)
|
||||
return NOTHING if message == NOTHING
|
||||
|
||||
def ns_shift_message
|
||||
@Messages.empty? ? NOTHING : @Messages.shift
|
||||
end
|
||||
if matcher === message
|
||||
@Messages.delete_at i if remove
|
||||
return message
|
||||
end
|
||||
|
||||
def ns_try_peek
|
||||
if @Messages.empty?
|
||||
ns_peek_pending_push
|
||||
else
|
||||
@Messages.shift
|
||||
i += 1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -118,6 +118,65 @@ RSpec.describe 'Concurrent' do
|
|||
expect(pop_op.value!).to eq :v2
|
||||
end
|
||||
|
||||
specify "#(try_)pop(_op)_matching" do
|
||||
channel = Concurrent::Promises::Channel.new 2
|
||||
channel.push 'junk'
|
||||
channel.push :v1
|
||||
|
||||
expect(channel.size).to eq 2
|
||||
expect(channel.try_pop_matching(Symbol)).to eq :v1
|
||||
expect(channel.size).to eq 1
|
||||
expect(channel.try_pop_matching(Symbol)).to eq nil
|
||||
expect(channel.size).to eq 1
|
||||
|
||||
channel = Concurrent::Promises::Channel.new 2
|
||||
channel.push 'junk'
|
||||
channel.push :v1
|
||||
expect(channel.pop_matching(Symbol)).to eq :v1
|
||||
expect(channel.size).to eq 1
|
||||
thread = in_thread { channel.pop_matching(Symbol) }
|
||||
is_sleeping thread
|
||||
expect(channel.size).to eq 1
|
||||
channel.push 'junk'
|
||||
channel.pop
|
||||
channel.push :v2
|
||||
expect(thread.value).to eq :v2
|
||||
expect(channel.size).to eq 1
|
||||
|
||||
channel = Concurrent::Promises::Channel.new 2
|
||||
channel.push 'junk'
|
||||
channel.push :v1
|
||||
expect(channel.pop_matching(Symbol)).to eq :v1
|
||||
expect(channel.size).to eq 1
|
||||
thread = in_thread { channel.pop_matching(Symbol, 0.01) }
|
||||
is_sleeping thread
|
||||
expect(channel.size).to eq 1
|
||||
expect(thread.value).to eq nil
|
||||
channel.push :v2
|
||||
expect(channel.size).to eq 2
|
||||
expect(channel.pop_matching(Symbol)).to eq :v2
|
||||
expect(channel.size).to eq 1
|
||||
thread = in_thread { channel.pop_matching(Symbol,1) }
|
||||
is_sleeping thread
|
||||
channel.push :v3
|
||||
expect(channel.size).to eq 1
|
||||
expect(thread.value).to eq :v3
|
||||
channel.push :v4
|
||||
expect(channel.pop_matching(Symbol,0)).to eq :v4
|
||||
|
||||
channel = Concurrent::Promises::Channel.new 2
|
||||
channel.push 'junk'
|
||||
channel.push :v1
|
||||
expect(channel.pop_op_matching(Symbol).value!).to eq :v1
|
||||
expect(channel.size).to eq 1
|
||||
pop_op = channel.pop_op_matching(Symbol)
|
||||
expect(channel.size).to eq 1
|
||||
expect(pop_op.pending?).to be_truthy
|
||||
channel.push :v2
|
||||
expect(channel.size).to eq 1
|
||||
expect(pop_op.value!).to eq :v2
|
||||
end
|
||||
|
||||
specify "#(try_)select(_op)" do
|
||||
channel1 = Concurrent::Promises::Channel.new 1
|
||||
channel2 = Concurrent::Promises::Channel.new 1
|
||||
|
|
Loading…
Reference in New Issue