Scheduled jobs! Bump to 2.0.0.

Performs can now be scheduled at arbitrary points in the future.
This commit is contained in:
Mike Perham 2012-05-25 20:21:42 -07:00
parent 4e9fd939ba
commit 2c4be4cada
26 changed files with 237 additions and 97 deletions

View File

@ -1,3 +1,32 @@
2.0.0
-----------
- **SCHEDULED JOBS**!
You can now use `perform_at` and `perform_in` to schedule jobs
to run at arbitrary points in the future, like so:
```ruby
SomeWorker.perform_in(5.days, 'bob', 13)
SomeWorker.perform_at(5.days.from_now, 'bob', 13)
```
It also works with the delay extensions:
```ruby
UserMailer.delay_for(5.days).send_welcome_email(user.id)
```
The time is approximately when the job will be placed on the queue;
it is not guaranteed to run at precisely at that moment in time.
This functionality is meant for one-off, arbitrary jobs. I still
recommend `whenever` or `clockwork` if you want cron-like,
recurring jobs. See `examples/scheduling.rb`
I want to specially thank @yabawock for his work on sidekiq-scheduler.
His extension for Sidekiq 1.x filled an obvious functional gap that I now think is
useful enough to implement in Sidekiq proper.
1.2.1
-----------

View File

@ -1,4 +1,4 @@
# Sidekiq defers scheduling to other, better suited gems.
# Sidekiq defers scheduling cron-like tasks to other, better suited gems.
# If you want to run a job regularly, here's an example
# of using the 'whenever' gem to push jobs to Sidekiq
# regularly.

View File

@ -9,6 +9,8 @@ require 'sidekiq/extensions/action_mailer'
require 'sidekiq/extensions/active_record'
require 'sidekiq/rails' if defined?(::Rails)
require 'multi_json'
module Sidekiq
DEFAULTS = {

View File

@ -30,7 +30,7 @@ require 'celluloid'
require 'sidekiq'
require 'sidekiq/util'
require 'sidekiq/manager'
require 'sidekiq/retry'
require 'sidekiq/scheduled'
module Sidekiq
class CLI
@ -65,7 +65,7 @@ module Sidekiq
def run
@manager = Sidekiq::Manager.new(options)
poller = Sidekiq::Retry::Poller.new
poller = Sidekiq::Scheduled::Poller.new
begin
logger.info 'Starting processing, hit Ctrl-C to stop'
@manager.start!

View File

@ -1,5 +1,3 @@
require 'multi_json'
require 'sidekiq/middleware/chain'
require 'sidekiq/middleware/client/unique_jobs'
@ -50,9 +48,13 @@ module Sidekiq
Sidekiq.client_middleware.invoke(worker_class, item, queue) do
payload = Sidekiq.dump_json(item)
Sidekiq.redis do |conn|
_, pushed = conn.multi do
conn.sadd('queues', queue)
conn.rpush("queue:#{queue}", payload)
if item['at']
pushed = (conn.zadd('schedule', item['at'].to_s, payload) == 1)
else
_, pushed = conn.multi do
conn.sadd('queues', queue)
conn.rpush("queue:#{queue}", payload)
end
end
end
end

View File

@ -3,12 +3,16 @@ require 'sidekiq/extensions/generic_proxy'
module Sidekiq
module Extensions
##
# Adds a 'delay' method to ActionMailer to offload arbitrary email
# Adds 'delay' and 'delay_for' to ActionMailer to offload arbitrary email
# delivery to Sidekiq. Example:
#
# UserMailer.delay.send_welcome_email(new_user)
# UserMailer.delay_for(5.days).send_welcome_email(new_user)
class DelayedMailer
include Sidekiq::Worker
# I think it's reasonable to assume that emails should take less
# than 30 seconds to send.
sidekiq_options :timeout => 30
def perform(yml)
(target, method_name, args) = YAML.load(yml)
@ -20,6 +24,9 @@ module Sidekiq
def delay
Proxy.new(DelayedMailer, self)
end
def delay_for(interval)
Proxy.new(DelayedMailer, self, Time.now + interval)
end
end
end

View File

@ -21,6 +21,9 @@ module Sidekiq
def delay
Proxy.new(DelayedModel, self)
end
def delay_for(interval)
Proxy.new(DelayedModel, self, Time.now + interval)
end
end
end

View File

@ -1,9 +1,10 @@
module Sidekiq
module Extensions
class Proxy < (RUBY_VERSION < '1.9' ? Object : BasicObject)
def initialize(performable, target)
def initialize(performable, target, at=nil)
@performable = performable
@target = target
@at = at
end
def method_missing(name, *args)
@ -13,7 +14,11 @@ module Sidekiq
# to JSON and then deserialized on the other side back into a
# Ruby object.
obj = [@target, name, args]
@performable.perform_async(::YAML.dump(obj))
if @at
@performable.perform_at(@at, ::YAML.dump(obj))
else
@performable.perform_async(::YAML.dump(obj))
end
end
end

View File

@ -1,5 +1,4 @@
require 'celluloid'
require 'multi_json'
require 'sidekiq/util'
require 'sidekiq/processor'

View File

@ -1,5 +1,4 @@
require 'digest'
require 'multi_json'
module Sidekiq
module Middleware

View File

@ -1,5 +1,3 @@
require 'multi_json'
module Sidekiq
module Middleware
module Server

View File

@ -1,12 +1,21 @@
require 'multi_json'
require 'sidekiq/retry'
require 'sidekiq/scheduled'
module Sidekiq
module Middleware
module Server
##
# Automatically retry jobs that fail in Sidekiq.
# Sidekiq's retry support assumes a typical development lifecycle:
# 0. push some code changes with a bug in it
# 1. bug causes message processing to fail, sidekiq's middleware captures
# the message and pushes it onto a retry queue
# 2. sidekiq retries messages in the retry queue multiple times with
# an exponential delay, the message continues to fail
# 3. after a few days, a developer deploys a fix. the message is
# reprocessed successfully.
# 4. if 3 never happens, sidekiq will eventually give up and throw the
# message away.
#
# A message looks like:
#
# { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'] }
@ -24,7 +33,10 @@ module Sidekiq
# to the message and everyone is using Airbrake, right?
class RetryJobs
include Sidekiq::Util
include Sidekiq::Retry
# delayed_job uses the same basic formula
MAX_COUNT = 25
DELAY = proc { |count| (count ** 4) + 15 }
def call(worker, msg, queue)
yield

View File

@ -1,5 +1,3 @@
require 'multi_json'
module Sidekiq
module Middleware
module Server

View File

@ -1,5 +1,4 @@
require 'celluloid'
require 'multi_json'
require 'sidekiq/util'
require 'sidekiq/middleware/server/active_record'

View File

@ -1,59 +0,0 @@
require 'sidekiq'
require 'sidekiq/util'
require 'celluloid'
require 'multi_json'
module Sidekiq
##
# Sidekiq's retry support assumes a typical development lifecycle:
# 0. push some code changes with a bug in it
# 1. bug causes message processing to fail, sidekiq's middleware captures
# the message and pushes it onto a retry queue
# 2. sidekiq retries messages in the retry queue multiple times with
# an exponential delay, the message continues to fail
# 3. after a few days, a developer deploys a fix. the message is
# reprocessed successfully.
# 4. if 3 never happens, sidekiq will eventually give up and throw the
# message away.
module Retry
# delayed_job uses the same basic formula
MAX_COUNT = 25
DELAY = proc { |count| (count ** 4) + 15 }
POLL_INTERVAL = 15
##
# The Poller checks Redis every N seconds for messages in the retry
# set have passed their retry timestamp and should be retried. If so, it
# just pops the message back onto its original queue so the
# workers can pick it up like any other message.
class Poller
include Celluloid
include Sidekiq::Util
def poll
watchdog('retry poller thread died!') do
Sidekiq.redis do |conn|
# A message's "score" in Redis is the time at which it should be retried.
# Just check Redis for the set of messages with a timestamp before now.
messages = nil
now = Time.now.to_f.to_s
(messages, _) = conn.multi do
conn.zrangebyscore('retry', '-inf', now)
conn.zremrangebyscore('retry', '-inf', now)
end
messages.each do |message|
logger.debug { "Retrying #{message}" }
msg = Sidekiq.load_json(message)
conn.rpush("queue:#{msg['queue']}", message)
end
end
after(POLL_INTERVAL) { poll }
end
end
end
end
end

47
lib/sidekiq/scheduled.rb Normal file
View File

@ -0,0 +1,47 @@
require 'sidekiq'
require 'sidekiq/util'
require 'celluloid'
module Sidekiq
module Scheduled
POLL_INTERVAL = 15
##
# The Poller checks Redis every N seconds for messages in the retry or scheduled
# set have passed their timestamp and should be enqueued. If so, it
# just pops the message back onto its original queue so the
# workers can pick it up like any other message.
class Poller
include Celluloid
include Sidekiq::Util
SETS = %w(retry schedule)
def poll
watchdog('scheduling poller thread died!') do
# A message's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of messages with a timestamp before now.
now = Time.now.to_f.to_s
Sidekiq.redis do |conn|
SETS.each do |sorted_set|
(messages, _) = conn.multi do
conn.zrangebyscore(sorted_set, '-inf', now)
conn.zremrangebyscore(sorted_set, '-inf', now)
end
messages.each do |message|
logger.debug { "enqueued #{sorted_set}: #{message}" }
msg = Sidekiq.load_json(message)
conn.rpush("queue:#{msg['queue']}", message)
end
end
end
after(POLL_INTERVAL) { poll }
end
end
end
end
end

View File

@ -1,3 +1,3 @@
module Sidekiq
VERSION = "1.2.1"
VERSION = "2.0.0"
end

View File

@ -1,8 +1,6 @@
require 'sinatra/base'
require 'slim'
require 'sprockets'
require 'multi_json'
module Sidekiq
class SprocketsMiddleware
def initialize(app, options={})
@ -57,8 +55,8 @@ module Sidekiq
Sidekiq.redis { |conn| conn.get('stat:failed') } || 0
end
def retry_count
Sidekiq.redis { |conn| conn.zcard('retry') }
def zcard(name)
Sidekiq.redis { |conn| conn.zcard(name) }
end
def retries(count=50)
@ -68,6 +66,13 @@ module Sidekiq
end
end
def scheduled(count=50)
Sidekiq.redis do |conn|
results = conn.zrange('schedule', 0, count, :withscores => true)
results.each_slice(2).map { |msg, score| [Sidekiq.load_json(msg), Float(score)] }
end
end
def queues
@queues ||= Sidekiq.redis do |conn|
conn.smembers('queues').map do |q|
@ -142,14 +147,29 @@ module Sidekiq
slim :retries
end
get '/scheduled' do
@scheduled = scheduled
slim :scheduled
end
post '/scheduled' do
halt 404 unless params[:score]
halt 404 unless params['delete']
params[:score].each do |score|
s = score.to_f
process_score('schedule', s, :delete)
end
redirect root_path
end
post '/retries' do
halt 404 unless params[:score]
params[:score].each do |score|
s = score.to_f
if params['retry']
process_score(s, :retry)
process_score('retry', s, :retry)
elsif params['delete']
process_score(s, :delete)
process_score('retry', s, :delete)
end
end
redirect root_path
@ -166,12 +186,12 @@ module Sidekiq
redirect root_path
end
def process_score(score, operation)
def process_score(set, score, operation)
case operation
when :retry
Sidekiq.redis do |conn|
results = conn.zrangebyscore('retry', score, score)
conn.zremrangebyscore('retry', score, score)
results = conn.zrangebyscore(set, score, score)
conn.zremrangebyscore(set, score, score)
results.map do |message|
msg = Sidekiq.load_json(message)
msg['retry_count'] = msg['retry_count'] - 1
@ -180,7 +200,7 @@ module Sidekiq
end
when :delete
Sidekiq.redis do |conn|
conn.zremrangebyscore('retry', score, score)
conn.zremrangebyscore(set, score, score)
end
end
end

View File

@ -33,6 +33,13 @@ module Sidekiq
Sidekiq::Client.push('class' => self, 'args' => args)
end
def perform_in(interval, *args)
int = interval.to_f
ts = (int < 1_000_000_000 ? Time.now.to_f + int : int)
Sidekiq::Client.push('class' => self, 'args' => args, 'at' => ts)
end
alias_method :perform_at, :perform_in
##
# Allows customization for this type of Worker.
# Legal options:

View File

@ -8,7 +8,7 @@ class WorkController < ApplicationController
end
def email
UserMailer.delay.greetings(Time.now)
UserMailer.delay_for(30.seconds).greetings(Time.now)
render :nothing => true
end

View File

@ -29,6 +29,12 @@ class TestExtensions < MiniTest::Unit::TestCase
assert_equal 1, Sidekiq.redis {|c| c.llen('queue:default') }
end
it 'allows delayed scheduling of AR class methods' do
assert_equal 0, Sidekiq.redis {|c| c.zcard('schedule') }
MyModel.delay_for(5.days).long_class_method
assert_equal 1, Sidekiq.redis {|c| c.zcard('schedule') }
end
class UserMailer < ActionMailer::Base
def greetings(a, b)
raise "Should not be called!"
@ -42,6 +48,12 @@ class TestExtensions < MiniTest::Unit::TestCase
assert_equal ['default'], Sidekiq::Client.registered_queues
assert_equal 1, Sidekiq.redis {|c| c.llen('queue:default') }
end
it 'allows delayed scheduling of AM mails' do
assert_equal 0, Sidekiq.redis {|c| c.zcard('schedule') }
UserMailer.delay_for(5.days).greetings(1, 2)
assert_equal 1, Sidekiq.redis {|c| c.zcard('schedule') }
end
end
describe 'sidekiq rails extensions configuration' do

View File

@ -1,6 +1,5 @@
require 'helper'
require 'multi_json'
require 'sidekiq/retry'
require 'sidekiq/scheduled'
require 'sidekiq/middleware/server/retry_jobs'
class TestRetry < MiniTest::Unit::TestCase
@ -115,7 +114,7 @@ class TestRetry < MiniTest::Unit::TestCase
@redis.expect :multi, [[fake_msg], 1], []
@redis.expect :rpush, 1, ['queue:someq', fake_msg]
inst = Sidekiq::Retry::Poller.new
inst = Sidekiq::Scheduled::Poller.new
inst.poll
@redis.verify

33
test/test_scheduling.rb Normal file
View File

@ -0,0 +1,33 @@
require 'helper'
require 'sidekiq/scheduled'
class TestScheduling < MiniTest::Unit::TestCase
describe 'middleware' do
before do
@redis = MiniTest::Mock.new
# Ugh, this is terrible.
Sidekiq.instance_variable_set(:@redis, @redis)
def @redis.with; yield self; end
end
class ScheduledWorker
include Sidekiq::Worker
def perform(x)
end
end
it 'schedules a job via interval' do
@redis.expect :zadd, 1, ['schedule', String, String]
assert_equal true, ScheduledWorker.perform_in(600, 'mike')
@redis.verify
end
it 'schedules a job via timestamp' do
@redis.expect :zadd, 1, ['schedule', String, String]
assert_equal true, ScheduledWorker.perform_in(5.days.from_now, 'mike')
@redis.verify
end
end
end

View File

@ -4,7 +4,8 @@
p Processed: #{processed}
p Failed: #{failed}
p Busy Workers: #{workers.size}
p Retries Pending: #{retry_count}
p Scheduled: #{zcard('schedule')}
p Retries Pending: #{zcard('retry')}
p Queue Backlog: #{backlog}
.tabbable

View File

@ -14,6 +14,8 @@ html
a href='#{{root_path}}' Home
li
a href='#{{root_path}}retries' Retries
li
a href='#{{root_path}}scheduled' Scheduled
ul.nav.pull-right
li
a Redis: #{location}

25
web/views/scheduled.slim Normal file
View File

@ -0,0 +1,25 @@
h1 Scheduled Jobs
- if @scheduled.size > 0
form action="#{root_path}scheduled" method="post"
table class="table table-striped table-bordered"
tr
th
input type="checkbox" class="check_all"
th When
th Queue
th Worker
th Args
- @scheduled.each do |(msg, score)|
tr
td
input type='checkbox' name='score[]' value='#{score}'
td== relative_time(Time.at(score))
td
a href="#{root_path}queues/#{msg['queue']}" #{msg['queue']}
td= msg['class']
td= display_args(msg['args'])
input.btn.btn-danger type="submit" name="delete" value="Delete"
- else
p No scheduled jobs found.
a href="#{root_path}" Back