Introduce ThreadedConnectionPool for customized pool

This way we could reuse this pool for other migrations

Feedback:

* https://gitlab.com/gitlab-org/gitlab-ce/merge_requests/8987#note_22923350
* https://gitlab.com/gitlab-org/gitlab-ce/merge_requests/8987#note_22923365
This commit is contained in:
Lin Jen-Shin 2017-02-07 22:09:13 +08:00
parent 0c2f4a3c42
commit 7ecee7a4d7
2 changed files with 65 additions and 28 deletions

View file

@ -3,38 +3,27 @@ class RemoveInactiveDefaultEmailServices < ActiveRecord::Migration
DOWNTIME = false DOWNTIME = false
disable_ddl_transaction!
def up def up
builds_service = spawn <<-SQL.strip_heredoc Gitlab::Database::ThreadedConnectionPool.with_pool(2) do |pool|
DELETE FROM services pool.execute_async <<-SQL.strip_heredoc
WHERE type = 'BuildsEmailService' DELETE FROM services
AND active IS FALSE WHERE type = 'BuildsEmailService'
AND properties = '{"notify_only_broken_builds":true}'; AND active IS FALSE
SQL AND properties = '{"notify_only_broken_builds":true}';
SQL
pipelines_service = spawn <<-SQL.strip_heredoc pool.execute_async <<-SQL.strip_heredoc
DELETE FROM services DELETE FROM services
WHERE type = 'PipelinesEmailService' WHERE type = 'PipelinesEmailService'
AND active IS FALSE AND active IS FALSE
AND properties = '{"notify_only_broken_pipelines":true}'; AND properties = '{"notify_only_broken_pipelines":true}';
SQL SQL
[builds_service, pipelines_service].each(&:join)
end
private
def spawn(query)
Thread.new do
with_connection do |connection|
connection.execute(query)
end
end end
end end
def with_connection(&block) def down
pool = ActiveRecord::Base.establish_connection # Nothing can be done to restore the records
pool.with_connection(&block)
ensure
pool.disconnect!
end end
end end

View file

@ -0,0 +1,48 @@
module Gitlab
module Database
class ThreadedConnectionPool
def self.with_pool(pool_size)
pool = new(pool_size)
yield(pool)
ensure
pool.join
pool.close
end
def initialize(pool_size)
config = ActiveRecord::Base.configurations[Rails.env]
@ar_pool = ActiveRecord::Base.establish_connection(
config.merge(pool: pool_size))
@workers = []
@mutex = Mutex.new
end
def execute_async(sql)
@mutex.synchronize do
@workers << Thread.new do
@ar_pool.with_connection do |connection|
connection.execute(sql)
end
end
end
end
def join
threads = nil
@mutex.synchronize do
threads = @workers.dup
@workers.clear
end
threads.each(&:join)
end
def close
@ar_pool.disconnect!
end
end
end
end