mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Use both score and jid in web param and parse on server to increase
efficiency in picking jobs out of the queue.
This commit is contained in:
parent
51569e2af1
commit
b77da8d466
7 changed files with 121 additions and 78 deletions
|
@ -105,7 +105,7 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def delete
|
||||
@parent.delete(score)
|
||||
@parent.delete(score, jid)
|
||||
end
|
||||
|
||||
def retry
|
||||
|
@ -150,11 +150,41 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
def delete(score)
|
||||
count = Sidekiq.redis do |conn|
|
||||
conn.zremrangebyscore(@zset, score, score)
|
||||
def fetch(score, jid = nil)
|
||||
elements = Sidekiq.redis do |conn|
|
||||
conn.zrangebyscore(@zset, score, score)
|
||||
end
|
||||
|
||||
elements.inject([]) do |result, element|
|
||||
entry = SortedEntry.new(self, score, element)
|
||||
if jid
|
||||
result << entry if entry.jid == jid
|
||||
else
|
||||
result << entry
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def delete(score, jid = nil)
|
||||
if jid
|
||||
elements = Sidekiq.redis do |conn|
|
||||
conn.zrangebyscore(@zset, score, score)
|
||||
end
|
||||
|
||||
elements_with_jid = elements.map do |element|
|
||||
message = Sidekiq.load_json(element)
|
||||
|
||||
if message["jid"] == jid
|
||||
Sidekiq.redis { |conn| conn.zrem(@zset, element) }
|
||||
end
|
||||
end
|
||||
elements_with_jid.count != 0
|
||||
else
|
||||
count = Sidekiq.redis do |conn|
|
||||
conn.zremrangebyscore(@zset, score, score)
|
||||
end
|
||||
count != 0
|
||||
end
|
||||
count != 0
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -2,9 +2,18 @@ require 'sinatra/base'
|
|||
require 'slim'
|
||||
require 'sidekiq/paginator'
|
||||
|
||||
module Sidekiq
|
||||
module Helpers
|
||||
def job_params(job, score)
|
||||
"#{score}-#{job['jid']}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
module Sidekiq
|
||||
class Web < Sinatra::Base
|
||||
include Sidekiq::Paginator
|
||||
include Sidekiq::Helpers
|
||||
|
||||
dir = File.expand_path(File.dirname(__FILE__) + "/../../web")
|
||||
set :public_folder, "#{dir}/assets"
|
||||
|
@ -86,6 +95,11 @@ module Sidekiq
|
|||
%{<time datetime="#{time.getutc.iso8601}">#{time}</time>}
|
||||
end
|
||||
|
||||
def parse_params(params)
|
||||
score, jid = params.split("-")
|
||||
[score.to_f, jid]
|
||||
end
|
||||
|
||||
def display_args(args, count=100)
|
||||
args.map { |arg| a = arg.inspect; a.size > count ? "#{a[0..count]}..." : a }.join(", ")
|
||||
end
|
||||
|
@ -157,39 +171,34 @@ module Sidekiq
|
|||
slim :retries
|
||||
end
|
||||
|
||||
get "/retries/:jid" do
|
||||
halt 404 unless params[:jid]
|
||||
@retry = Sidekiq::RetrySet.new.select do |retri|
|
||||
retri.jid == params[:jid]
|
||||
end.first
|
||||
get "/retries/:key" do
|
||||
halt 404 unless params['key']
|
||||
@retry = Sidekiq::RetrySet.new.fetch(*parse_params(params['key'])).first
|
||||
redirect "#{root_path}retries" if @retry.nil?
|
||||
slim :retry
|
||||
end
|
||||
|
||||
post '/retries' do
|
||||
halt 404 unless params['jid']
|
||||
if params['delete']
|
||||
Sidekiq::RetrySet.new.select do |job|
|
||||
job.jid.in?(params['jid'])
|
||||
end.map(&:delete)
|
||||
elsif params['retry']
|
||||
Sidekiq::RetrySet.new.select do |job|
|
||||
job.jid.in?(params['jid'])
|
||||
end.map(&:retry)
|
||||
halt 404 unless params['key']
|
||||
|
||||
params['key'].each do |key|
|
||||
job = Sidekiq::RetrySet.new.fetch(*parse_params(key)).first
|
||||
if params['retry']
|
||||
job.retry
|
||||
elsif params['delete']
|
||||
job.delete
|
||||
end
|
||||
end
|
||||
redirect "#{root_path}retries"
|
||||
end
|
||||
|
||||
post "/retries/:jid" do
|
||||
halt 404 unless params['jid']
|
||||
post "/retries/:key" do
|
||||
halt 404 unless params['key']
|
||||
job = Sidekiq::RetrySet.new.fetch(*parse_params(params['key'])).first
|
||||
if params['retry']
|
||||
Sidekiq::RetrySet.new.select do |job|
|
||||
job.jid == params['jid']
|
||||
end.first.retry
|
||||
job.retry
|
||||
elsif params['delete']
|
||||
Sidekiq::RetrySet.new.select do |job|
|
||||
job.jid == params['jid']
|
||||
end.first.delete
|
||||
job.delete
|
||||
end
|
||||
redirect "#{root_path}retries"
|
||||
end
|
||||
|
@ -202,11 +211,11 @@ module Sidekiq
|
|||
end
|
||||
|
||||
post '/scheduled' do
|
||||
halt 404 unless params['jid']
|
||||
halt 404 unless params['key']
|
||||
halt 404 unless params['delete']
|
||||
Sidekiq::ScheduledSet.new.select do |job|
|
||||
job.jid.in?(params['jid'])
|
||||
end.map(&:delete)
|
||||
params['key'].each do |key|
|
||||
Sidekiq::ScheduledSet.new.fetch(*parse_params(key)).first.delete
|
||||
end
|
||||
redirect "#{root_path}scheduled"
|
||||
end
|
||||
|
||||
|
|
|
@ -56,21 +56,23 @@ class TestApi < MiniTest::Unit::TestCase
|
|||
assert_in_delta Time.now.to_f, retri.at.to_f, 0.01
|
||||
end
|
||||
|
||||
it 'can delete retries' do
|
||||
add_retry
|
||||
r = Sidekiq::RetrySet.new
|
||||
assert_equal 1, r.size
|
||||
r.map(&:delete)
|
||||
assert_equal 0, r.size
|
||||
end
|
||||
|
||||
it 'can delete a single retry' do
|
||||
it 'can delete multiple retries from score' do
|
||||
same_time = Time.now.to_f
|
||||
add_retry('bob1', same_time)
|
||||
add_retry('bob2', same_time)
|
||||
r = Sidekiq::RetrySet.new
|
||||
assert_equal 2, r.size
|
||||
r.first.delete
|
||||
Sidekiq::RetrySet.new.delete(same_time)
|
||||
assert_equal 0, r.size
|
||||
end
|
||||
|
||||
it 'can delete a single retry from score and jid' do
|
||||
same_time = Time.now.to_f
|
||||
add_retry('bob1', same_time)
|
||||
add_retry('bob2', same_time)
|
||||
r = Sidekiq::RetrySet.new
|
||||
assert_equal 2, r.size
|
||||
Sidekiq::RetrySet.new.delete(same_time, 'bob1')
|
||||
assert_equal 1, r.size
|
||||
end
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ require 'rack/test'
|
|||
class TestWeb < MiniTest::Unit::TestCase
|
||||
describe 'sidekiq web' do
|
||||
include Rack::Test::Methods
|
||||
include Sidekiq::Helpers
|
||||
|
||||
def app
|
||||
Sidekiq::Web
|
||||
|
@ -48,11 +49,6 @@ class TestWeb < MiniTest::Unit::TestCase
|
|||
refute_match /HardWorker/, last_response.body
|
||||
end
|
||||
|
||||
it 'handles missing retry' do
|
||||
get '/retries/2c4c17969825a384a92f023b'
|
||||
assert_equal 302, last_response.status
|
||||
end
|
||||
|
||||
it 'handles queue view' do
|
||||
get '/queues/default'
|
||||
assert_equal 200, last_response.status
|
||||
|
@ -93,20 +89,6 @@ class TestWeb < MiniTest::Unit::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
it 'can display scheduled' do
|
||||
get '/scheduled'
|
||||
assert_equal 200, last_response.status
|
||||
assert_match /found/, last_response.body
|
||||
refute_match /HardWorker/, last_response.body
|
||||
|
||||
add_scheduled
|
||||
|
||||
get '/scheduled'
|
||||
assert_equal 200, last_response.status
|
||||
refute_match /found/, last_response.body
|
||||
assert_match /HardWorker/, last_response.body
|
||||
end
|
||||
|
||||
it 'can display retries' do
|
||||
get '/retries'
|
||||
assert_equal 200, last_response.status
|
||||
|
@ -122,41 +104,60 @@ class TestWeb < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'can display a single retry' do
|
||||
params = add_retry
|
||||
get '/retries/2c4c17969825a384a92f023b'
|
||||
assert_equal 302, last_response.status
|
||||
msg = add_retry
|
||||
get "/retries/#{msg['jid']}"
|
||||
get "/retries/#{job_params(*params)}"
|
||||
assert_equal 200, last_response.status
|
||||
assert_match /HardWorker/, last_response.body
|
||||
end
|
||||
|
||||
it 'handles missing retry' do
|
||||
get "/retries/2c4c17969825a384a92f023b"
|
||||
assert_equal 302, last_response.status
|
||||
end
|
||||
|
||||
it 'can delete a single retry' do
|
||||
msg = add_retry
|
||||
post "/retries/#{msg['jid']}", 'delete' => 'Delete'
|
||||
params = add_retry
|
||||
post "/retries/#{job_params(*params)}", 'delete' => 'Delete'
|
||||
assert_equal 302, last_response.status
|
||||
assert_equal 'http://example.org/retries', last_response.header['Location']
|
||||
|
||||
get "/retries"
|
||||
assert_equal 200, last_response.status
|
||||
refute_match /#{msg['args'][2]}/, last_response.body
|
||||
refute_match /#{params.first['args'][2]}/, last_response.body
|
||||
end
|
||||
|
||||
it 'can retry a single retry now' do
|
||||
msg = add_retry
|
||||
post "/retries/#{msg['jid']}", 'retry' => 'Retry'
|
||||
params = add_retry
|
||||
post "/retries/#{job_params(*params)}", 'retry' => 'Retry'
|
||||
assert_equal 302, last_response.status
|
||||
assert_equal 'http://example.org/retries', last_response.header['Location']
|
||||
|
||||
get '/queues/default'
|
||||
assert_equal 200, last_response.status
|
||||
assert_match /#{msg['args'][2]}/, last_response.body
|
||||
assert_match /#{params.first['args'][2]}/, last_response.body
|
||||
end
|
||||
|
||||
it 'can display scheduled' do
|
||||
get '/scheduled'
|
||||
assert_equal 200, last_response.status
|
||||
assert_match /found/, last_response.body
|
||||
refute_match /HardWorker/, last_response.body
|
||||
|
||||
add_scheduled
|
||||
|
||||
get '/scheduled'
|
||||
assert_equal 200, last_response.status
|
||||
refute_match /found/, last_response.body
|
||||
assert_match /HardWorker/, last_response.body
|
||||
end
|
||||
|
||||
it 'can delete scheduled' do
|
||||
msg = add_scheduled
|
||||
params = add_scheduled
|
||||
Sidekiq.redis do |conn|
|
||||
assert_equal 1, conn.zcard('schedule')
|
||||
post '/scheduled', 'jid' => [msg['jid']], 'delete' => 'Delete'
|
||||
post '/scheduled', 'key' => [job_params(*params)], 'delete' => 'Delete'
|
||||
assert_equal 302, last_response.status
|
||||
assert_equal 'http://example.org/scheduled', last_response.header['Location']
|
||||
assert_equal 0, conn.zcard('schedule')
|
||||
|
@ -179,11 +180,12 @@ class TestWeb < MiniTest::Unit::TestCase
|
|||
score = Time.now.to_f
|
||||
msg = { 'class' => 'HardWorker',
|
||||
'args' => ['bob', 1, Time.now.to_f],
|
||||
'at' => score }
|
||||
'at' => score,
|
||||
'jid' => 'f39af2a05e8f4b24dbc0f1e4' }
|
||||
Sidekiq.redis do |conn|
|
||||
conn.zadd('schedule', score, Sidekiq.dump_json(msg))
|
||||
end
|
||||
msg
|
||||
[msg, score]
|
||||
end
|
||||
|
||||
def add_retry
|
||||
|
@ -194,12 +196,12 @@ class TestWeb < MiniTest::Unit::TestCase
|
|||
'error_class' => 'RuntimeError',
|
||||
'retry_count' => 0,
|
||||
'failed_at' => Time.now.utc,
|
||||
'jid' => "f39af2a05e8f4b24dbc0f1e4"}
|
||||
'jid' => 'f39af2a05e8f4b24dbc0f1e4'}
|
||||
score = Time.now.to_f
|
||||
Sidekiq.redis do |conn|
|
||||
conn.zadd('retry', score, Sidekiq.dump_json(msg))
|
||||
end
|
||||
msg
|
||||
[msg, score]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -20,9 +20,9 @@ header.row
|
|||
- @retries.each do |msg, score|
|
||||
tr
|
||||
td
|
||||
input type='checkbox' name='jid[]' value='#{msg['jid']}'
|
||||
input type='checkbox' name='key[]' value='#{job_params(msg, score)}'
|
||||
td
|
||||
a href="#{root_path}retries/#{msg['jid']}"== relative_time(Time.at(score))
|
||||
a href="#{root_path}retries/#{job_params(msg, score)}"== relative_time(Time.at(score))
|
||||
td= msg['retry_count']
|
||||
td
|
||||
a href="#{root_path}queues/#{msg['queue']}" #{msg['queue']}
|
||||
|
|
|
@ -49,7 +49,7 @@ table class="error table table-bordered table-striped"
|
|||
th Error Backtrace
|
||||
td
|
||||
code== @retry['error_backtrace'].join("<br/>")
|
||||
form.form-horizontal action="#{root_path}retries/#{@retry.jid}" method="post"
|
||||
form.form-horizontal action="#{root_path}retries/#{job_params(@retry, @retry.score)}" method="post"
|
||||
a.btn href="#{root_path}retries" ← Back
|
||||
input.btn.btn-primary type="submit" name="retry" value="Retry Now"
|
||||
input.btn.btn-danger type="submit" name="delete" value="Delete"
|
||||
|
|
|
@ -19,7 +19,7 @@ header.row
|
|||
- @scheduled.each do |msg, score|
|
||||
tr
|
||||
td
|
||||
input type='checkbox' name='jid[]' value='#{msg['jid']}'
|
||||
input type='checkbox' name='key[]' value='#{job_params(msg, score)}'
|
||||
td== relative_time(Time.at(score))
|
||||
td
|
||||
a href="#{root_path}queues/#{msg['queue']}" #{msg['queue']}
|
||||
|
|
Loading…
Add table
Reference in a new issue