mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
use TimedQueue from connection_pool
This commit is contained in:
parent
bf9684d6a5
commit
2d42f3df42
2 changed files with 1 additions and 43 deletions
|
@ -1,7 +1,7 @@
|
||||||
require 'helper'
|
require 'helper'
|
||||||
require 'sidekiq'
|
require 'sidekiq'
|
||||||
require 'sidekiq/manager'
|
require 'sidekiq/manager'
|
||||||
require 'timed_queue'
|
require 'connection_pool'
|
||||||
|
|
||||||
class TestManager < MiniTest::Unit::TestCase
|
class TestManager < MiniTest::Unit::TestCase
|
||||||
describe 'with redis' do
|
describe 'with redis' do
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
require 'thread'
|
|
||||||
require 'timeout'
|
|
||||||
|
|
||||||
class TimedQueue
|
|
||||||
def initialize
|
|
||||||
@que = []
|
|
||||||
@mutex = Mutex.new
|
|
||||||
@resource = ConditionVariable.new
|
|
||||||
end
|
|
||||||
|
|
||||||
def push(obj)
|
|
||||||
@mutex.synchronize do
|
|
||||||
@que.push obj
|
|
||||||
@resource.broadcast
|
|
||||||
end
|
|
||||||
end
|
|
||||||
alias_method :<<, :push
|
|
||||||
|
|
||||||
def timed_pop(timeout=0.5)
|
|
||||||
deadline = Time.now + timeout
|
|
||||||
@mutex.synchronize do
|
|
||||||
loop do
|
|
||||||
return @que.shift unless @que.empty?
|
|
||||||
to_wait = deadline - Time.now
|
|
||||||
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
|
|
||||||
@resource.wait(@mutex, to_wait)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def empty?
|
|
||||||
@que.empty?
|
|
||||||
end
|
|
||||||
|
|
||||||
def clear
|
|
||||||
@que.clear
|
|
||||||
end
|
|
||||||
|
|
||||||
def length
|
|
||||||
@que.length
|
|
||||||
end
|
|
||||||
end
|
|
Loading…
Add table
Reference in a new issue