Make pipeline schedule worker resilient
Currently, pipeline schedule worker is unstable because it's sometimes killed by excessive memory consumption. In order to improve the performance, we add the following fixes: 1. next_run_at is always real_next_run, which means the value always takes into account of worker's cron schedule 1. Remove exlusive lock. This is already covered by real_next_run change. 1. Use RunPipelineScheduleWorker for avoiding memory killer. Memory consumption is spread to the multiple sidekiq worker.
This commit is contained in:
parent
96744d0bef
commit
6a18a411a3
11 changed files with 234 additions and 150 deletions
|
@ -27,9 +27,13 @@ module Ci
|
|||
|
||||
scope :active, -> { where(active: true) }
|
||||
scope :inactive, -> { where(active: false) }
|
||||
scope :runnable_schedules, -> { active.where("next_run_at < ?", Time.now) }
|
||||
scope :preloaded, -> { preload(:owner, :project) }
|
||||
|
||||
accepts_nested_attributes_for :variables, allow_destroy: true
|
||||
|
||||
alias_attribute :real_next_run, :next_run_at
|
||||
|
||||
def owned_by?(current_user)
|
||||
owner == current_user
|
||||
end
|
||||
|
@ -46,8 +50,14 @@ module Ci
|
|||
update_attribute(:active, false)
|
||||
end
|
||||
|
||||
##
|
||||
# The `next_run_at` column is set to the actual execution date of `PipelineScheduleWorker`.
|
||||
# This way, a schedule like `*/1 * * * *` won't be triggered in a short interval
|
||||
# when PipelineScheduleWorker runs irregularly by Sidekiq Memory Killer.
|
||||
def set_next_run_at
|
||||
self.next_run_at = Gitlab::Ci::CronParser.new(cron, cron_timezone).next_time_from(Time.now)
|
||||
self.next_run_at = Gitlab::Ci::CronParser.new(Settings.cron_jobs['pipeline_schedule_worker']['cron'],
|
||||
Time.zone.name)
|
||||
.next_time_from(ideal_next_run_at)
|
||||
end
|
||||
|
||||
def schedule_next_run!
|
||||
|
@ -56,15 +66,14 @@ module Ci
|
|||
update_attribute(:next_run_at, nil) # update without validation
|
||||
end
|
||||
|
||||
def real_next_run(
|
||||
worker_cron: Settings.cron_jobs['pipeline_schedule_worker']['cron'],
|
||||
worker_time_zone: Time.zone.name)
|
||||
Gitlab::Ci::CronParser.new(worker_cron, worker_time_zone)
|
||||
.next_time_from(next_run_at)
|
||||
end
|
||||
|
||||
def job_variables
|
||||
variables&.map(&:to_runner_variable) || []
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def ideal_next_run_at
|
||||
Gitlab::Ci::CronParser.new(cron, cron_timezone).next_time_from(Time.now)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
13
app/services/ci/pipeline_schedule_service.rb
Normal file
13
app/services/ci/pipeline_schedule_service.rb
Normal file
|
@ -0,0 +1,13 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Ci
|
||||
class PipelineScheduleService < BaseService
|
||||
def execute(schedule)
|
||||
# Ensure `next_run_at` is set properly before creating a pipeline.
|
||||
# Otherwise, multiple pipelines could be created in a short interval.
|
||||
schedule.schedule_next_run!
|
||||
|
||||
RunPipelineScheduleWorker.perform_async(schedule.id, schedule.owner.id)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -3,47 +3,12 @@
|
|||
class PipelineScheduleWorker
|
||||
include ApplicationWorker
|
||||
include CronjobQueue
|
||||
include ::Gitlab::ExclusiveLeaseHelpers
|
||||
|
||||
EXCLUSIVE_LOCK_KEY = 'pipeline_schedules:run:lock'
|
||||
LOCK_TIMEOUT = 50.minutes
|
||||
|
||||
# rubocop: disable CodeReuse/ActiveRecord
|
||||
def perform
|
||||
in_lock(EXCLUSIVE_LOCK_KEY, ttl: LOCK_TIMEOUT, retries: 1) do
|
||||
Ci::PipelineSchedule.active.where("next_run_at < ?", Time.now)
|
||||
.preload(:owner, :project).find_each do |schedule|
|
||||
|
||||
schedule.schedule_next_run!
|
||||
|
||||
Ci::CreatePipelineService.new(schedule.project,
|
||||
schedule.owner,
|
||||
ref: schedule.ref)
|
||||
.execute!(:schedule, ignore_skip_ci: true, save_on_errors: true, schedule: schedule)
|
||||
rescue => e
|
||||
error(schedule, e)
|
||||
Ci::PipelineSchedule.runnable_schedules.preloaded.find_in_batches do |schedules|
|
||||
schedules.each do |schedule|
|
||||
Ci::PipelineScheduleService.new(schedule.project, schedule.owner).execute(schedule)
|
||||
end
|
||||
end
|
||||
end
|
||||
# rubocop: enable CodeReuse/ActiveRecord
|
||||
|
||||
private
|
||||
|
||||
def error(schedule, error)
|
||||
failed_creation_counter.increment
|
||||
|
||||
Rails.logger.error "Failed to create a scheduled pipeline. " \
|
||||
"schedule_id: #{schedule.id} message: #{error.message}"
|
||||
|
||||
Gitlab::Sentry
|
||||
.track_exception(error,
|
||||
issue_url: 'https://gitlab.com/gitlab-org/gitlab-ce/issues/41231',
|
||||
extra: { schedule_id: schedule.id })
|
||||
end
|
||||
|
||||
def failed_creation_counter
|
||||
@failed_creation_counter ||=
|
||||
Gitlab::Metrics.counter(:pipeline_schedule_creation_failed_total,
|
||||
"Counter of failed attempts of pipeline schedule creation")
|
||||
end
|
||||
end
|
||||
|
|
|
@ -21,6 +21,30 @@ class RunPipelineScheduleWorker
|
|||
Ci::CreatePipelineService.new(schedule.project,
|
||||
user,
|
||||
ref: schedule.ref)
|
||||
.execute(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: schedule)
|
||||
.execute!(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: schedule)
|
||||
rescue Ci::CreatePipelineService::CreateError
|
||||
# no-op. This is a user operation error such as corrupted .gitlab-ci.yml.
|
||||
rescue => e
|
||||
error(schedule, e)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def error(schedule, error)
|
||||
failed_creation_counter.increment
|
||||
|
||||
Rails.logger.error "Failed to create a scheduled pipeline. " \
|
||||
"schedule_id: #{schedule.id} message: #{error.message}"
|
||||
|
||||
Gitlab::Sentry
|
||||
.track_exception(error,
|
||||
issue_url: 'https://gitlab.com/gitlab-org/gitlab-ce/issues/41231',
|
||||
extra: { schedule_id: schedule.id })
|
||||
end
|
||||
|
||||
def failed_creation_counter
|
||||
@failed_creation_counter ||=
|
||||
Gitlab::Metrics.counter(:pipeline_schedule_creation_failed_total,
|
||||
"Counter of failed attempts of pipeline schedule creation")
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
title: Make pipeline schedule worker resilient
|
||||
merge_request: 28407
|
||||
author:
|
||||
type: performance
|
|
@ -7,6 +7,16 @@ FactoryBot.define do
|
|||
description "pipeline schedule"
|
||||
project
|
||||
|
||||
trait :every_minute do
|
||||
cron '*/1 * * * *'
|
||||
cron_timezone Gitlab::Ci::CronParser::VALID_SYNTAX_SAMPLE_TIME_ZONE
|
||||
end
|
||||
|
||||
trait :hourly do
|
||||
cron '* */1 * * *'
|
||||
cron_timezone Gitlab::Ci::CronParser::VALID_SYNTAX_SAMPLE_TIME_ZONE
|
||||
end
|
||||
|
||||
trait :nightly do
|
||||
cron '0 1 * * *'
|
||||
cron_timezone Gitlab::Ci::CronParser::VALID_SYNTAX_SAMPLE_TIME_ZONE
|
||||
|
|
|
@ -225,7 +225,7 @@ describe 'Pipeline Schedules', :js do
|
|||
context 'when active is true and next_run_at is NULL' do
|
||||
before do
|
||||
create(:ci_pipeline_schedule, project: project, owner: user).tap do |pipeline_schedule|
|
||||
pipeline_schedule.update_attribute(:cron, nil) # Consequently next_run_at will be nil
|
||||
pipeline_schedule.update_attribute(:next_run_at, nil) # Consequently next_run_at will be nil
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -48,32 +48,116 @@ describe Ci::PipelineSchedule do
|
|||
end
|
||||
end
|
||||
|
||||
describe '.runnable_schedules' do
|
||||
subject { described_class.runnable_schedules }
|
||||
|
||||
let!(:pipeline_schedule) do
|
||||
Timecop.freeze(1.day.ago) do
|
||||
create(:ci_pipeline_schedule, :hourly)
|
||||
end
|
||||
end
|
||||
|
||||
it 'returns the runnable schedule' do
|
||||
is_expected.to eq([pipeline_schedule])
|
||||
end
|
||||
|
||||
context 'when there are no runnable schedules' do
|
||||
let!(:pipeline_schedule) { }
|
||||
|
||||
it 'returns an empty array' do
|
||||
is_expected.to be_empty
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '.preloaded' do
|
||||
subject { described_class.preloaded }
|
||||
|
||||
before do
|
||||
create_list(:ci_pipeline_schedule, 3)
|
||||
end
|
||||
|
||||
it 'preloads the associations' do
|
||||
subject
|
||||
|
||||
query = ActiveRecord::QueryRecorder.new { subject.each(&:project) }
|
||||
|
||||
expect(query.count).to eq(2)
|
||||
end
|
||||
end
|
||||
|
||||
describe '#set_next_run_at' do
|
||||
let!(:pipeline_schedule) { create(:ci_pipeline_schedule, :nightly) }
|
||||
let(:pipeline_schedule) { create(:ci_pipeline_schedule, :nightly) }
|
||||
let(:ideal_next_run_at) { pipeline_schedule.send(:ideal_next_run_at) }
|
||||
|
||||
let(:expected_next_run_at) do
|
||||
Gitlab::Ci::CronParser.new(Settings.cron_jobs['pipeline_schedule_worker']['cron'], Time.zone.name)
|
||||
.next_time_from(ideal_next_run_at)
|
||||
end
|
||||
|
||||
let(:cron_worker_next_run_at) do
|
||||
Gitlab::Ci::CronParser.new(Settings.cron_jobs['pipeline_schedule_worker']['cron'], Time.zone.name)
|
||||
.next_time_from(Time.now)
|
||||
end
|
||||
|
||||
context 'when creates new pipeline schedule' do
|
||||
let(:expected_next_run_at) do
|
||||
Gitlab::Ci::CronParser.new(pipeline_schedule.cron, pipeline_schedule.cron_timezone)
|
||||
.next_time_from(Time.now)
|
||||
it 'updates next_run_at automatically' do
|
||||
expect(pipeline_schedule.next_run_at).to eq(expected_next_run_at)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when PipelineScheduleWorker runs at a specific interval' do
|
||||
before do
|
||||
allow(Settings).to receive(:cron_jobs) do
|
||||
{
|
||||
'pipeline_schedule_worker' => {
|
||||
'cron' => '0 1 2 3 *'
|
||||
}
|
||||
}
|
||||
end
|
||||
end
|
||||
|
||||
it 'updates next_run_at automatically' do
|
||||
expect(described_class.last.next_run_at).to eq(expected_next_run_at)
|
||||
it "updates next_run_at to the sidekiq worker's execution time" do
|
||||
expect(pipeline_schedule.next_run_at.min).to eq(0)
|
||||
expect(pipeline_schedule.next_run_at.hour).to eq(1)
|
||||
expect(pipeline_schedule.next_run_at.day).to eq(2)
|
||||
expect(pipeline_schedule.next_run_at.month).to eq(3)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when pipeline schedule runs every minute' do
|
||||
let(:pipeline_schedule) { create(:ci_pipeline_schedule, :every_minute) }
|
||||
|
||||
it "updates next_run_at to the sidekiq worker's execution time" do
|
||||
expect(pipeline_schedule.next_run_at).to eq(cron_worker_next_run_at)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when there are two different pipeline schedules in different time zones' do
|
||||
let(:pipeline_schedule_1) { create(:ci_pipeline_schedule, :weekly, cron_timezone: 'Eastern Time (US & Canada)') }
|
||||
let(:pipeline_schedule_2) { create(:ci_pipeline_schedule, :weekly, cron_timezone: 'UTC') }
|
||||
|
||||
it 'sets different next_run_at' do
|
||||
expect(pipeline_schedule_1.next_run_at).not_to eq(pipeline_schedule_2.next_run_at)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when there are two different pipeline schedules in the same time zones' do
|
||||
let(:pipeline_schedule_1) { create(:ci_pipeline_schedule, :weekly, cron_timezone: 'UTC') }
|
||||
let(:pipeline_schedule_2) { create(:ci_pipeline_schedule, :weekly, cron_timezone: 'UTC') }
|
||||
|
||||
it 'sets the sames next_run_at' do
|
||||
expect(pipeline_schedule_1.next_run_at).to eq(pipeline_schedule_2.next_run_at)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when updates cron of exsisted pipeline schedule' do
|
||||
let(:new_cron) { '0 0 1 1 *' }
|
||||
|
||||
let(:expected_next_run_at) do
|
||||
Gitlab::Ci::CronParser.new(new_cron, pipeline_schedule.cron_timezone)
|
||||
.next_time_from(Time.now)
|
||||
end
|
||||
|
||||
it 'updates next_run_at automatically' do
|
||||
pipeline_schedule.update!(cron: new_cron)
|
||||
|
||||
expect(described_class.last.next_run_at).to eq(expected_next_run_at)
|
||||
expect(pipeline_schedule.next_run_at).to eq(expected_next_run_at)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -83,10 +167,11 @@ describe Ci::PipelineSchedule do
|
|||
|
||||
context 'when reschedules after 10 days from now' do
|
||||
let(:future_time) { 10.days.from_now }
|
||||
let(:ideal_next_run_at) { pipeline_schedule.send(:ideal_next_run_at) }
|
||||
|
||||
let(:expected_next_run_at) do
|
||||
Gitlab::Ci::CronParser.new(pipeline_schedule.cron, pipeline_schedule.cron_timezone)
|
||||
.next_time_from(future_time)
|
||||
Gitlab::Ci::CronParser.new(Settings.cron_jobs['pipeline_schedule_worker']['cron'], Time.zone.name)
|
||||
.next_time_from(ideal_next_run_at)
|
||||
end
|
||||
|
||||
it 'points to proper next_run_at' do
|
||||
|
@ -99,38 +184,6 @@ describe Ci::PipelineSchedule do
|
|||
end
|
||||
end
|
||||
|
||||
describe '#real_next_run' do
|
||||
subject do
|
||||
described_class.last.real_next_run(worker_cron: worker_cron,
|
||||
worker_time_zone: worker_time_zone)
|
||||
end
|
||||
|
||||
context 'when GitLab time_zone is UTC' do
|
||||
before do
|
||||
allow(Time).to receive(:zone)
|
||||
.and_return(ActiveSupport::TimeZone[worker_time_zone])
|
||||
end
|
||||
|
||||
let(:worker_time_zone) { 'UTC' }
|
||||
|
||||
context 'when cron_timezone is Eastern Time (US & Canada)' do
|
||||
before do
|
||||
create(:ci_pipeline_schedule, :nightly,
|
||||
cron_timezone: 'Eastern Time (US & Canada)')
|
||||
end
|
||||
|
||||
let(:worker_cron) { '0 1 2 3 *' }
|
||||
|
||||
it 'returns the next time worker executes' do
|
||||
expect(subject.min).to eq(0)
|
||||
expect(subject.hour).to eq(1)
|
||||
expect(subject.day).to eq(2)
|
||||
expect(subject.month).to eq(3)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#job_variables' do
|
||||
let!(:pipeline_schedule) { create(:ci_pipeline_schedule) }
|
||||
|
||||
|
|
28
spec/services/ci/pipeline_schedule_service_spec.rb
Normal file
28
spec/services/ci/pipeline_schedule_service_spec.rb
Normal file
|
@ -0,0 +1,28 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require 'spec_helper'
|
||||
|
||||
describe Ci::PipelineScheduleService do
|
||||
let(:project) { create(:project) }
|
||||
let(:user) { create(:user) }
|
||||
let(:service) { described_class.new(project, user) }
|
||||
|
||||
describe '#execute' do
|
||||
subject { service.execute(schedule) }
|
||||
|
||||
let(:schedule) { create(:ci_pipeline_schedule, project: project, owner: user) }
|
||||
|
||||
it 'schedules next run' do
|
||||
expect(schedule).to receive(:schedule_next_run!)
|
||||
|
||||
subject
|
||||
end
|
||||
|
||||
it 'runs RunPipelineScheduleWorker' do
|
||||
expect(RunPipelineScheduleWorker)
|
||||
.to receive(:perform_async).with(schedule.id, schedule.owner.id)
|
||||
|
||||
subject
|
||||
end
|
||||
end
|
||||
end
|
|
@ -41,16 +41,6 @@ describe PipelineScheduleWorker do
|
|||
|
||||
it_behaves_like 'successful scheduling'
|
||||
|
||||
context 'when exclusive lease has already been taken by the other instance' do
|
||||
before do
|
||||
stub_exclusive_lease_taken(described_class::EXCLUSIVE_LOCK_KEY, timeout: described_class::LOCK_TIMEOUT)
|
||||
end
|
||||
|
||||
it 'raises an error and does not start creating pipelines' do
|
||||
expect { subject }.to raise_error(Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the latest commit contains [ci skip]' do
|
||||
before do
|
||||
allow_any_instance_of(Ci::Pipeline)
|
||||
|
@ -77,47 +67,19 @@ describe PipelineScheduleWorker do
|
|||
stub_ci_pipeline_yaml_file(YAML.dump(rspec: { variables: 'rspec' } ))
|
||||
end
|
||||
|
||||
it 'creates a failed pipeline with the reason' do
|
||||
expect { subject }.to change { project.ci_pipelines.count }.by(1)
|
||||
expect(Ci::Pipeline.last).to be_config_error
|
||||
expect(Ci::Pipeline.last.yaml_errors).not_to be_nil
|
||||
it 'does not creates a new pipeline' do
|
||||
expect { subject }.not_to change { project.ci_pipelines.count }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the schedule is not runnable by the user' do
|
||||
before do
|
||||
expect(Gitlab::Sentry)
|
||||
.to receive(:track_exception)
|
||||
.with(Ci::CreatePipelineService::CreateError,
|
||||
issue_url: 'https://gitlab.com/gitlab-org/gitlab-ce/issues/41231',
|
||||
extra: { schedule_id: pipeline_schedule.id } ).once
|
||||
end
|
||||
|
||||
it 'does not deactivate the schedule' do
|
||||
subject
|
||||
|
||||
expect(pipeline_schedule.reload.active).to be_truthy
|
||||
end
|
||||
|
||||
it 'increments Prometheus counter' do
|
||||
expect(Gitlab::Metrics)
|
||||
.to receive(:counter)
|
||||
.with(:pipeline_schedule_creation_failed_total, "Counter of failed attempts of pipeline schedule creation")
|
||||
.and_call_original
|
||||
|
||||
subject
|
||||
end
|
||||
|
||||
it 'logging a pipeline error' do
|
||||
expect(Rails.logger)
|
||||
.to receive(:error)
|
||||
.with(a_string_matching("Insufficient permissions to create a new pipeline"))
|
||||
.and_call_original
|
||||
|
||||
subject
|
||||
end
|
||||
|
||||
it 'does not create a pipeline' do
|
||||
expect { subject }.not_to change { project.ci_pipelines.count }
|
||||
end
|
||||
|
@ -131,21 +93,6 @@ describe PipelineScheduleWorker do
|
|||
before do
|
||||
stub_ci_pipeline_yaml_file(nil)
|
||||
project.add_maintainer(user)
|
||||
|
||||
expect(Gitlab::Sentry)
|
||||
.to receive(:track_exception)
|
||||
.with(Ci::CreatePipelineService::CreateError,
|
||||
issue_url: 'https://gitlab.com/gitlab-org/gitlab-ce/issues/41231',
|
||||
extra: { schedule_id: pipeline_schedule.id } ).once
|
||||
end
|
||||
|
||||
it 'logging a pipeline error' do
|
||||
expect(Rails.logger)
|
||||
.to receive(:error)
|
||||
.with(a_string_matching("Missing .gitlab-ci.yml file"))
|
||||
.and_call_original
|
||||
|
||||
subject
|
||||
end
|
||||
|
||||
it 'does not create a pipeline' do
|
||||
|
|
|
@ -32,7 +32,37 @@ describe RunPipelineScheduleWorker do
|
|||
|
||||
it 'calls the Service' do
|
||||
expect(Ci::CreatePipelineService).to receive(:new).with(project, user, ref: pipeline_schedule.ref).and_return(create_pipeline_service)
|
||||
expect(create_pipeline_service).to receive(:execute).with(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: pipeline_schedule)
|
||||
expect(create_pipeline_service).to receive(:execute!).with(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: pipeline_schedule)
|
||||
|
||||
worker.perform(pipeline_schedule.id, user.id)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when database statement timeout happens' do
|
||||
before do
|
||||
allow(Ci::CreatePipelineService).to receive(:new) { raise ActiveRecord::StatementInvalid }
|
||||
|
||||
expect(Gitlab::Sentry)
|
||||
.to receive(:track_exception)
|
||||
.with(ActiveRecord::StatementInvalid,
|
||||
issue_url: 'https://gitlab.com/gitlab-org/gitlab-ce/issues/41231',
|
||||
extra: { schedule_id: pipeline_schedule.id } ).once
|
||||
end
|
||||
|
||||
it 'increments Prometheus counter' do
|
||||
expect(Gitlab::Metrics)
|
||||
.to receive(:counter)
|
||||
.with(:pipeline_schedule_creation_failed_total, "Counter of failed attempts of pipeline schedule creation")
|
||||
.and_call_original
|
||||
|
||||
worker.perform(pipeline_schedule.id, user.id)
|
||||
end
|
||||
|
||||
it 'logging a pipeline error' do
|
||||
expect(Rails.logger)
|
||||
.to receive(:error)
|
||||
.with(a_string_matching('ActiveRecord::StatementInvalid'))
|
||||
.and_call_original
|
||||
|
||||
worker.perform(pipeline_schedule.id, user.id)
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue