mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Sharding testing and fixes
This commit is contained in:
parent
8d66ee729a
commit
e97c2c2c5a
2 changed files with 22 additions and 16 deletions
|
@ -584,20 +584,16 @@ module Sidekiq
|
|||
# @!attribute [r] Name
|
||||
attr_reader :name
|
||||
|
||||
# Redis location
|
||||
attr_accessor :pool
|
||||
|
||||
# :nodoc:
|
||||
# @api private
|
||||
def initialize(name)
|
||||
@pool = Sidekiq.default_configuration.redis_pool
|
||||
@name = name
|
||||
@_size = size
|
||||
end
|
||||
|
||||
# real-time size of the set, will change
|
||||
def size
|
||||
@pool.with { |c| c.zcard(name) }
|
||||
Sidekiq.redis { |c| c.zcard(name) }
|
||||
end
|
||||
|
||||
# Scan through each element of the sorted set, yielding each to the supplied block.
|
||||
|
@ -610,7 +606,7 @@ module Sidekiq
|
|||
return to_enum(:scan, match, count) unless block_given?
|
||||
|
||||
match = "*#{match}*" unless match.include?("*")
|
||||
@pool.with do |conn|
|
||||
Sidekiq.redis do |conn|
|
||||
conn.zscan(name, match: match, count: count) do |entry, score|
|
||||
yield SortedEntry.new(self, score, entry)
|
||||
end
|
||||
|
@ -619,7 +615,7 @@ module Sidekiq
|
|||
|
||||
# @return [Boolean] always true
|
||||
def clear
|
||||
@pool.with do |conn|
|
||||
Sidekiq.redis do |conn|
|
||||
conn.unlink(name)
|
||||
end
|
||||
true
|
||||
|
@ -641,7 +637,7 @@ module Sidekiq
|
|||
# @param timestamp [Time] the score for the job
|
||||
# @param job [Hash] the job data
|
||||
def schedule(timestamp, job)
|
||||
@pool.with do |conn|
|
||||
Sidekiq.redis do |conn|
|
||||
conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(job))
|
||||
end
|
||||
end
|
||||
|
@ -655,7 +651,7 @@ module Sidekiq
|
|||
loop do
|
||||
range_start = page * page_size + offset_size
|
||||
range_end = range_start + page_size - 1
|
||||
elements = @pool.with { |conn|
|
||||
elements = Sidekiq.redis { |conn|
|
||||
conn.zrange name, range_start, range_end, withscores: true
|
||||
}
|
||||
break if elements.empty?
|
||||
|
@ -682,7 +678,7 @@ module Sidekiq
|
|||
[score, score]
|
||||
end
|
||||
|
||||
elements = @pool.with { |conn|
|
||||
elements = Sidekiq.redis { |conn|
|
||||
conn.zrangebyscore(name, begin_score, end_score, withscores: true)
|
||||
}
|
||||
|
||||
|
@ -700,7 +696,7 @@ module Sidekiq
|
|||
# @param jid [String] the job identifier
|
||||
# @return [SortedEntry] the record or nil
|
||||
def find_job(jid)
|
||||
@pool.with do |conn|
|
||||
Sidekiq.redis do |conn|
|
||||
conn.zscan(name, match: "*#{jid}*", count: 100) do |entry, score|
|
||||
job = JSON.parse(entry)
|
||||
matched = job["jid"] == jid
|
||||
|
@ -713,7 +709,7 @@ module Sidekiq
|
|||
# :nodoc:
|
||||
# @api private
|
||||
def delete_by_value(name, value)
|
||||
@pool.with do |conn|
|
||||
Sidekiq.redis do |conn|
|
||||
ret = conn.zrem(name, value)
|
||||
@_size -= 1 if ret
|
||||
ret
|
||||
|
@ -723,7 +719,7 @@ module Sidekiq
|
|||
# :nodoc:
|
||||
# @api private
|
||||
def delete_by_jid(score, jid)
|
||||
@pool.with do |conn|
|
||||
Sidekiq.redis do |conn|
|
||||
elements = conn.zrangebyscore(name, score, score)
|
||||
elements.each do |element|
|
||||
if element.index(jid)
|
||||
|
@ -793,7 +789,7 @@ module Sidekiq
|
|||
# @param message [String] the job data as JSON
|
||||
def kill(message, opts = {})
|
||||
now = Time.now.to_f
|
||||
@pool.with do |conn|
|
||||
Sidekiq.redis do |conn|
|
||||
conn.multi do |transaction|
|
||||
transaction.zadd(name, now.to_s, message)
|
||||
transaction.zremrangebyscore(name, "-inf", now - Sidekiq::Config::DEFAULTS[:dead_timeout_in_seconds])
|
||||
|
|
|
@ -355,11 +355,21 @@ module Sidekiq
|
|||
|
||||
def client_push(item) # :nodoc:
|
||||
raise ArgumentError, "Job payloads should contain no Symbols: #{item}" if item.any? { |k, v| k.is_a?(::Symbol) }
|
||||
build_client.push(item)
|
||||
|
||||
# allow the user to dynamically re-target jobs to another shard using the "pool" attribute
|
||||
# FooJob.set(pool: SOME_POOL).perform_async
|
||||
old = Thread.current[:sidekiq_redis_pool]
|
||||
pool = item.delete("pool")
|
||||
Thread.current[:sidekiq_redis_pool] = pool if pool
|
||||
begin
|
||||
build_client.push(item)
|
||||
ensure
|
||||
Thread.current[:sidekiq_redis_pool] = old
|
||||
end
|
||||
end
|
||||
|
||||
def build_client # :nodoc:
|
||||
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.default_configuration.redis_pool
|
||||
pool = Thread.current[:sidekiq_redis_pool] || get_sidekiq_options["pool"] || Sidekiq.default_configuration.redis_pool
|
||||
client_class = get_sidekiq_options["client_class"] || Sidekiq::Client
|
||||
client_class.new(pool: pool)
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue