Use threads directly, introduce pool later:

Feedback:
https://gitlab.com/gitlab-org/gitlab-ce/merge_requests/8987#note_22938402
This commit is contained in:
Lin Jen-Shin 2017-02-09 21:11:31 +08:00
parent 1d8db6652f
commit 8aa1055fe3
2 changed files with 33 additions and 53 deletions

View File

@ -6,24 +6,52 @@ class RemoveInactiveDefaultEmailServices < ActiveRecord::Migration
disable_ddl_transaction!
def up
Gitlab::Database::ThreadedConnectionPool.with_pool(2) do |pool|
pool.execute_async <<-SQL.strip_heredoc
pool = create_connection_pool
threads = []
threads << Thread.new do
pool.with_connection do |connection|
connection.execute <<-SQL.strip_heredoc
DELETE FROM services
WHERE type = 'BuildsEmailService'
AND active IS FALSE
AND properties = '{"notify_only_broken_builds":true}';
SQL
SQL
end
end
pool.execute_async <<-SQL.strip_heredoc
threads << Thread.new do
pool.with_connection do |connection|
connection.execute <<-SQL.strip_heredoc
DELETE FROM services
WHERE type = 'PipelinesEmailService'
AND active IS FALSE
AND properties = '{"notify_only_broken_pipelines":true}';
SQL
SQL
end
end
threads.each(&:join)
end
def down
# Nothing can be done to restore the records
end
private
def create_connection_pool
# See activerecord-4.2.7.1/lib/active_record/connection_adapters/connection_specification.rb
env = Rails.env
original_config = ActiveRecord::Base.configurations
env_config = original_config[env].merge('pool' => 2)
config = original_config.merge(env => env_config)
spec =
ActiveRecord::
ConnectionAdapters::
ConnectionSpecification::Resolver.new(config).spec(env.to_sym)
ActiveRecord::ConnectionAdapters::ConnectionPool.new(spec)
end
end

View File

@ -1,48 +0,0 @@
module Gitlab
module Database
class ThreadedConnectionPool
def self.with_pool(pool_size)
pool = new(pool_size)
yield(pool)
ensure
pool.join
pool.close
end
def initialize(pool_size)
config = ActiveRecord::Base.configurations[Rails.env]
@ar_pool = ActiveRecord::Base.establish_connection(
config.merge(pool: pool_size))
@workers = []
@mutex = Mutex.new
end
def execute_async(sql)
@mutex.synchronize do
@workers << Thread.new do
@ar_pool.with_connection do |connection|
connection.execute(sql)
end
end
end
end
def join
threads = nil
@mutex.synchronize do
threads = @workers.dup
@workers.clear
end
threads.each(&:join)
end
def close
@ar_pool.disconnect!
end
end
end
end