diff --git a/Changes.md b/Changes.md index 69152740..66d174fe 100644 --- a/Changes.md +++ b/Changes.md @@ -1,6 +1,19 @@ HEAD ----------- +- Add Sidekiq::Workers API giving programmatic access to the current + set of active workers. + +``` +workers = Sidekiq::Workers.new +workers.size => 2 +workers.each do |name, work| + # name is a unique identifier per Processor instance + # work is a Hash which looks like: + # { 'queue' => name, 'run_at' => timestamp, 'payload' => msg } +end +``` + - Allow environment-specific sections within the config file which override the global values [dtaniwaki, #630] diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 7de5ddc1..7bf6cf02 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -333,4 +333,43 @@ module Sidekiq end end + + ## + # Programmatic access to the current active worker set. + # + # WARNING WARNING WARNING + # + # This is live data that can change every millisecond. + # If you do #size => 5 and then expect #each to be + # called 5 times, you're going to have a bad time. + # + # workers = Sidekiq::Workers.new + # workers.size => 2 + # workers.each do |name, work| + # # name is a unique identifier per Processor instance + # # work is a Hash which looks like: + # # { 'queue' => name, 'run_at' => timestamp, 'payload' => msg } + # end + + class Workers + include Enumerable + + def each(&block) + Sidekiq.redis do |conn| + workers = conn.smembers("workers") + workers.each do |w| + msg = conn.get("worker:#{w}") + next unless msg + block.call(w, Sidekiq.load_json(msg)) + end + end + end + + def size + Sidekiq.redis do |conn| + conn.scard("workers") + end.to_i + end + end + end diff --git a/test/test_api.rb b/test/test_api.rb index 4946676c..d4ad08d7 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -269,6 +269,29 @@ class TestApi < MiniTest::Unit::TestCase assert_equal 0, r.size end + it 'can enumerate workers' do + w = Sidekiq::Workers.new + assert_equal 0, w.size + w.each do |x, y| + assert false + end + + s = '12345' + data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => Time.now.to_i }) + Sidekiq.redis do |c| + c.multi do + c.sadd('workers', s) + c.set("worker:#{s}", data) + end + end + + assert_equal 1, w.size + w.each do |x, y| + assert_equal s, x + assert_equal 'default', y['queue'] + end + end + def add_retry(jid = 'bob', at = Time.now.to_f) payload = Sidekiq.dump_json('class' => 'ApiWorker', 'args' => [1, 'mike'], 'queue' => 'default', 'jid' => jid, 'retry_count' => 2, 'failed_at' => Time.now.utc) Sidekiq.redis do |conn|