Introduce serializers to ActiveJob

This commit is contained in:
Evgenii Pecherkin 2017-10-17 16:05:05 +04:00 committed by Rafael Mendonça França
parent 2e87ea6d70
commit e360ac1231
19 changed files with 602 additions and 136 deletions

View File

@ -52,8 +52,21 @@ MyJob.set(wait: 1.week).perform_later(record) # Enqueue a job to be performed 1
That's it!
## Supported types for arguments
## GlobalID support
ActiveJob supports the following types of arguments by default:
- Standard types (`NilClass`, `String`, `Integer`, `Fixnum`, `Bignum`, `Float`, `BigDecimal`, `TrueClass`, `FalseClass`)
- `Symbol` (`:foo`, `:bar`, ...)
- `ActiveSupport::Duration` (`1.day`, `2.weeks`, ...)
- Classes constants (`ActiveRecord::Base`, `MySpecialService`, ...)
- Struct instances (`Struct.new('Rectangle', :width, :height).new(12, 20)`, ...)
- `Hash`. Keys should be of `String` or `Symbol` type
- `ActiveSupport::HashWithIndifferentAccess`
- `Array`
### GlobalID support
Active Job supports [GlobalID serialization](https://github.com/rails/globalid/) for parameters. This makes it possible
to pass live Active Record objects to your job instead of class/id pairs, which
@ -81,6 +94,53 @@ end
This works with any class that mixes in GlobalID::Identification, which
by default has been mixed into Active Record classes.
### Serializers
You can extend list of supported types for arguments. You just need to define your own serializer.
```ruby
class MySpecialSerializer
class << self
# Check if this object should be serialized using this serializer
def serialize?(argument)
object.is_a? MySpecialValueObject
end
# Convert an object to a simpler representative using supported object types.
# The recommended representative is a Hash with a specific key. Keys can be of basic types only
def serialize(object)
{
key => ActiveJob::Serializers.serialize(object.value)
'another_attribute' => ActiveJob::Serializers.serialize(object.another_attribute)
}
end
# Check if this serialized value be deserialized using this serializer
def deserialize?(argument)
object.is_a?(Hash) && object.keys == [key, 'another_attribute']
end
# Convert serialized value into a proper object
def deserialize(object)
value = ActiveJob::Serializers.deserialize(object[key])
another_attribute = ActiveJob::Serializers.deserialize(object['another_attribute'])
MySpecialValueObject.new value, another_attribute
end
# Define this method if you are using a hash as a representative.
# This key will be added to a list of restricted keys for hashes. Use basic types only
def key
"_aj_custom_dummy_value_object"
end
end
end
```
And now you just need to add this serializer to a list:
```ruby
ActiveJob::Base.add_serializers(MySpecialSerializer)
```
## Supported queueing systems

View File

@ -33,6 +33,7 @@ module ActiveJob
autoload :Base
autoload :QueueAdapters
autoload :Serializers
autoload :ConfiguredJob
autoload :TestCase
autoload :TestHelper

View File

@ -3,24 +3,6 @@
require "active_support/core_ext/hash"
module ActiveJob
# Raised when an exception is raised during job arguments deserialization.
#
# Wraps the original exception raised as +cause+.
class DeserializationError < StandardError
def initialize #:nodoc:
super("Error while trying to deserialize arguments: #{$!.message}")
set_backtrace $!.backtrace
end
end
# Raised when an unsupported argument type is set as a job argument. We
# currently support NilClass, Integer, Fixnum, Float, String, TrueClass, FalseClass,
# Bignum, BigDecimal, and objects that can be represented as GlobalIDs (ex: Active Record).
# Raised if you set the key for a Hash something else than a string or
# a symbol. Also raised when trying to serialize an object which can't be
# identified with a Global ID - such as an unpersisted Active Record model.
class SerializationError < ArgumentError; end
module Arguments
extend self
# :nodoc:
@ -31,126 +13,16 @@ module ActiveJob
# as-is. Arrays/Hashes are serialized element by element.
# All other types are serialized using GlobalID.
def serialize(arguments)
arguments.map { |argument| serialize_argument(argument) }
ActiveJob::Serializers.serialize(arguments)
end
# Deserializes a set of arguments. Whitelisted types are returned
# as-is. Arrays/Hashes are deserialized element by element.
# All other types are deserialized using GlobalID.
def deserialize(arguments)
arguments.map { |argument| deserialize_argument(argument) }
ActiveJob::Serializers.deserialize(arguments)
rescue
raise DeserializationError
end
private
# :nodoc:
GLOBALID_KEY = "_aj_globalid".freeze
# :nodoc:
SYMBOL_KEYS_KEY = "_aj_symbol_keys".freeze
# :nodoc:
WITH_INDIFFERENT_ACCESS_KEY = "_aj_hash_with_indifferent_access".freeze
private_constant :GLOBALID_KEY, :SYMBOL_KEYS_KEY, :WITH_INDIFFERENT_ACCESS_KEY
def serialize_argument(argument)
case argument
when *TYPE_WHITELIST
argument
when GlobalID::Identification
convert_to_global_id_hash(argument)
when Array
argument.map { |arg| serialize_argument(arg) }
when ActiveSupport::HashWithIndifferentAccess
result = serialize_hash(argument)
result[WITH_INDIFFERENT_ACCESS_KEY] = serialize_argument(true)
result
when Hash
symbol_keys = argument.each_key.grep(Symbol).map(&:to_s)
result = serialize_hash(argument)
result[SYMBOL_KEYS_KEY] = symbol_keys
result
else
raise SerializationError.new("Unsupported argument type: #{argument.class.name}")
end
end
def deserialize_argument(argument)
case argument
when String
GlobalID::Locator.locate(argument) || argument
when *TYPE_WHITELIST
argument
when Array
argument.map { |arg| deserialize_argument(arg) }
when Hash
if serialized_global_id?(argument)
deserialize_global_id argument
else
deserialize_hash(argument)
end
else
raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
end
end
def serialized_global_id?(hash)
hash.size == 1 && hash.include?(GLOBALID_KEY)
end
def deserialize_global_id(hash)
GlobalID::Locator.locate hash[GLOBALID_KEY]
end
def serialize_hash(argument)
argument.each_with_object({}) do |(key, value), hash|
hash[serialize_hash_key(key)] = serialize_argument(value)
end
end
def deserialize_hash(serialized_hash)
result = serialized_hash.transform_values { |v| deserialize_argument(v) }
if result.delete(WITH_INDIFFERENT_ACCESS_KEY)
result = result.with_indifferent_access
elsif symbol_keys = result.delete(SYMBOL_KEYS_KEY)
result = transform_symbol_keys(result, symbol_keys)
end
result
end
# :nodoc:
RESERVED_KEYS = [
GLOBALID_KEY, GLOBALID_KEY.to_sym,
SYMBOL_KEYS_KEY, SYMBOL_KEYS_KEY.to_sym,
WITH_INDIFFERENT_ACCESS_KEY, WITH_INDIFFERENT_ACCESS_KEY.to_sym,
]
private_constant :RESERVED_KEYS
def serialize_hash_key(key)
case key
when *RESERVED_KEYS
raise SerializationError.new("Can't serialize a Hash with reserved key #{key.inspect}")
when String, Symbol
key.to_s
else
raise SerializationError.new("Only string and symbol hash keys may be serialized as job arguments, but #{key.inspect} is a #{key.class}")
end
end
def transform_symbol_keys(hash, symbol_keys)
hash.transform_keys do |key|
if symbol_keys.include?(key)
key.to_sym
else
key
end
end
end
def convert_to_global_id_hash(argument)
{ GLOBALID_KEY => argument.to_global_id.to_s }
rescue URI::GID::MissingModelIdError
raise SerializationError, "Unable to serialize #{argument.class} " \
"without an id. (Maybe you forgot to call save?)"
end
end
end

View File

@ -1,6 +1,7 @@
# frozen_string_literal: true
require "active_job/core"
require "active_job/serializers"
require "active_job/queue_adapter"
require "active_job/queue_name"
require "active_job/queue_priority"
@ -59,6 +60,7 @@ module ActiveJob #:nodoc:
# * SerializationError - Error class for serialization errors.
class Base
include Core
include Serializers
include QueueAdapter
include QueueName
include QueuePriority

View File

@ -0,0 +1,109 @@
# frozen_string_literal: true
module ActiveJob
# Raised when an exception is raised during job arguments deserialization.
#
# Wraps the original exception raised as +cause+.
class DeserializationError < StandardError
def initialize #:nodoc:
super("Error while trying to deserialize arguments: #{$!.message}")
set_backtrace $!.backtrace
end
end
# Raised when an unsupported argument type is set as a job argument. We
# currently support NilClass, Integer, Fixnum, Float, String, TrueClass, FalseClass,
# Bignum, BigDecimal, and objects that can be represented as GlobalIDs (ex: Active Record).
# Raised if you set the key for a Hash something else than a string or
# a symbol. Also raised when trying to serialize an object which can't be
# identified with a Global ID - such as an unpersisted Active Record model.
class SerializationError < ArgumentError; end
# The <tt>ActiveJob::Serializers</tt> module is used to store a list of known serializers
# and to add new ones. It also has helpers to serialize/deserialize objects
module Serializers
extend ActiveSupport::Autoload
extend ActiveSupport::Concern
autoload :ArraySerializer
autoload :BaseSerializer
autoload :ClassSerializer
autoload :DurationSerializer
autoload :GlobalIDSerializer
autoload :HashWithIndifferentAccessSerializer
autoload :HashSerializer
autoload :ObjectSerializer
autoload :StandardTypeSerializer
autoload :StructSerializer
autoload :SymbolSerializer
included do
class_attribute :_additional_serializers, instance_accessor: false, instance_predicate: false
self._additional_serializers = []
end
# Includes the method to list known serializers and to add new ones
module ClassMethods
# Returns list of known serializers
def serializers
self._additional_serializers + SERIALIZERS
end
# Adds a new serializer to a list of known serializers
def add_serializers(*serializers)
check_duplicate_serializer_keys!(serializers)
@_additional_serializers = serializers + @_additional_serializers
end
# Returns a list of reserved keys, which cannot be used as keys for a hash
def reserved_serializers_keys
serializers.select { |s| s.respond_to?(:key) }.map(&:key)
end
private
def check_duplicate_serializer_keys!(serializers)
keys_to_add = serializers.select { |s| s.respond_to?(:key) }.map(&:key)
duplicate_keys = reserved_keys & keys_to_add
raise ArgumentError.new("Can't add serializers because of keys duplication: #{duplicate_keys}") if duplicate_keys.any?
end
end
# :nodoc:
SERIALIZERS = [
::ActiveJob::Serializers::GlobalIDSerializer,
::ActiveJob::Serializers::DurationSerializer,
::ActiveJob::Serializers::StructSerializer,
::ActiveJob::Serializers::SymbolSerializer,
::ActiveJob::Serializers::ClassSerializer,
::ActiveJob::Serializers::StandardTypeSerializer,
::ActiveJob::Serializers::HashWithIndifferentAccessSerializer,
::ActiveJob::Serializers::HashSerializer,
::ActiveJob::Serializers::ArraySerializer
].freeze
private_constant :SERIALIZERS
class << self
# Returns serialized representative of the passed object.
# Will look up through all known serializers.
# Raises `SerializationError` if it can't find a proper serializer.
def serialize(argument)
serializer = ::ActiveJob::Base.serializers.detect { |s| s.serialize?(argument) }
raise SerializationError.new("Unsupported argument type: #{argument.class.name}") unless serializer
serializer.serialize(argument)
end
# Returns deserialized object.
# Will look up through all known serializers.
# If no serializers found will raise `ArgumentError`
def deserialize(argument)
serializer = ::ActiveJob::Base.serializers.detect { |s| s.deserialize?(argument) }
raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}" unless serializer
serializer.deserialize(argument)
end
end
end
end

View File

@ -0,0 +1,26 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
# Provides methods to serialize and deserialize `Array`
class ArraySerializer < BaseSerializer
class << self
alias_method :deserialize?, :serialize?
def serialize(array)
array.map { |arg| ::ActiveJob::Serializers.serialize(arg) }
end
def deserialize(array)
array.map { |arg| ::ActiveJob::Serializers.deserialize(arg) }
end
private
def klass
::Array
end
end
end
end
end

View File

@ -0,0 +1,13 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
class BaseSerializer
class << self
def serialize?(argument)
argument.is_a?(klass)
end
end
end
end
end

View File

@ -0,0 +1,24 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
# Provides methods to serialize and deserialize `Class` (`ActiveRecord::Base`, `MySpecialService`, ...)
class ClassSerializer < ObjectSerializer
class << self
def serialize(argument_klass)
{ key => "::#{argument_klass.name}" }
end
def key
"_aj_class"
end
private
def klass
::Class
end
end
end
end
end

View File

@ -0,0 +1,42 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
# Provides methods to serialize and deserialize `ActiveSupport::Duration` (`1.day`, `2.weeks`, ...)
class DurationSerializer < ObjectSerializer
class << self
def serialize(duration)
{
key => duration.value,
parts_key => ::ActiveJob::Serializers.serialize(duration.parts)
}
end
def deserialize(hash)
value = hash[key]
parts = ::ActiveJob::Serializers.deserialize(hash[parts_key])
klass.new(value, parts)
end
def key
"_aj_activesupport_duration"
end
private
def klass
::ActiveSupport::Duration
end
def keys
super.push parts_key
end
def parts_key
"parts"
end
end
end
end
end

View File

@ -0,0 +1,32 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
# Provides methods to serialize and deserialize objects which mixes `GlobalID::Identification`,
# including `ActiveRecord::Base` models
class GlobalIDSerializer < ObjectSerializer
class << self
def serialize(object)
{ key => object.to_global_id.to_s }
rescue URI::GID::MissingModelIdError
raise SerializationError, "Unable to serialize #{object.class} " \
"without an id. (Maybe you forgot to call save?)"
end
def deserialize(hash)
GlobalID::Locator.locate(hash[key])
end
def key
"_aj_globalid"
end
private
def klass
::GlobalID::Identification
end
end
end
end
end

View File

@ -0,0 +1,62 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
# Provides methods to serialize and deserialize `Hash` (`{key: field, ...}`)
# Only `String` or `Symbol` can be used as a key. Values will be serialized by known serializers
class HashSerializer < BaseSerializer
class << self
def serialize(hash)
symbol_keys = hash.each_key.grep(Symbol).map(&:to_s)
result = serialize_hash(hash)
result[key] = symbol_keys
result
end
def deserialize?(argument)
argument.is_a?(Hash) && argument[key]
end
def deserialize(hash)
result = hash.transform_values { |v| ::ActiveJob::Serializers::deserialize(v) }
symbol_keys = result.delete(key)
transform_symbol_keys(result, symbol_keys)
end
def key
"_aj_symbol_keys"
end
private
def serialize_hash(hash)
hash.each_with_object({}) do |(key, value), result|
result[serialize_hash_key(key)] = ::ActiveJob::Serializers.serialize(value)
end
end
def serialize_hash_key(key)
raise SerializationError.new("Only string and symbol hash keys may be serialized as job arguments, but #{key.inspect} is a #{key.class}") unless [String, Symbol].include?(key.class)
raise SerializationError.new("Can't serialize a Hash with reserved key #{key.inspect}") if ActiveJob::Base.reserved_serializers_keys.include?(key.to_s)
key.to_s
end
def transform_symbol_keys(hash, symbol_keys)
hash.transform_keys do |key|
if symbol_keys.include?(key)
key.to_sym
else
key
end
end
end
def klass
::Hash
end
end
end
end
end

View File

@ -0,0 +1,37 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
# Provides methods to serialize and deserialize `ActiveSupport::HashWithIndifferentAccess`
# Values will be serialized by known serializers
class HashWithIndifferentAccessSerializer < HashSerializer
class << self
def serialize(hash)
result = serialize_hash(hash)
result[key] = ::ActiveJob::Serializers.serialize(true)
result
end
def deserialize?(argument)
argument.is_a?(Hash) && argument[key]
end
def deserialize(hash)
result = hash.transform_values { |v| ::ActiveJob::Serializers.deserialize(v) }
result.delete(key)
result.with_indifferent_access
end
def key
"_aj_hash_with_indifferent_access"
end
private
def klass
::ActiveSupport::HashWithIndifferentAccess
end
end
end
end
end

View File

@ -0,0 +1,27 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
class ObjectSerializer < BaseSerializer
class << self
def serialize(object)
{ key => object.class.name }
end
def deserialize?(argument)
argument.respond_to?(:keys) && argument.keys == keys
end
def deserialize(hash)
hash[key].constantize
end
private
def keys
[key]
end
end
end
end
end

View File

@ -0,0 +1,26 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
# Provides methods to serialize and deserialize standard types
# (`NilClass`, `String`, `Integer`, `Fixnum`, `Bignum`, `Float`, `BigDecimal`, `TrueClass`, `FalseClass`)
class StandardTypeSerializer < BaseSerializer
class << self
def serialize?(argument)
::ActiveJob::Arguments::TYPE_WHITELIST.include? argument.class
end
def serialize(argument)
argument
end
alias_method :deserialize?, :serialize?
def deserialize(argument)
object = GlobalID::Locator.locate(argument) if argument.is_a? String
object || argument
end
end
end
end
end

View File

@ -0,0 +1,38 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
# Provides methods to serialize and deserialize struct instances
# (`Struct.new('Rectangle', :width, :height).new(12, 20)`)
class StructSerializer < ObjectSerializer
class << self
def serialize(object)
super.merge values_key => ::ActiveJob::Serializers.serialize(object.values)
end
def deserialize(hash)
values = ::ActiveJob::Serializers.deserialize(hash[values_key])
super.new(*values)
end
def key
"_aj_struct"
end
private
def klass
::Struct
end
def keys
super.push values_key
end
def values_key
"values"
end
end
end
end
end

View File

