mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Support job tags
This commit is contained in:
parent
c0c1ab25e5
commit
c428383523
15 changed files with 88 additions and 12 deletions
11
Changes.md
11
Changes.md
|
@ -5,6 +5,17 @@
|
||||||
HEAD
|
HEAD
|
||||||
---------
|
---------
|
||||||
|
|
||||||
|
- Support ad-hoc job tags. You can tag your jobs with, for example, subdomain, tenant, country, locale, application,
|
||||||
|
version, user/client, "alpha/beta/pro/ent", types of jobs, teams/people responsible for jobs,
|
||||||
|
additional metadata, etc.
|
||||||
|
Tags are shown on different pages with job listings [fatkodima, #4280]
|
||||||
|
```ruby
|
||||||
|
class MyWorker
|
||||||
|
include Sidekiq::Worker
|
||||||
|
sidekiq_options tags: ['foo', 'bar']
|
||||||
|
...
|
||||||
|
end
|
||||||
|
```
|
||||||
- Get scheduled jobs in batches before pushing into specific queues.
|
- Get scheduled jobs in batches before pushing into specific queues.
|
||||||
This will decrease enqueueing time of scheduled jobs by a third. [fatkodima, #4273]
|
This will decrease enqueueing time of scheduled jobs by a third. [fatkodima, #4273]
|
||||||
```
|
```
|
||||||
|
|
|
@ -388,6 +388,10 @@ module Sidekiq
|
||||||
Time.at(self["created_at"] || self["enqueued_at"] || 0).utc
|
Time.at(self["created_at"] || self["enqueued_at"] || 0).utc
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def tags
|
||||||
|
self["tags"] || []
|
||||||
|
end
|
||||||
|
|
||||||
def error_backtrace
|
def error_backtrace
|
||||||
# Cache nil values
|
# Cache nil values
|
||||||
if defined?(@error_backtrace)
|
if defined?(@error_backtrace)
|
||||||
|
|
|
@ -223,6 +223,7 @@ module Sidekiq
|
||||||
raise(ArgumentError, "Job args must be an Array") unless item["args"].is_a?(Array)
|
raise(ArgumentError, "Job args must be an Array") unless item["args"].is_a?(Array)
|
||||||
raise(ArgumentError, "Job class must be either a Class or String representation of the class name") unless item["class"].is_a?(Class) || item["class"].is_a?(String)
|
raise(ArgumentError, "Job class must be either a Class or String representation of the class name") unless item["class"].is_a?(Class) || item["class"].is_a?(String)
|
||||||
raise(ArgumentError, "Job 'at' must be a Numeric timestamp") if item.key?("at") && !item["at"].is_a?(Numeric)
|
raise(ArgumentError, "Job 'at' must be a Numeric timestamp") if item.key?("at") && !item["at"].is_a?(Numeric)
|
||||||
|
raise(ArgumentError, "Job tags must be an Array") if item["tags"] && !item["tags"].is_a?(Array)
|
||||||
# raise(ArgumentError, "Arguments must be native JSON types, see https://github.com/mperham/sidekiq/wiki/Best-Practices") unless JSON.load(JSON.dump(item['args'])) == item['args']
|
# raise(ArgumentError, "Arguments must be native JSON types, see https://github.com/mperham/sidekiq/wiki/Best-Practices") unless JSON.load(JSON.dump(item['args'])) == item['args']
|
||||||
|
|
||||||
normalized_hash(item["class"])
|
normalized_hash(item["class"])
|
||||||
|
|
|
@ -35,6 +35,7 @@ module Sidekiq
|
||||||
jid: job_hash["jid"],
|
jid: job_hash["jid"],
|
||||||
}
|
}
|
||||||
h[:bid] = job_hash["bid"] if job_hash["bid"]
|
h[:bid] = job_hash["bid"] if job_hash["bid"]
|
||||||
|
h[:tags] = job_hash["tags"] if job_hash["tags"]
|
||||||
h
|
h
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,16 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def format_context
|
def format_context
|
||||||
" " + ctx.compact.map { |k, v| "#{k}=#{v}" }.join(" ") if ctx.any?
|
if ctx.any?
|
||||||
|
" " + ctx.compact.map { |k, v|
|
||||||
|
case v
|
||||||
|
when Array
|
||||||
|
"#{k}=#{v.join(",")}"
|
||||||
|
else
|
||||||
|
"#{k}=#{v}"
|
||||||
|
end
|
||||||
|
}.join(" ")
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -249,7 +249,7 @@ module Sidekiq
|
||||||
queue class args retry_count retried_at failed_at
|
queue class args retry_count retried_at failed_at
|
||||||
jid error_message error_class backtrace
|
jid error_message error_class backtrace
|
||||||
error_backtrace enqueued_at retry wrapped
|
error_backtrace enqueued_at retry wrapped
|
||||||
created_at
|
created_at tags
|
||||||
])
|
])
|
||||||
|
|
||||||
def retry_extra_items(retry_job)
|
def retry_extra_items(retry_job)
|
||||||
|
|
|
@ -212,6 +212,11 @@ describe 'API' do
|
||||||
include Sidekiq::Worker
|
include Sidekiq::Worker
|
||||||
end
|
end
|
||||||
|
|
||||||
|
class WorkerWithTags
|
||||||
|
include Sidekiq::Worker
|
||||||
|
sidekiq_options tags: ['foo']
|
||||||
|
end
|
||||||
|
|
||||||
it 'can enumerate jobs' do
|
it 'can enumerate jobs' do
|
||||||
q = Sidekiq::Queue.new
|
q = Sidekiq::Queue.new
|
||||||
Time.stub(:now, Time.new(2012, 12, 26)) do
|
Time.stub(:now, Time.new(2012, 12, 26)) do
|
||||||
|
@ -295,6 +300,14 @@ describe 'API' do
|
||||||
assert_nil job.enqueued_at
|
assert_nil job.enqueued_at
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it 'returns tags field for jobs' do
|
||||||
|
job_id = ApiWorker.perform_async
|
||||||
|
assert_equal [], Sidekiq::Queue.new.find_job(job_id).tags
|
||||||
|
|
||||||
|
job_id = WorkerWithTags.perform_async
|
||||||
|
assert_equal ['foo'], Sidekiq::Queue.new.find_job(job_id).tags
|
||||||
|
end
|
||||||
|
|
||||||
it 'can delete jobs' do
|
it 'can delete jobs' do
|
||||||
q = Sidekiq::Queue.new
|
q = Sidekiq::Queue.new
|
||||||
ApiWorker.perform_async(1, 'mike')
|
ApiWorker.perform_async(1, 'mike')
|
||||||
|
|
|
@ -32,6 +32,10 @@ describe Sidekiq::Client do
|
||||||
assert_raises ArgumentError do
|
assert_raises ArgumentError do
|
||||||
Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1, 2], 'at' => [Time.now.to_f, :not_a_numeric])
|
Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1, 2], 'at' => [Time.now.to_f, :not_a_numeric])
|
||||||
end
|
end
|
||||||
|
|
||||||
|
assert_raises ArgumentError do
|
||||||
|
Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1], 'tags' => :not_an_array)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ class TestJobLogger < Minitest::Test
|
||||||
|
|
||||||
# pretty
|
# pretty
|
||||||
p = @logger.formatter = Sidekiq::Logger::Formatters::Pretty.new
|
p = @logger.formatter = Sidekiq::Logger::Formatters::Pretty.new
|
||||||
job = {"jid"=>"1234abc", "wrapped"=>"FooWorker", "class"=>"Wrapper"}
|
job = {"jid"=>"1234abc", "wrapped"=>"FooWorker", "class"=>"Wrapper", "tags" => ["bar", "baz"]}
|
||||||
# this mocks what Processor does
|
# this mocks what Processor does
|
||||||
jl.with_job_hash_context(job) do
|
jl.with_job_hash_context(job) do
|
||||||
jl.call(job, 'queue') {}
|
jl.call(job, 'queue') {}
|
||||||
|
@ -34,7 +34,7 @@ class TestJobLogger < Minitest::Test
|
||||||
assert a
|
assert a
|
||||||
assert b
|
assert b
|
||||||
|
|
||||||
expected = /pid=#{$$} tid=#{p.tid} class=FooWorker jid=1234abc/
|
expected = /pid=#{$$} tid=#{p.tid} class=FooWorker jid=1234abc tags=bar,baz/
|
||||||
assert_match(expected, a)
|
assert_match(expected, a)
|
||||||
assert_match(expected, b)
|
assert_match(expected, b)
|
||||||
assert_match(/#{Time.now.utc.to_date}.+Z pid=#{$$} tid=#{p.tid} .+INFO: done/, b)
|
assert_match(/#{Time.now.utc.to_date}.+Z pid=#{$$} tid=#{p.tid} .+INFO: done/, b)
|
||||||
|
@ -44,7 +44,7 @@ class TestJobLogger < Minitest::Test
|
||||||
# json
|
# json
|
||||||
@logger.formatter = Sidekiq::Logger::Formatters::JSON.new
|
@logger.formatter = Sidekiq::Logger::Formatters::JSON.new
|
||||||
jl = Sidekiq::JobLogger.new(@logger)
|
jl = Sidekiq::JobLogger.new(@logger)
|
||||||
job = {"jid"=>"1234abc", "wrapped"=>"Wrapper", "class"=>"FooWorker", "bid"=>"b-xyz"}
|
job = {"jid"=>"1234abc", "wrapped"=>"Wrapper", "class"=>"FooWorker", "bid"=>"b-xyz", "tags" => ["bar", "baz"]}
|
||||||
# this mocks what Processor does
|
# this mocks what Processor does
|
||||||
jl.with_job_hash_context(job) do
|
jl.with_job_hash_context(job) do
|
||||||
jl.call(job, 'queue') {}
|
jl.call(job, 'queue') {}
|
||||||
|
@ -56,7 +56,7 @@ class TestJobLogger < Minitest::Test
|
||||||
keys = hsh.keys.sort
|
keys = hsh.keys.sort
|
||||||
assert_equal(["ctx", "lvl", "msg", "pid", "tid", "ts"], keys)
|
assert_equal(["ctx", "lvl", "msg", "pid", "tid", "ts"], keys)
|
||||||
keys = hsh["ctx"].keys.sort
|
keys = hsh["ctx"].keys.sort
|
||||||
assert_equal(["bid", "class", "jid"], keys)
|
assert_equal(["bid", "class", "jid", "tags"], keys)
|
||||||
end
|
end
|
||||||
|
|
||||||
def reset(io)
|
def reset(io)
|
||||||
|
|
|
@ -279,6 +279,13 @@ describe Sidekiq::Web do
|
||||||
assert_match(/HardWorker/, last_response.body)
|
assert_match(/HardWorker/, last_response.body)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it 'can display a single scheduled job tags' do
|
||||||
|
params = add_scheduled
|
||||||
|
get "/scheduled/#{job_params(*params)}"
|
||||||
|
assert_match(/tag1/, last_response.body)
|
||||||
|
assert_match(/tag2/, last_response.body)
|
||||||
|
end
|
||||||
|
|
||||||
it 'handles missing scheduled job' do
|
it 'handles missing scheduled job' do
|
||||||
get "/scheduled/0-shouldntexist"
|
get "/scheduled/0-shouldntexist"
|
||||||
assert_equal 302, last_response.status
|
assert_equal 302, last_response.status
|
||||||
|
@ -579,7 +586,8 @@ describe Sidekiq::Web do
|
||||||
score = Time.now.to_f
|
score = Time.now.to_f
|
||||||
msg = { 'class' => 'HardWorker',
|
msg = { 'class' => 'HardWorker',
|
||||||
'args' => ['bob', 1, Time.now.to_f],
|
'args' => ['bob', 1, Time.now.to_f],
|
||||||
'jid' => SecureRandom.hex(12) }
|
'jid' => SecureRandom.hex(12),
|
||||||
|
'tags' => ['tag1', 'tag2'], }
|
||||||
Sidekiq.redis do |conn|
|
Sidekiq.redis do |conn|
|
||||||
conn.zadd('schedule', score, Sidekiq.dump_json(msg))
|
conn.zadd('schedule', score, Sidekiq.dump_json(msg))
|
||||||
end
|
end
|
||||||
|
|
|
@ -14,7 +14,12 @@
|
||||||
<tr>
|
<tr>
|
||||||
<th><%= t('Job') %></th>
|
<th><%= t('Job') %></th>
|
||||||
<td>
|
<td>
|
||||||
<code><%= job.display_class %></code>
|
<code>
|
||||||
|
<%= job.display_class %>
|
||||||
|
<% job.tags.each do |tag| %>
|
||||||
|
<span class="label label-info"><%= tag %></span>
|
||||||
|
<% end %>
|
||||||
|
</code>
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
|
|
|
@ -87,7 +87,12 @@
|
||||||
<td>
|
<td>
|
||||||
<a href="<%= root_path %>queues/<%= msg['queue'] %>"><%= msg['queue'] %></a>
|
<a href="<%= root_path %>queues/<%= msg['queue'] %>"><%= msg['queue'] %></a>
|
||||||
</td>
|
</td>
|
||||||
<td><%= job.display_class %></td>
|
<td>
|
||||||
|
<%= job.display_class %>
|
||||||
|
<% job.tags.each do |tag| %>
|
||||||
|
<span class="label label-info"><%= tag %></span>
|
||||||
|
<% end %>
|
||||||
|
</td>
|
||||||
<td>
|
<td>
|
||||||
<div class="args"><%= display_args(job.display_args) %></div>
|
<div class="args"><%= display_args(job.display_args) %></div>
|
||||||
</td>
|
</td>
|
||||||
|
|
|
@ -42,7 +42,12 @@
|
||||||
<td>
|
<td>
|
||||||
<a href="<%= root_path %>queues/<%= entry.queue %>"><%= entry.queue %></a>
|
<a href="<%= root_path %>queues/<%= entry.queue %>"><%= entry.queue %></a>
|
||||||
</td>
|
</td>
|
||||||
<td><%= entry.display_class %></td>
|
<td>
|
||||||
|
<%= entry.display_class %>
|
||||||
|
<% entry.tags.each do |tag| %>
|
||||||
|
<span class="label label-info"><%= tag %></span>
|
||||||
|
<% end %>
|
||||||
|
</td>
|
||||||
<td>
|
<td>
|
||||||
<div class="args"><%= display_args(entry.display_args) %></div>
|
<div class="args"><%= display_args(entry.display_args) %></div>
|
||||||
</td>
|
</td>
|
||||||
|
|
|
@ -27,7 +27,12 @@
|
||||||
<% else %>
|
<% else %>
|
||||||
<td><%= @total_size - (@count * (@current_page - 1) + index) %></td>
|
<td><%= @total_size - (@count * (@current_page - 1) + index) %></td>
|
||||||
<% end %>
|
<% end %>
|
||||||
<td><%= h(msg.display_class) %></td>
|
<td>
|
||||||
|
<%= h(msg.display_class) %>
|
||||||
|
<% msg.tags.each do |tag| %>
|
||||||
|
<span class="label label-info"><%= tag %></span>
|
||||||
|
<% end %>
|
||||||
|
</td>
|
||||||
<td>
|
<td>
|
||||||
<% a = msg.display_args %>
|
<% a = msg.display_args %>
|
||||||
<% if a.inspect.size > 100 %>
|
<% if a.inspect.size > 100 %>
|
||||||
|
|
|
@ -38,7 +38,12 @@
|
||||||
<td>
|
<td>
|
||||||
<a href="<%= root_path %>queues/<%= entry.queue %>"><%= entry.queue %></a>
|
<a href="<%= root_path %>queues/<%= entry.queue %>"><%= entry.queue %></a>
|
||||||
</td>
|
</td>
|
||||||
<td><%= entry.display_class %></td>
|
<td>
|
||||||
|
<%= entry.display_class %>
|
||||||
|
<% entry.tags.each do |tag| %>
|
||||||
|
<span class="label label-info"><%= tag %></span>
|
||||||
|
<% end %>
|
||||||
|
</td>
|
||||||
<td>
|
<td>
|
||||||
<div class="args"><%= display_args(entry.display_args) %></div>
|
<div class="args"><%= display_args(entry.display_args) %></div>
|
||||||
</td>
|
</td>
|
||||||
|
|
Loading…
Add table
Reference in a new issue