Thread pools gracefully handle thread creation errors.
This commit is contained in:
parent
dcac23aef1
commit
ad621772c5
|
@ -0,0 +1,26 @@
|
|||
#!/usr/bin/env ruby
|
||||
|
||||
$: << File.expand_path('../../lib', __FILE__)
|
||||
|
||||
require 'benchmark'
|
||||
require 'concurrent/executors'
|
||||
|
||||
COUNT = 100_000
|
||||
|
||||
executor = Concurrent::CachedThreadPool.new
|
||||
latch = Concurrent::CountDownLatch.new
|
||||
|
||||
COUNT.times { executor.post{ nil } }
|
||||
|
||||
#COUNT.times do |i|
|
||||
# executor.post{ nil }
|
||||
# sleep(0.01) if i % 1000 == 0
|
||||
#end
|
||||
|
||||
executor.post{ latch.count_down }
|
||||
latch.wait
|
||||
|
||||
puts "Max length: #{executor.max_length}" if executor.respond_to?(:max_length)
|
||||
puts "Largest length: #{executor.largest_length}" if executor.respond_to?(:largest_length)
|
||||
puts "Scheduled task count: #{executor.scheduled_task_count}" if executor.respond_to?(:scheduled_task_count)
|
||||
puts "Completed task count: #{executor.completed_task_count}" if executor.respond_to?(:completed_task_count)
|
|
@ -141,9 +141,10 @@ if Concurrent.on_jruby?
|
|||
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
|
||||
deprecated ' :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
|
||||
|
||||
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
|
||||
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
|
||||
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
|
||||
raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
|
||||
raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE
|
||||
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE
|
||||
raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
|
||||
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)
|
||||
|
||||
if @max_queue == 0
|
||||
|
|
|
@ -12,7 +12,7 @@ module Concurrent
|
|||
class RubyThreadPoolExecutor < RubyExecutorService
|
||||
|
||||
# Default maximum number of threads that will be created in the pool.
|
||||
DEFAULT_MAX_POOL_SIZE = 2**13 # 8192
|
||||
DEFAULT_MAX_POOL_SIZE = 2_147_483_647 # java.lang.Integer::MAX_VALUE
|
||||
|
||||
# Default minimum number of threads that will be retained in the pool.
|
||||
DEFAULT_MIN_POOL_SIZE = 0
|
||||
|
@ -139,9 +139,10 @@ module Concurrent
|
|||
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
|
||||
deprecated ':overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
|
||||
|
||||
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
|
||||
raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
|
||||
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
|
||||
raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE
|
||||
raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE
|
||||
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE
|
||||
raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
|
||||
|
||||
self.auto_terminate = opts.fetch(:auto_terminate, true)
|
||||
|
||||
|
@ -216,6 +217,9 @@ module Concurrent
|
|||
else
|
||||
false
|
||||
end
|
||||
rescue ThreadError
|
||||
# Raised when the operating system refuses to create the new thread
|
||||
return false
|
||||
end
|
||||
|
||||
# tries to enqueue task
|
||||
|
|
|
@ -60,6 +60,11 @@ shared_examples :thread_pool_executor do
|
|||
end
|
||||
end
|
||||
|
||||
it 'raises an exception if :max_threads is less than zero' do
|
||||
expect {
|
||||
described_class.new(max_threads: -1)
|
||||
}.to raise_error(ArgumentError)
|
||||
end
|
||||
|
||||
it 'raises an exception if :min_threads is less than zero' do
|
||||
expect {
|
||||
|
@ -67,9 +72,15 @@ shared_examples :thread_pool_executor do
|
|||
}.to raise_error(ArgumentError)
|
||||
end
|
||||
|
||||
it 'raises an exception if :max_threads is not greater than zero' do
|
||||
it 'raises an exception if :max_threads greater than the max allowable' do
|
||||
expect {
|
||||
described_class.new(max_threads: 0)
|
||||
described_class.new(max_threads: described_class::DEFAULT_MAX_POOL_SIZE+1)
|
||||
}.to raise_error(ArgumentError)
|
||||
end
|
||||
|
||||
it 'raises an exception if :max_threads is less than :min_threads' do
|
||||
expect {
|
||||
described_class.new(max_threads: 1, min_threads: 100)
|
||||
}.to raise_error(ArgumentError)
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue