From 451437c6f57e66cc7586ec966e530493927098c7 Mon Sep 17 00:00:00 2001 From: Xavier Noria Date: Wed, 13 Jul 2016 18:21:41 +0200 Subject: [PATCH] adds support for limits in batch processing --- activerecord/CHANGELOG.md | 15 ++ activerecord/lib/active_record/core.rb | 15 +- .../lib/active_record/relation/batches.rb | 112 +++++++++------ activerecord/test/cases/batches_test.rb | 132 ++++++++++++++++-- 4 files changed, 220 insertions(+), 54 deletions(-) diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index c91e6662c9..86d07580e8 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,3 +1,18 @@ +* The flag `error_on_ignored_order_or_limit` has been deprecated in favor of + the current `error_on_ignored_order`. + + *Xavier Noria* + +* Batch processing methods support `limit`: + + Post.limit(10_000).find_each do |post| + # ... + end + + It also works in `find_in_batches` and `in_batches`. + + *Xavier Noria* + * Using `group` with an attribute that has a custom type will properly cast the hash keys after calling a calculation method like `count`. Fixes #25595. diff --git a/activerecord/lib/active_record/core.rb b/activerecord/lib/active_record/core.rb index de337b24d6..2c94463acd 100644 --- a/activerecord/lib/active_record/core.rb +++ b/activerecord/lib/active_record/core.rb @@ -72,11 +72,20 @@ module ActiveRecord ## # :singleton-method: - # Specifies if an error should be raised on query limit or order being + # Specifies if an error should be raised if the query has an order being # ignored when doing batch queries. Useful in applications where the - # limit or scope being ignored is error-worthy, rather than a warning. + # scope being ignored is error-worthy, rather than a warning. + mattr_accessor :error_on_ignored_order, instance_writer: false + self.error_on_ignored_order = false + mattr_accessor :error_on_ignored_order_or_limit, instance_writer: false - self.error_on_ignored_order_or_limit = false + def self.error_on_ignored_order_or_limit=(value) + ActiveSupport::Deprecation.warn(<<-MSG.squish) + The flag error_on_ignored_order_or_limit is deprecated. Limits are + now supported. Please use error_on_ignored_order= instead. + MSG + self.error_on_ignored_order = value + end ## # :singleton-method: diff --git a/activerecord/lib/active_record/relation/batches.rb b/activerecord/lib/active_record/relation/batches.rb index 3639625722..46bb0e04ee 100644 --- a/activerecord/lib/active_record/relation/batches.rb +++ b/activerecord/lib/active_record/relation/batches.rb @@ -1,8 +1,8 @@ -require "active_record/relation/batches/batch_enumerator" +require 'active_record/relation/batches/batch_enumerator' module ActiveRecord module Batches - ORDER_OR_LIMIT_IGNORED_MESSAGE = "Scoped order and limit are ignored, it's forced to be batch order and batch size." + ORDER_IGNORE_MESSAGE = "Scoped order is ignored, it's forced to be batch order." # Looping through a collection of records from the database # (using the Scoping::Named::ClassMethods.all method, for example) @@ -34,15 +34,19 @@ module ActiveRecord # * :start - Specifies the primary key value to start from, inclusive of the value. # * :finish - Specifies the primary key value to end at, inclusive of the value. # * :error_on_ignore - Overrides the application config to specify if an error should be raised when - # the order and limit have to be ignored due to batching. + # the order has to be ignored due to batching. # - # This is especially useful if you want multiple workers dealing with - # the same processing queue. You can make worker 1 handle all the records - # between id 0 and 10,000 and worker 2 handle from 10,000 and beyond - # (by setting the +:start+ and +:finish+ option on each worker). + # Limits are honored, and if present there is no requirement for the batch + # size, it can be less than, equal, or greater than the limit. # - # # Let's process for a batch of 2000 records, skipping the first 2000 rows - # Person.find_each(start: 2000, batch_size: 2000) do |person| + # The options +start+ and +finish+ are especially useful if you want + # multiple workers dealing with the same processing queue. You can make + # worker 1 handle all the records between id 1 and 9999 and worker 2 + # handle from 10000 and beyond by setting the +:start+ and +:finish+ + # option on each worker. + # + # # Let's process from record 10_000 on. + # Person.find_each(start: 10_000) do |person| # person.party_all_night! # end # @@ -51,8 +55,8 @@ module ActiveRecord # work. This also means that this method only works when the primary key is # orderable (e.g. an integer or string). # - # NOTE: You can't set the limit either, that's used to control - # the batch sizes. + # NOTE: By its nature, batch processing is subject to race conditions if + # other processes are modifying the database. def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil) if block_given? find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore) do |records| @@ -89,15 +93,19 @@ module ActiveRecord # * :start - Specifies the primary key value to start from, inclusive of the value. # * :finish - Specifies the primary key value to end at, inclusive of the value. # * :error_on_ignore - Overrides the application config to specify if an error should be raised when - # the order and limit have to be ignored due to batching. + # the order has to be ignored due to batching. # - # This is especially useful if you want multiple workers dealing with - # the same processing queue. You can make worker 1 handle all the records - # between id 0 and 10,000 and worker 2 handle from 10,000 and beyond - # (by setting the +:start+ and +:finish+ option on each worker). + # Limits are honored, and if present there is no requirement for the batch + # size, it can be less than, equal, or greater than the limit. # - # # Let's process the next 2000 records - # Person.find_in_batches(start: 2000, batch_size: 2000) do |group| + # The options +start+ and +finish+ are especially useful if you want + # multiple workers dealing with the same processing queue. You can make + # worker 1 handle all the records between id 1 and 9999 and worker 2 + # handle from 10000 and beyond by setting the +:start+ and +:finish+ + # option on each worker. + # + # # Let's process from record 10_000 on. + # Person.find_in_batches(start: 10_000) do |group| # group.each { |person| person.party_all_night! } # end # @@ -106,8 +114,8 @@ module ActiveRecord # work. This also means that this method only works when the primary key is # orderable (e.g. an integer or string). # - # NOTE: You can't set the limit either, that's used to control - # the batch sizes. + # NOTE: By its nature, batch processing is subject to race conditions if + # other processes are modifying the database. def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil) relation = self unless block_given? @@ -149,17 +157,19 @@ module ActiveRecord # * :start - Specifies the primary key value to start from, inclusive of the value. # * :finish - Specifies the primary key value to end at, inclusive of the value. # * :error_on_ignore - Overrides the application config to specify if an error should be raised when - # the order and limit have to be ignored due to batching. + # the order has to be ignored due to batching. # - # This is especially useful if you want to work with the - # ActiveRecord::Relation object instead of the array of records, or if - # you want multiple workers dealing with the same processing queue. You can - # make worker 1 handle all the records between id 0 and 10,000 and worker 2 - # handle from 10,000 and beyond (by setting the +:start+ and +:finish+ - # option on each worker). + # Limits are honored, and if present there is no requirement for the batch + # size, it can be less than, equal, or greater than the limit. # - # # Let's process the next 2000 records - # Person.in_batches(of: 2000, start: 2000).update_all(awesome: true) + # The options +start+ and +finish+ are especially useful if you want + # multiple workers dealing with the same processing queue. You can make + # worker 1 handle all the records between id 1 and 9999 and worker 2 + # handle from 10000 and beyond by setting the +:start+ and +:finish+ + # option on each worker. + # + # # Let's process from record 10_000 on. + # Person.in_batches(start: 10_000).update_all(awesome: true) # # An example of calling where query method on the relation: # @@ -179,19 +189,26 @@ module ActiveRecord # consistent. Therefore the primary key must be orderable, e.g an integer # or a string. # - # NOTE: You can't set the limit either, that's used to control the batch - # sizes. + # NOTE: By its nature, batch processing is subject to race conditions if + # other processes are modifying the database. def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil) relation = self unless block_given? return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self) end - if arel.orders.present? || arel.taken.present? - act_on_order_or_limit_ignored(error_on_ignore) + if arel.orders.present? + act_on_ignored_order(error_on_ignore) end - relation = relation.reorder(batch_order).limit(of) + batch_limit = of + if limit_value + remaining = limit_value + batch_limit = remaining if remaining < batch_limit + relation = relation.limit(nil) # new relation without the limit + end + + relation = relation.reorder(batch_order).limit(batch_limit) relation = apply_limits(relation, start, finish) batch_relation = relation @@ -199,11 +216,11 @@ module ActiveRecord if load records = batch_relation.records ids = records.map(&:id) - yielded_relation = self.where(primary_key => ids) + yielded_relation = where(primary_key => ids) yielded_relation.load_records(records) else ids = batch_relation.pluck(primary_key) - yielded_relation = self.where(primary_key => ids) + yielded_relation = where(primary_key => ids) end break if ids.empty? @@ -213,7 +230,20 @@ module ActiveRecord yield yielded_relation - break if ids.length < of + break if ids.length < batch_limit + + if limit_value + remaining -= ids.length + + if remaining == 0 + # Saves a useless iteration when the limit is a multiple of the + # batch size. + break + elsif remaining < batch_limit + relation = relation.limit(remaining) + end + end + batch_relation = relation.where(arel_attribute(primary_key).gt(primary_key_offset)) end end @@ -230,13 +260,13 @@ module ActiveRecord "#{quoted_table_name}.#{quoted_primary_key} ASC" end - def act_on_order_or_limit_ignored(error_on_ignore) - raise_error = (error_on_ignore.nil? ? self.klass.error_on_ignored_order_or_limit : error_on_ignore) + def act_on_ignored_order(error_on_ignore) + raise_error = (error_on_ignore.nil? ? self.klass.error_on_ignored_order : error_on_ignore) if raise_error - raise ArgumentError.new(ORDER_OR_LIMIT_IGNORED_MESSAGE) + raise ArgumentError.new(ORDER_IGNORE_MESSAGE) elsif logger - logger.warn(ORDER_OR_LIMIT_IGNORED_MESSAGE) + logger.warn(ORDER_IGNORE_MESSAGE) end end end diff --git a/activerecord/test/cases/batches_test.rb b/activerecord/test/cases/batches_test.rb index db71840658..2c6011489e 100644 --- a/activerecord/test/cases/batches_test.rb +++ b/activerecord/test/cases/batches_test.rb @@ -68,10 +68,26 @@ class EachTest < ActiveRecord::TestCase end end - def test_warn_if_limit_scope_is_set - assert_called(ActiveRecord::Base.logger, :warn) do - Post.limit(1).find_each { |post| post } + test 'find_each should honor limit if passed a block' do + limit = @total - 1 + total = 0 + + Post.limit(limit).find_each do |post| + total += 1 end + + assert_equal limit, total + end + + test 'find_each should honor limit if no block is passed' do + limit = @total - 1 + total = 0 + + Post.limit(limit).find_each.each do |post| + total += 1 + end + + assert_equal limit, total end def test_warn_if_order_scope_is_set @@ -84,7 +100,7 @@ class EachTest < ActiveRecord::TestCase previous_logger = ActiveRecord::Base.logger ActiveRecord::Base.logger = nil assert_nothing_raised do - Post.limit(1).find_each { |post| post } + Post.order('comments_count DESC').find_each { |post| post } end ensure ActiveRecord::Base.logger = previous_logger @@ -172,26 +188,26 @@ class EachTest < ActiveRecord::TestCase def test_find_in_batches_should_not_error_if_config_overridden # Set the config option which will be overridden - prev = ActiveRecord::Base.error_on_ignored_order_or_limit - ActiveRecord::Base.error_on_ignored_order_or_limit = true + prev = ActiveRecord::Base.error_on_ignored_order + ActiveRecord::Base.error_on_ignored_order = true assert_nothing_raised do PostWithDefaultScope.find_in_batches(error_on_ignore: false){} end ensure # Set back to default - ActiveRecord::Base.error_on_ignored_order_or_limit = prev + ActiveRecord::Base.error_on_ignored_order = prev end def test_find_in_batches_should_error_on_config_specified_to_error # Set the config option - prev = ActiveRecord::Base.error_on_ignored_order_or_limit - ActiveRecord::Base.error_on_ignored_order_or_limit = true + prev = ActiveRecord::Base.error_on_ignored_order + ActiveRecord::Base.error_on_ignored_order = true assert_raise(ArgumentError) do PostWithDefaultScope.find_in_batches(){} end ensure # Set back to default - ActiveRecord::Base.error_on_ignored_order_or_limit = prev + ActiveRecord::Base.error_on_ignored_order = prev end def test_find_in_batches_should_not_error_by_default @@ -249,6 +265,28 @@ class EachTest < ActiveRecord::TestCase end end + test 'find_in_batches should honor limit if passed a block' do + limit = @total - 1 + total = 0 + + Post.limit(limit).find_in_batches do |batch| + total += batch.size + end + + assert_equal limit, total + end + + test 'find_in_batches should honor limit if no block is passed' do + limit = @total - 1 + total = 0 + + Post.limit(limit).find_in_batches.each do |batch| + total += batch.size + end + + assert_equal limit, total + end + def test_in_batches_should_not_execute_any_query assert_no_queries do assert_kind_of ActiveRecord::Batches::BatchEnumerator, Post.in_batches(of: 2) @@ -486,4 +524,78 @@ class EachTest < ActiveRecord::TestCase assert_equal 1, Post.find_in_batches(:batch_size => 10_000).size end end + + [true, false].each do |load| + test "in_batches should return limit records when limit is less than batch size and load is #{load}" do + limit = 3 + batch_size = 5 + total = 0 + + Post.limit(limit).in_batches(of: batch_size, load: load) do |batch| + total += batch.count + end + + assert_equal limit, total + end + + test "in_batches should return limit records when limit is greater than batch size and load is #{load}" do + limit = 5 + batch_size = 3 + total = 0 + + Post.limit(limit).in_batches(of: batch_size, load: load) do |batch| + total += batch.count + end + + assert_equal limit, total + end + + test "in_batches should return limit records when limit is a multiple of the batch size and load is #{load}" do + limit = 6 + batch_size = 3 + total = 0 + + Post.limit(limit).in_batches(of: batch_size, load: load) do |batch| + total += batch.count + end + + assert_equal limit, total + end + + test "in_batches should return no records if the limit is 0 and load is #{load}" do + limit = 0 + batch_size = 1 + total = 0 + + Post.limit(limit).in_batches(of: batch_size, load: load) do |batch| + total += batch.count + end + + assert_equal limit, total + end + + test "in_batches should return all if the limit is greater than the number of records when load is #{load}" do + limit = @total + 1 + batch_size = 1 + total = 0 + + Post.limit(limit).in_batches(of: batch_size, load: load) do |batch| + total += batch.count + end + + assert_equal @total, total + end + end + + test 'error_on_ignored_order_or_limit= is deprecated' do + begin + prev = ActiveRecord::Base.error_on_ignored_order + assert_deprecated 'Please use error_on_ignored_order= instead.' do + ActiveRecord::Base.error_on_ignored_order_or_limit = true + end + assert ActiveRecord::Base.error_on_ignored_order + ensure + ActiveRecord::Base.error_on_ignored_order = prev + end + end end