diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index ed00bba5..e3537b1b 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -19,10 +19,11 @@ module Sidekiq LUA def initialize + @done = false @lua_zpopbyscore_sha = nil end - def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS) + def enqueue_jobs(sorted_sets = SETS) # A job's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of jobs with a timestamp before now. Sidekiq.redis do |conn| @@ -31,7 +32,7 @@ module Sidekiq # We need to go through the list one at a time to reduce the risk of something # going wrong between the time jobs are popped from the scheduled queue and when # they are pushed onto a work queue and losing the jobs. - while (job = zpopbyscore(conn, keys: [sorted_set], argv: [now])) + while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s])) Sidekiq::Client.push(Sidekiq.load_json(job)) Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" } end @@ -39,6 +40,10 @@ module Sidekiq end end + def terminate + @done = true + end + private def zpopbyscore(conn, keys: nil, argv: nil) @@ -74,6 +79,8 @@ module Sidekiq # Shut down this instance, will pause until the thread is dead. def terminate @done = true + @enq.terminate if @enq.respond_to?(:terminate) + if @thread t = @thread @thread = nil diff --git a/test/test_scheduled.rb b/test/test_scheduled.rb index 0a14903b..4fab6c5b 100644 --- a/test/test_scheduled.rb +++ b/test/test_scheduled.rb @@ -79,6 +79,30 @@ describe Sidekiq::Scheduled do end end + it 'should not enqueue jobs when terminate has been called' do + created_time = Time.new(2013, 2, 3) + enqueued_time = Time.new(2013, 2, 4) + + Time.stub(:now, created_time) do + @retry.schedule (enqueued_time - 60).to_f, @error_1.merge!('created_at' => created_time.to_f) + @scheduled.schedule (enqueued_time - 60).to_f, @future_1.merge!('created_at' => created_time.to_f) + end + + Time.stub(:now, enqueued_time) do + @poller.terminate + @poller.enqueue + + Sidekiq.redis do |conn| + %w(queue:queue_1 queue:queue_4).each do |queue_name| + assert_equal 0, conn.llen(queue_name) + end + end + + assert_equal 1, @retry.size + assert_equal 1, @scheduled.size + end + end + def with_sidekiq_option(name, value) _original, Sidekiq.options[name] = Sidekiq.options[name], value begin