mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Merge main, adjust WorkSet#each to not greed parse job_hash JSON
This commit is contained in:
commit
ad9442f116
6 changed files with 32 additions and 16 deletions
|
@ -1,9 +1,9 @@
|
|||
require "sidekiq"
|
||||
|
||||
# Start up sidekiq via
|
||||
# ./bin/sidekiq -r ./examples/por.rb
|
||||
# bundle exec bin/sidekiq -r ./examples/por.rb
|
||||
# and then you can open up an IRB session like so:
|
||||
# irb -r ./examples/por.rb
|
||||
# bundle exec irb -r ./examples/por.rb
|
||||
# where you can then say
|
||||
# PlainOldRuby.perform_async "like a dog", 3
|
||||
#
|
||||
|
|
|
@ -1034,24 +1034,24 @@ module Sidekiq
|
|||
|
||||
def each(&block)
|
||||
results = []
|
||||
procs = nil
|
||||
all_works = nil
|
||||
|
||||
Sidekiq.redis do |conn|
|
||||
procs = conn.sscan("processes").to_a
|
||||
procs.sort.each do |key|
|
||||
valid, workers = conn.pipelined { |pipeline|
|
||||
pipeline.exists(key)
|
||||
procs = conn.sscan("processes").to_a.sort
|
||||
all_works = conn.pipelined do |pipeline|
|
||||
procs.each do |key|
|
||||
pipeline.hgetall("#{key}:work")
|
||||
}
|
||||
next unless valid > 0
|
||||
workers.each_pair do |tid, json|
|
||||
hsh = Sidekiq.load_json(json)
|
||||
p = hsh["payload"]
|
||||
# avoid breaking API, this is a side effect of the JSON optimization in #4316
|
||||
hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String)
|
||||
results << [key, tid, hsh]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
procs.zip(all_works).each do |key, workers|
|
||||
workers.each_pair do |tid, json|
|
||||
results << [key, tid, Sidekiq.load_json(json)] unless json.empty?
|
||||
end
|
||||
end
|
||||
|
||||
results.sort_by { |(_, _, hsh)| hsh["run_at"] }.each(&block)
|
||||
end
|
||||
|
||||
|
|
|
@ -43,5 +43,13 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
def page_items(items, pageidx = 1, page_size = 25)
|
||||
current_page = pageidx.to_i < 1 ? 1 : pageidx.to_i
|
||||
pageidx = current_page - 1
|
||||
starting = pageidx * page_size
|
||||
items = items.to_a
|
||||
[current_page, items.size, items[starting, page_size]]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -74,6 +74,9 @@ module Sidekiq
|
|||
end
|
||||
|
||||
get "/busy" do
|
||||
@count = (params["count"] || 100).to_i
|
||||
(@current_page, @total_size, @workset) = page_items(workset, params["page"], @count)
|
||||
|
||||
erb(:busy)
|
||||
end
|
||||
|
||||
|
|
|
@ -582,7 +582,7 @@ describe "API" do
|
|||
assert_equal key, p
|
||||
assert_equal "1234", x
|
||||
assert_equal "default", y["queue"]
|
||||
assert_equal({}, y["payload"])
|
||||
assert_equal("{}", y["payload"])
|
||||
assert_equal Time.now.year, Time.at(y["run_at"]).year
|
||||
end
|
||||
|
||||
|
|
|
@ -96,6 +96,11 @@
|
|||
<div class="col-sm-7">
|
||||
<h3><%= t('Jobs') %></h3>
|
||||
</div>
|
||||
<% if @workset.size > 0 && @total_size > @count %>
|
||||
<div class="col-sm-4">
|
||||
<%= erb :_paging, locals: { url: "#{root_path}busy" } %>
|
||||
</div>
|
||||
<% end %>
|
||||
</div>
|
||||
|
||||
<div class="table_container">
|
||||
|
@ -109,7 +114,7 @@
|
|||
<th><%= t('Arguments') %></th>
|
||||
<th><%= t('Started') %></th>
|
||||
</thead>
|
||||
<% workset.each do |process, thread, msg| %>
|
||||
<% @workset.each do |process, thread, msg| %>
|
||||
<% job = Sidekiq::JobRecord.new(msg['payload']) %>
|
||||
<tr>
|
||||
<td><%= process %></td>
|
||||
|
|
Loading…
Reference in a new issue