mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
43 lines
711 B
Ruby
43 lines
711 B
Ruby
|
require 'thread'
|
||
|
require 'timeout'
|
||
|
|
||
|
# A small thread-safe FIFO queue whose pop operation blocks for a bounded
# amount of time instead of indefinitely.
#
# Every access to the backing array goes through a single mutex, and a
# condition variable wakes threads blocked in #timed_pop when an item arrives.
class TimedQueue
  def initialize
    @que = []
    @mutex = Mutex.new
    @resource = ConditionVariable.new
  end

  # Appends +obj+ to the tail of the queue and wakes every thread currently
  # blocked in #timed_pop so it can re-check the queue.
  #
  # @param obj [Object] the item to enqueue
  def push(obj)
    @mutex.synchronize do
      @que.push obj
      @resource.broadcast
    end
  end
  alias_method :<<, :push

  # Removes and returns the item at the head of the queue, waiting up to
  # +timeout+ seconds for one to become available.
  #
  # @param timeout [Numeric] maximum number of seconds to wait
  # @return [Object] the oldest queued item
  # @raise [Timeout::Error] if no item became available within +timeout+
  def timed_pop(timeout = 0.5)
    # The deadline is tracked on the monotonic clock so that wall-clock
    # adjustments (NTP steps, manual changes) cannot shorten or extend the wait.
    deadline = monotonic_now + timeout
    @mutex.synchronize do
      loop do
        return @que.shift unless @que.empty?

        # Recompute the remaining time on every pass: a wake-up does not
        # guarantee an item is still there (another consumer may have taken
        # it), so we may need to wait again for whatever time is left.
        to_wait = deadline - monotonic_now
        raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0

        @resource.wait(@mutex, to_wait)
      end
    end
  end

  # @return [Boolean] true when no items are queued
  def empty?
    @mutex.synchronize { @que.empty? }
  end

  # Discards every queued item. Guarded by the same mutex as #push and
  # #timed_pop because it mutates the backing array.
  def clear
    @mutex.synchronize { @que.clear }
  end

  # @return [Integer] the number of items currently queued
  def length
    @mutex.synchronize { @que.length }
  end

  private

  # Current reading of the monotonic clock, in seconds (Float).
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
|