@ -0,0 +1,28 @@
# frozen_string_literal: true
module ActiveJob
module Serializers
# Provides methods to serialize and deserialize `Symbol` (`:foo`, `:bar`, ...)
class SymbolSerializer < ObjectSerializer
class << self
def serialize(symbol)
{ key => symbol.to_s }
end
def deserialize(hash)
hash[key].to_sym
end
def key
"_aj_symbol"
end
private
def klass
::Symbol
end
end
end
end
end

View File

@ -30,7 +30,7 @@ module Rails # :nodoc:
private
def application_job_file_name
@application_job_file_name ||= if mountable_engine?
"app/jobs/#{namespaced_path}/application_job.rb"
"app/jobs/#{namespaced_path}/application_job.rb"
else
"app/jobs/application_job.rb"
end

View File

@ -4,6 +4,7 @@ require "helper"
require "active_job/arguments"
require "models/person"
require "active_support/core_ext/hash/indifferent_access"
require "active_support/duration"
require "jobs/kwargs_job"
class ArgumentSerializationTest < ActiveSupport::TestCase
@ -12,7 +13,8 @@ class ArgumentSerializationTest < ActiveSupport::TestCase
end
[ nil, 1, 1.0, 1_000_000_000_000_000_000_000,
"a", true, false, BigDecimal(5),
"a", true, false, BigDecimal.new(5),
:a, self, 1.day,
[ 1, "a" ],
{ "a" => 1 }
].each do |arg|
@ -21,7 +23,7 @@ class ArgumentSerializationTest < ActiveSupport::TestCase
end
end
[ :a, Object.new, self, Person.find("5").to_gid ].each do |arg|
[ Object.new, Person.find("5").to_gid ].each do |arg|
test "does not serialize #{arg.class}" do
assert_raises ActiveJob::SerializationError do
ActiveJob::Arguments.serialize [ arg ]
@ -33,6 +35,10 @@ class ArgumentSerializationTest < ActiveSupport::TestCase
end
end
test "serializes Struct" do
assert_arguments_unchanged Struct.new("Rectangle", :width, :height).new(10, 15)
end
test "should convert records to Global IDs" do
assert_arguments_roundtrip [@person]
end

View File

@ -339,8 +339,21 @@ UserMailer.welcome(@user).deliver_later # Email will be localized to Esperanto.
```
GlobalID
--------
Supported types for arguments
----------------------------
ActiveJob supports the following types of arguments by default:
- Basic types (`NilClass`, `String`, `Integer`, `Fixnum`, `Bignum`, `Float`, `BigDecimal`, `TrueClass`, `FalseClass`)
- `Symbol` (`:foo`, `:bar`, ...)
- `ActiveSupport::Duration` (`1.day`, `2.weeks`, ...)
- Classes constants (`ActiveRecord::Base`, `MySpecialService`, ...)
- Struct instances (`Struct.new('Rectangle', :width, :height).new(12, 20)`, ...)
- `Hash`. Keys should be of `String` or `Symbol` type
- `ActiveSupport::HashWithIndifferentAccess`
- `Array`
### GlobalID
Active Job supports GlobalID for parameters. This makes it possible to pass live
Active Record objects to your job instead of class/id pairs, which you then have
@ -368,6 +381,54 @@ end
This works with any class that mixes in `GlobalID::Identification`, which
by default has been mixed into Active Record classes.
### Serializers
You can extend list of supported types for arguments. You just need to define your own serializer.
```ruby
class MySpecialSerializer
class << self
# Check if this object should be serialized using this serializer
def serialize?(argument)
argument.is_a? MySpecialValueObject
end
# Convert an object to a simpler representative using supported object types.
# The recommended representative is a Hash with a specific key. Keys can be of basic types only
def serialize(object)
{
key => ActiveJob::Serializers.serialize(object.value)
'another_attribute' => ActiveJob::Serializers.serialize(object.another_attribute)
}
end
# Check if this serialized value be deserialized using this serializer
def deserialize?(argument)
argument.is_a?(Hash) && argument.keys == [key, 'another_attribute']
end
# Convert serialized value into a proper object
def deserialize(object)
value = ActiveJob::Serializers.deserialize(object[key])
another_attribute = ActiveJob::Serializers.deserialize(object['another_attribute'])
MySpecialValueObject.new value, another_attribute
end
# Define this method if you are using a hash as a representative.
# This key will be added to a list of restricted keys for hashes. Use basic types only
def key
"_aj_custom_dummy_value_object"
end
end
end
```
And now you just need to add this serializer to a list:
```ruby
ActiveJob::Base.add_serializers(MySpecialSerializer)
```
Exceptions
----------