mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Add Sidekiq::Workers API, closes #639
This commit is contained in:
parent
4cca6eb85d
commit
c606dd4fde
3 changed files with 75 additions and 0 deletions
13
Changes.md
13
Changes.md
|
@ -1,6 +1,19 @@
|
||||||
HEAD
|
HEAD
|
||||||
-----------
|
-----------
|
||||||
|
|
||||||
|
- Add Sidekiq::Workers API giving programmatic access to the current
|
||||||
|
set of active workers.
|
||||||
|
|
||||||
|
```
|
||||||
|
workers = Sidekiq::Workers.new
|
||||||
|
workers.size => 2
|
||||||
|
workers.each do |name, work|
|
||||||
|
# name is a unique identifier per Processor instance
|
||||||
|
# work is a Hash which looks like:
|
||||||
|
# { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
|
||||||
|
end
|
||||||
|
```
|
||||||
|
|
||||||
- Allow environment-specific sections within the config file which
|
- Allow environment-specific sections within the config file which
|
||||||
override the global values [dtaniwaki, #630]
|
override the global values [dtaniwaki, #630]
|
||||||
|
|
||||||
|
|
|
@ -333,4 +333,43 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
##
|
||||||
|
# Programmatic access to the current active worker set.
|
||||||
|
#
|
||||||
|
# WARNING WARNING WARNING
|
||||||
|
#
|
||||||
|
# This is live data that can change every millisecond.
|
||||||
|
# If you do #size => 5 and then expect #each to be
|
||||||
|
# called 5 times, you're going to have a bad time.
|
||||||
|
#
|
||||||
|
# workers = Sidekiq::Workers.new
|
||||||
|
# workers.size => 2
|
||||||
|
# workers.each do |name, work|
|
||||||
|
# # name is a unique identifier per Processor instance
|
||||||
|
# # work is a Hash which looks like:
|
||||||
|
# # { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
|
||||||
|
# end
|
||||||
|
|
||||||
|
class Workers
|
||||||
|
include Enumerable
|
||||||
|
|
||||||
|
def each(&block)
|
||||||
|
Sidekiq.redis do |conn|
|
||||||
|
workers = conn.smembers("workers")
|
||||||
|
workers.each do |w|
|
||||||
|
msg = conn.get("worker:#{w}")
|
||||||
|
next unless msg
|
||||||
|
block.call(w, Sidekiq.load_json(msg))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def size
|
||||||
|
Sidekiq.redis do |conn|
|
||||||
|
conn.scard("workers")
|
||||||
|
end.to_i
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -269,6 +269,29 @@ class TestApi < MiniTest::Unit::TestCase
|
||||||
assert_equal 0, r.size
|
assert_equal 0, r.size
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it 'can enumerate workers' do
|
||||||
|
w = Sidekiq::Workers.new
|
||||||
|
assert_equal 0, w.size
|
||||||
|
w.each do |x, y|
|
||||||
|
assert false
|
||||||
|
end
|
||||||
|
|
||||||
|
s = '12345'
|
||||||
|
data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => Time.now.to_i })
|
||||||
|
Sidekiq.redis do |c|
|
||||||
|
c.multi do
|
||||||
|
c.sadd('workers', s)
|
||||||
|
c.set("worker:#{s}", data)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_equal 1, w.size
|
||||||
|
w.each do |x, y|
|
||||||
|
assert_equal s, x
|
||||||
|
assert_equal 'default', y['queue']
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def add_retry(jid = 'bob', at = Time.now.to_f)
|
def add_retry(jid = 'bob', at = Time.now.to_f)
|
||||||
payload = Sidekiq.dump_json('class' => 'ApiWorker', 'args' => [1, 'mike'], 'queue' => 'default', 'jid' => jid, 'retry_count' => 2, 'failed_at' => Time.now.utc)
|
payload = Sidekiq.dump_json('class' => 'ApiWorker', 'args' => [1, 'mike'], 'queue' => 'default', 'jid' => jid, 'retry_count' => 2, 'failed_at' => Time.now.utc)
|
||||||
Sidekiq.redis do |conn|
|
Sidekiq.redis do |conn|
|
||||||
|
|
Loading…
Add table
Reference in a new issue