From 631e759c19eb617339a8ae733861b71103298f38 Mon Sep 17 00:00:00 2001 From: GitLab Bot Date: Sat, 1 Jan 2022 18:13:43 +0000 Subject: [PATCH] Add latest changes from gitlab-org/gitlab@master --- app/events/ci/pipeline_created_event.rb | 14 + app/services/ci/create_pipeline_service.rb | 9 +- app/workers/all_queues.yml | 9 + .../update_head_pipeline_worker.rb | 23 ++ ..._head_pipeline_for_merge_request_worker.rb | 2 + .../ci_publish_pipeline_events.yml | 8 + config/sidekiq_queues.yml | 2 + doc/development/event_store.md | 292 ++++++++++++++++++ doc/development/index.md | 1 + lib/gitlab/event_store.rb | 42 +++ lib/gitlab/event_store/event.rb | 54 ++++ lib/gitlab/event_store/store.rb | 54 ++++ lib/gitlab/event_store/subscriber.rb | 36 +++ lib/gitlab/event_store/subscription.rb | 37 +++ spec/lib/gitlab/event_store/event_spec.rb | 64 ++++ spec/lib/gitlab/event_store/store_spec.rb | 262 ++++++++++++++++ .../ci/create_pipeline_service_spec.rb | 128 +------- .../update_head_pipeline_worker_spec.rb | 138 +++++++++ 18 files changed, 1063 insertions(+), 112 deletions(-) create mode 100644 app/events/ci/pipeline_created_event.rb create mode 100644 app/workers/merge_requests/update_head_pipeline_worker.rb create mode 100644 config/feature_flags/development/ci_publish_pipeline_events.yml create mode 100644 doc/development/event_store.md create mode 100644 lib/gitlab/event_store.rb create mode 100644 lib/gitlab/event_store/event.rb create mode 100644 lib/gitlab/event_store/store.rb create mode 100644 lib/gitlab/event_store/subscriber.rb create mode 100644 lib/gitlab/event_store/subscription.rb create mode 100644 spec/lib/gitlab/event_store/event_spec.rb create mode 100644 spec/lib/gitlab/event_store/store_spec.rb create mode 100644 spec/workers/merge_requests/update_head_pipeline_worker_spec.rb diff --git a/app/events/ci/pipeline_created_event.rb b/app/events/ci/pipeline_created_event.rb new file mode 100644 index 00000000000..8b971b63cea --- /dev/null +++ 
b/app/events/ci/pipeline_created_event.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Ci + class PipelineCreatedEvent < ::Gitlab::EventStore::Event + def schema + { + 'type' => 'object', + 'properties' => { + 'pipeline_id' => { 'type' => 'integer' } + } + } + end + end +end diff --git a/app/services/ci/create_pipeline_service.rb b/app/services/ci/create_pipeline_service.rb index c1f35afba40..5338f047051 100644 --- a/app/services/ci/create_pipeline_service.rb +++ b/app/services/ci/create_pipeline_service.rb @@ -95,7 +95,14 @@ module Ci .build! if pipeline.persisted? - schedule_head_pipeline_update + if Feature.enabled?(:ci_publish_pipeline_events, pipeline.project, default_enabled: :yaml) + Gitlab::EventStore.publish( + Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id }) + ) + else + schedule_head_pipeline_update + end + create_namespace_onboarding_action else # If pipeline is not persisted, try to recover IID diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 21c8107b8e5..0d4b92a5065 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -2420,6 +2420,15 @@ :weight: 1 :idempotent: true :tags: [] +- :name: merge_requests_update_head_pipeline + :worker_name: MergeRequests::UpdateHeadPipelineWorker + :feature_category: :code_review + :has_external_dependencies: + :urgency: :high + :resource_boundary: :cpu + :weight: 1 + :idempotent: true + :tags: [] - :name: metrics_dashboard_prune_old_annotations :worker_name: Metrics::Dashboard::PruneOldAnnotationsWorker :feature_category: :metrics diff --git a/app/workers/merge_requests/update_head_pipeline_worker.rb b/app/workers/merge_requests/update_head_pipeline_worker.rb new file mode 100644 index 00000000000..c8dc9d1f7c8 --- /dev/null +++ b/app/workers/merge_requests/update_head_pipeline_worker.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +module MergeRequests + class UpdateHeadPipelineWorker + include ApplicationWorker + include 
Gitlab::EventStore::Subscriber + + feature_category :code_review + urgency :high + worker_resource_boundary :cpu + data_consistency :always + + idempotent! + + def handle_event(event) + Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline| + pipeline.all_merge_requests.opened.each do |merge_request| + UpdateHeadPipelineForMergeRequestWorker.perform_async(merge_request.id) + end + end + end + end +end diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb index 61fe278e016..3a2447b2108 100644 --- a/app/workers/update_head_pipeline_for_merge_request_worker.rb +++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb @@ -8,8 +8,10 @@ class UpdateHeadPipelineForMergeRequestWorker sidekiq_options retry: 3 include PipelineQueue + # NOTE: this worker belongs to :code_review since there is no CI logic. queue_namespace :pipeline_processing feature_category :continuous_integration + urgency :high worker_resource_boundary :cpu diff --git a/config/feature_flags/development/ci_publish_pipeline_events.yml b/config/feature_flags/development/ci_publish_pipeline_events.yml new file mode 100644 index 00000000000..2d47084f499 --- /dev/null +++ b/config/feature_flags/development/ci_publish_pipeline_events.yml @@ -0,0 +1,8 @@ +--- +name: ci_publish_pipeline_events +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/34042 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/336752 +milestone: '14.3' +type: development +group: group::pipeline execution +default_enabled: false diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index 49989e022fa..6395f051a0a 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -255,6 +255,8 @@ - 1 - - merge_requests_sync_code_owner_approval_rules - 1 +- - merge_requests_update_head_pipeline + - 1 - - metrics_dashboard_prune_old_annotations - 1 - - metrics_dashboard_sync_dashboards 
diff --git a/doc/development/event_store.md b/doc/development/event_store.md new file mode 100644 index 00000000000..85dfdf8b8ef --- /dev/null +++ b/doc/development/event_store.md @@ -0,0 +1,292 @@ +--- +stage: none +group: unassigned +info: To determine the technical writer assigned to the Stage/Group associated with this page, see https://about.gitlab.com/handbook/engineering/ux/technical-writing/#assignments +--- + +# GitLab EventStore + +## Background + +The monolithic GitLab project is becoming larger and more domains are being defined. +As a result, these domains are becoming entangled with each other due to temporal coupling. + +An emblematic example is the [`PostReceive`](https://gitlab.com/gitlab-org/gitlab/blob/master/app/workers/post_receive.rb) +worker where a lot happens across multiple domains. If a new behavior reacts to +a new commit being pushed, then we add code somewhere in `PostReceive` or its sub-components +(`Git::ProcessRefChangesService`, for example). + +This type of architecture: + +- Is a violation of the Single Responsibility Principle. +- Increases the risk of adding code to a codebase you are not familiar with. + There may be nuances you don't know about which may introduce bugs or a performance degradation. +- Violates domain boundaries. Inside a specific namespace (for example `Git::`) we suddenly see + classes from other domains chiming in (like `Ci::` or `MergeRequests::`). + +## What is EventStore? + +`Gitlab::EventStore` is a basic pub-sub system built on top of the existing Sidekiq workers and observability we have today. +We use this system to apply an event-driven approach when modeling a domain while keeping coupling +to a minimum. + +This essentially leaves the existing Sidekiq workers as-is to perform asynchronous work but inverts +the dependency. + +### EventStore example + +When a CI pipeline is created we update the head pipeline for any merge request matching the +pipeline's `ref`.
The merge request can then display the status of the latest pipeline. + +#### Without the EventStore + +We change `Ci::CreatePipelineService` and add logic (like an `if` statement) to check if the +pipeline is created. Then we schedule a worker to run some side-effects for the `MergeRequests::` domain. + +This style violates the [Open-Closed Principle](https://en.wikipedia.org/wiki/Open%E2%80%93closed_principle) +and unnecessarily adds side-effect logic from other domains, increasing coupling: + +```mermaid +graph LR + subgraph ci[CI] + cp[CreatePipelineService] + end + + subgraph mr[MergeRequests] + upw[UpdateHeadPipelineWorker] + end + + subgraph no[Namespaces::Onboarding] + pow[PipelinesOnboardedWorker] + end + + cp -- perform_async --> upw + cp -- perform_async --> pow +``` + +#### With the EventStore + +`Ci::CreatePipelineService` publishes an event `Ci::PipelineCreatedEvent` and its responsibility stops here. + +The `MergeRequests::` domain can subscribe to this event with a worker `MergeRequests::UpdateHeadPipelineWorker`, so: + +- Side-effects are scheduled asynchronously and don't impact the main business transaction that + emits the domain event. +- More side-effects can be added without modifying the main business transaction. +- We can clearly see what domains are involved and their ownership. +- We can identify what events occur in the system because they are explicitly declared. + +With `Gitlab::EventStore` there is still coupling between the subscriber (Sidekiq worker) and the schema of the domain event. +This level of coupling is much smaller than having the main transaction (`Ci::CreatePipelineService`) coupled to: + +- multiple subscribers. +- multiple ways of invoking subscribers (including conditional invocations). +- multiple ways of passing parameters.
+ +```mermaid +graph LR + subgraph ci[CI] + cp[CreatePipelineService] + cp -- publish --> e[PipelineCreatedEvent] + end + + subgraph mr[MergeRequests] + upw[UpdateHeadPipelineWorker] + end + + subgraph no[Namespaces::Onboarding] + pow[PipelinesOnboardedWorker] + end + + upw -. subscribe .-> e + pow -. subscribe .-> e +``` + +Each subscriber, being itself a Sidekiq worker, can specify any attributes that are related +to the type of work they are responsible for. For example, one subscriber could define +`urgency: high` while another one less critical could set `urgency: low`. + +The EventStore is only an abstraction that allows us to have Dependency Inversion. This helps +separate a business transaction from side-effects (often executed in other domains). + +When an event is published, the EventStore calls `perform_async` on each subscribed worker, +passing in the event information as arguments. This essentially schedules a Sidekiq job on each +subscriber's queue. + +This means that nothing else changes with regards to how subscribers work, as they are just +Sidekiq workers. For example: if a worker (subscriber) fails to execute a job, the job is put +back into Sidekiq to be retried. + +## EventStore advantages + +- Subscribers (Sidekiq workers) can be set to run quicker by changing the worker weight + if the side-effect is critical. +- Automatically enforce the fact that side-effects run asynchronously. + This makes it safe for other domains to subscribe to events without affecting the performance of the + main business transaction. + +## Define an event + +An `Event` object represents a domain event that occurred in a bounded context. +Notify other bounded contexts about something +that happened by publishing events, so that they can react to it.
+ +Define new event classes under `app/events/<namespace>/` with a name representing something that happened in the past: + +```ruby +class Ci::PipelineCreatedEvent < Gitlab::EventStore::Event + def schema + { + 'type' => 'object', + 'required' => ['pipeline_id'], + 'properties' => { + 'pipeline_id' => { 'type' => 'integer' }, + 'ref' => { 'type' => 'string' } + } + } + end +end +``` + +The schema is validated immediately when we initialize the event object so we can ensure that +publishers follow the contract with the subscribers. + +We recommend using optional properties as much as possible, which require fewer rollouts for schema changes. +However, `required` properties could be used for unique identifiers of the event's subject. For example: + +- `pipeline_id` can be a required property for a `Ci::PipelineCreatedEvent`. +- `project_id` can be a required property for a `Projects::ProjectDeletedEvent`. + +Publish only properties that are needed by the subscribers without tailoring the payload to specific subscribers. +The payload should fully represent the event and not contain loosely related properties. For example: + +```ruby +Ci::PipelineCreatedEvent.new(data: { + pipeline_id: pipeline.id, + # unless all subscribers need merge request IDs, + # this is data that can be fetched by the subscriber. + merge_request_ids: pipeline.all_merge_requests.pluck(:id) +}) +``` + +Publishing events with more properties provides the subscribers with the data +they need in the first place. Otherwise subscribers have to fetch the additional data from the database. +However, this can lead to continuous changes to the schema and possibly adding properties that may not +represent the single source of truth. +It's best to use this technique as a performance optimization. For example: when an event has many +subscribers that all fetch the same data again from the database. + +### Update the schema + +Changes to the schema require multiple rollouts.
While the new version is being deployed: + +- Existing publishers can publish events using the old version. +- Existing subscribers can consume events using the old version. +- Events get persisted in the Sidekiq queue as job arguments, so we could have 2 versions of the schema during deployments. + +As changing the schema ultimately impacts the Sidekiq arguments, please refer to our +[Sidekiq style guide](sidekiq_style_guide.md#changing-the-arguments-for-a-worker) with regards to multiple rollouts. + +#### Add properties + +1. Rollout 1: + - Add new properties as optional (not `required`). + - Update the subscriber so it can consume events with and without the new properties. +1. Rollout 2: + - Change the publisher to provide the new property +1. Rollout 3: (if the property should be `required`): + - Change the schema and the subscriber code to always expect it. + +#### Remove properties + +1. Rollout 1: + - If the property is `required`, make it optional. + - Update the subscriber so it does not always expect the property. +1. Rollout 2: + - Remove the property from the event publishing. + - Remove the code from the subscriber that processes the property. + +#### Other changes + +For other changes, like renaming a property, use the same steps: + +1. Remove the old property +1. Add the new property + +## Publish an event + +To publish the event from the [previous example](#define-an-event): + +```ruby +Gitlab::EventStore.publish( + Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id }) +) +``` + +## Create a subscriber + +A subscriber is a Sidekiq worker that includes the `Gitlab::EventStore::Subscriber` module. +This module takes care of the `perform` method and provides a better abstraction to handle +the event safely via the `handle_event` method. 
For example: + +```ruby +module MergeRequests + class UpdateHeadPipelineWorker + include ApplicationWorker + include Gitlab::EventStore::Subscriber + + def handle_event(event) + Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline| + # ... + end + end + end +end +``` + +## Register the subscriber to the event + +To subscribe the worker to a specific event in `lib/gitlab/event_store.rb`, +add a line like this to the `Gitlab::EventStore.configure!` method: + +```ruby +module Gitlab + module EventStore + def self.configure! + Store.new.tap do |store| + # ... + + store.subscribe ::MergeRequests::UpdateHeadPipelineWorker, to: ::Ci::PipelineCreatedEvent + + # ... + end + end + end +end +``` + +Subscriptions are stored in memory when the Rails app is loaded and they are immediately frozen. +It's not possible to modify subscriptions at runtime. + +### Conditional dispatch of events + +A subscription can specify a condition when to accept an event: + +```ruby +store.subscribe ::MergeRequests::UpdateHeadPipelineWorker, + to: ::Ci::PipelineCreatedEvent, + if: -> (event) { event.data[:merge_request_id].present? } +``` + +This tells the event store to dispatch `Ci::PipelineCreatedEvent`s to the subscriber if +the condition is met. + +This technique can avoid scheduling Sidekiq jobs if the subscriber is interested in a +small subset of events. + +WARNING: +When using conditional dispatch it must contain only cheap conditions because they are +executed synchronously every time the given event is published. + +For complex conditions it's best to subscribe to all the events and then handle the logic +in the `handle_event` method of the subscriber worker. 
diff --git a/doc/development/index.md b/doc/development/index.md index 1398104abda..a31bc2d5a6f 100644 --- a/doc/development/index.md +++ b/doc/development/index.md @@ -164,6 +164,7 @@ the [reviewer values](https://about.gitlab.com/handbook/engineering/workflow/rev ### General - [Directory structure](directory_structure.md) +- [GitLab EventStore](event_store.md) to publish/subscribe to domain events - [GitLab utilities](utilities.md) - [Newlines style guide](newlines_styleguide.md) - [Logging](logging.md) diff --git a/lib/gitlab/event_store.rb b/lib/gitlab/event_store.rb new file mode 100644 index 00000000000..3d7b6b27eb0 --- /dev/null +++ b/lib/gitlab/event_store.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +# Gitlab::EventStore is a simple pub-sub mechanism that lets you publish +# domain events and use Sidekiq workers as event handlers. +# +# It can be used to decouple domains from different bounded contexts +# by publishing domain events and letting any interested parties subscribe +# to them. +# +module Gitlab + module EventStore + Error = Class.new(StandardError) + InvalidEvent = Class.new(Error) + InvalidSubscriber = Class.new(Error) + + def self.publish(event) + instance.publish(event) + end + + def self.instance + @instance ||= configure! + end + + # Define all event subscriptions using: + # + # store.subscribe(DomainA::SomeWorker, to: DomainB::SomeEvent) + # + # It is possible to subscribe to a subset of events matching a condition: + # + # store.subscribe(DomainA::SomeWorker, to: DomainB::SomeEvent, if: ->(event) { event.data == :some_value }) + # + def self.configure! + Store.new do |store| + ### + # Add subscriptions here: + + store.subscribe ::MergeRequests::UpdateHeadPipelineWorker, to: ::Ci::PipelineCreatedEvent + end + end + private_class_method :configure!
+ end +end diff --git a/lib/gitlab/event_store/event.rb b/lib/gitlab/event_store/event.rb new file mode 100644 index 00000000000..ee0c329b8e8 --- /dev/null +++ b/lib/gitlab/event_store/event.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +# An Event object represents a domain event that occurred in a bounded context. +# By publishing events we notify other bounded contexts about something +# that happened, so that they can react to it. +# +# Define new event classes under `app/events//` with a name +# representing something that happened in the past: +# +# class Projects::ProjectCreatedEvent < Gitlab::EventStore::Event +# def schema +# { +# 'type' => 'object', +# 'properties' => { +# 'project_id' => { 'type' => 'integer' } +# } +# } +# end +# end +# +# To publish it: +# +# Gitlab::EventStore.publish( +# Projects::ProjectCreatedEvent.new(data: { project_id: project.id }) +# ) +# +module Gitlab + module EventStore + class Event + attr_reader :data + + def initialize(data:) + validate_schema!(data) + @data = data + end + + def schema + raise NotImplementedError, 'must specify schema to validate the event' + end + + private + + def validate_schema!(data) + unless data.is_a?(Hash) + raise Gitlab::EventStore::InvalidEvent, "Event data must be a Hash" + end + + unless JSONSchemer.schema(schema).valid?(data.deep_stringify_keys) + raise Gitlab::EventStore::InvalidEvent, "Data for event #{self.class} does not match the defined schema: #{schema}" + end + end + end + end +end diff --git a/lib/gitlab/event_store/store.rb b/lib/gitlab/event_store/store.rb new file mode 100644 index 00000000000..ecf3cd7e562 --- /dev/null +++ b/lib/gitlab/event_store/store.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module Gitlab + module EventStore + class Store + attr_reader :subscriptions + + def initialize + @subscriptions = Hash.new { |h, k| h[k] = [] } + + yield(self) if block_given? 
+ + # freeze the subscriptions as safety measure to avoid further + # subcriptions after initialization. + lock! + end + + def subscribe(worker, to:, if: nil) + condition = binding.local_variable_get('if') + + Array(to).each do |event| + validate_subscription!(worker, event) + subscriptions[event] << Gitlab::EventStore::Subscription.new(worker, condition) + end + end + + def publish(event) + unless event.is_a?(Event) + raise InvalidEvent, "Event being published is not an instance of Gitlab::EventStore::Event: got #{event.inspect}" + end + + subscriptions[event.class].each do |subscription| + subscription.consume_event(event) + end + end + + private + + def lock! + @subscriptions.freeze + end + + def validate_subscription!(subscriber, event_class) + unless event_class < Event + raise InvalidEvent, "Event being subscribed to is not a subclass of Gitlab::EventStore::Event: got #{event_class}" + end + + unless subscriber.respond_to?(:perform_async) + raise InvalidSubscriber, "Subscriber is not an ApplicationWorker: got #{subscriber}" + end + end + end + end +end diff --git a/lib/gitlab/event_store/subscriber.rb b/lib/gitlab/event_store/subscriber.rb new file mode 100644 index 00000000000..cf326d1f9e4 --- /dev/null +++ b/lib/gitlab/event_store/subscriber.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +# This module should be included in order to turn an ApplicationWorker +# into a Subscriber. +# This module overrides the `perform` method and provides a better and +# safer interface for handling events via `handle_event` method. +# +# @example: +# class SomeEventSubscriber +# include ApplicationWorker +# include Gitlab::EventStore::Subscriber +# +# def handle_event(event) +# # ... 
+# end +# end + +module Gitlab + module EventStore + module Subscriber + def perform(event_type, data) + raise InvalidEvent, event_type unless self.class.const_defined?(event_type) + + event = event_type.constantize.new( + data: data.with_indifferent_access + ) + + handle_event(event) + end + + def handle_event(event) + raise NotImplementedError, 'you must implement this methods in order to handle events' + end + end + end +end diff --git a/lib/gitlab/event_store/subscription.rb b/lib/gitlab/event_store/subscription.rb new file mode 100644 index 00000000000..e5c92ab969f --- /dev/null +++ b/lib/gitlab/event_store/subscription.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +module Gitlab + module EventStore + class Subscription + attr_reader :worker, :condition + + def initialize(worker, condition) + @worker = worker + @condition = condition + end + + def consume_event(event) + return unless condition_met?(event) + + worker.perform_async(event.class.name, event.data) + # TODO: Log dispatching of events to subscriber + + # We rescue and track any exceptions here because we don't want to + # impact other subscribers if one is faulty. + # The method `condition_met?`, since it can run a block, it might encounter + # a bug. By raising an exception here we could interrupt the publishing + # process, preventing other subscribers from consuming the event. 
+ rescue StandardError => e + Gitlab::ErrorTracking.track_and_raise_for_dev_exception(e, event_class: event.class.name, event_data: event.data) + end + + private + + def condition_met?(event) + return true unless condition + + condition.call(event) + end + end + end +end diff --git a/spec/lib/gitlab/event_store/event_spec.rb b/spec/lib/gitlab/event_store/event_spec.rb new file mode 100644 index 00000000000..97f6870a5ec --- /dev/null +++ b/spec/lib/gitlab/event_store/event_spec.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::EventStore::Event do + let(:event_class) { stub_const('TestEvent', Class.new(described_class)) } + let(:event) { event_class.new(data: data) } + let(:data) { { project_id: 123, project_path: 'org/the-project' } } + + context 'when schema is not defined' do + it 'raises an error on initialization' do + expect { event }.to raise_error(NotImplementedError) + end + end + + context 'when schema is defined' do + before do + event_class.class_eval do + def schema + { + 'required' => ['project_id'], + 'type' => 'object', + 'properties' => { + 'project_id' => { 'type' => 'integer' }, + 'project_path' => { 'type' => 'string' } + } + } + end + end + end + + describe 'schema validation' do + context 'when data matches the schema' do + it 'initializes the event correctly' do + expect(event.data).to eq(data) + end + end + + context 'when required properties are present as well as unknown properties' do + let(:data) { { project_id: 123, unknown_key: 'unknown_value' } } + + it 'initializes the event correctly' do + expect(event.data).to eq(data) + end + end + + context 'when some properties are missing' do + let(:data) { { project_path: 'org/the-project' } } + + it 'expects all properties to be present' do + expect { event }.to raise_error(Gitlab::EventStore::InvalidEvent, /does not match the defined schema/) + end + end + + context 'when data is not a Hash' do + let(:data) { 123 } + + it 'raises an error' 
do + expect { event }.to raise_error(Gitlab::EventStore::InvalidEvent, 'Event data must be a Hash') + end + end + end + end +end diff --git a/spec/lib/gitlab/event_store/store_spec.rb b/spec/lib/gitlab/event_store/store_spec.rb new file mode 100644 index 00000000000..711e1d5b4d5 --- /dev/null +++ b/spec/lib/gitlab/event_store/store_spec.rb @@ -0,0 +1,262 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Gitlab::EventStore::Store do + let(:event_klass) { stub_const('TestEvent', Class.new(Gitlab::EventStore::Event)) } + let(:event) { event_klass.new(data: data) } + let(:another_event_klass) { stub_const('TestAnotherEvent', Class.new(Gitlab::EventStore::Event)) } + + let(:worker) do + stub_const('EventSubscriber', Class.new).tap do |klass| + klass.class_eval do + include ApplicationWorker + include Gitlab::EventStore::Subscriber + + def handle_event(event) + event.data + end + end + end + end + + let(:another_worker) do + stub_const('AnotherEventSubscriber', Class.new).tap do |klass| + klass.class_eval do + include ApplicationWorker + include Gitlab::EventStore::Subscriber + end + end + end + + let(:unrelated_worker) do + stub_const('UnrelatedEventSubscriber', Class.new).tap do |klass| + klass.class_eval do + include ApplicationWorker + include Gitlab::EventStore::Subscriber + end + end + end + + before do + event_klass.class_eval do + def schema + { + 'required' => %w[name id], + 'type' => 'object', + 'properties' => { + 'name' => { 'type' => 'string' }, + 'id' => { 'type' => 'integer' } + } + } + end + end + end + + describe '#subscribe' do + it 'subscribes a worker to an event' do + store = described_class.new do |s| + s.subscribe worker, to: event_klass + end + + subscriptions = store.subscriptions[event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker) + end + + it 'subscribes multiple workers to an event' do + store = described_class.new do |s| + s.subscribe worker, to: event_klass + s.subscribe another_worker, 
to: event_klass + end + + subscriptions = store.subscriptions[event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker, another_worker) + end + + it 'subscribes a worker to multiple events is separate calls' do + store = described_class.new do |s| + s.subscribe worker, to: event_klass + s.subscribe worker, to: another_event_klass + end + + subscriptions = store.subscriptions[event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker) + + subscriptions = store.subscriptions[another_event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker) + end + + it 'subscribes a worker to multiple events in a single call' do + store = described_class.new do |s| + s.subscribe worker, to: [event_klass, another_event_klass] + end + + subscriptions = store.subscriptions[event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker) + + subscriptions = store.subscriptions[another_event_klass] + expect(subscriptions.map(&:worker)).to contain_exactly(worker) + end + + it 'subscribes a worker to an event with condition' do + store = described_class.new do |s| + s.subscribe worker, to: event_klass, if: ->(event) { event.data[:name] == 'Alice' } + end + + subscriptions = store.subscriptions[event_klass] + + expect(subscriptions.size).to eq(1) + + subscription = subscriptions.first + expect(subscription).to be_an_instance_of(Gitlab::EventStore::Subscription) + expect(subscription.worker).to eq(worker) + expect(subscription.condition.call(double(data: { name: 'Bob' }))).to eq(false) + expect(subscription.condition.call(double(data: { name: 'Alice' }))).to eq(true) + end + + it 'refuses the subscription if the target is not an Event object' do + expect do + described_class.new do |s| + s.subscribe worker, to: Integer + end + end.to raise_error( + Gitlab::EventStore::Error, + /Event being subscribed to is not a subclass of Gitlab::EventStore::Event/) + end + + it 'refuses the subscription if the subscriber is not a 
worker' do + expect do + described_class.new do |s| + s.subscribe double, to: event_klass + end + end.to raise_error( + Gitlab::EventStore::Error, + /Subscriber is not an ApplicationWorker/) + end + end + + describe '#publish' do + let(:data) { { name: 'Bob', id: 123 } } + + context 'when event has a subscribed worker' do + let(:store) do + described_class.new do |store| + store.subscribe worker, to: event_klass + store.subscribe another_worker, to: another_event_klass + end + end + + it 'dispatches the event to the subscribed worker' do + expect(worker).to receive(:perform_async).with('TestEvent', data) + expect(another_worker).not_to receive(:perform_async) + + store.publish(event) + end + + context 'when other workers subscribe to the same event' do + let(:store) do + described_class.new do |store| + store.subscribe worker, to: event_klass + store.subscribe another_worker, to: event_klass + store.subscribe unrelated_worker, to: another_event_klass + end + end + + it 'dispatches the event to each subscribed worker' do + expect(worker).to receive(:perform_async).with('TestEvent', data) + expect(another_worker).to receive(:perform_async).with('TestEvent', data) + expect(unrelated_worker).not_to receive(:perform_async) + + store.publish(event) + end + end + + context 'when an error is raised' do + before do + allow(worker).to receive(:perform_async).and_raise(NoMethodError, 'the error message') + end + + it 'is rescued and tracked' do + expect(Gitlab::ErrorTracking) + .to receive(:track_and_raise_for_dev_exception) + .with(kind_of(NoMethodError), event_class: event.class.name, event_data: event.data) + .and_call_original + + expect { store.publish(event) }.to raise_error(NoMethodError, 'the error message') + end + end + + it 'raises and tracks an error when event is published inside a database transaction' do + expect(Gitlab::ErrorTracking) + .to receive(:track_and_raise_for_dev_exception) + .at_least(:once) + .and_call_original + + expect do + 
ApplicationRecord.transaction do + store.publish(event) + end + end.to raise_error(Sidekiq::Worker::EnqueueFromTransactionError) + end + + it 'refuses publishing if the target is not an Event object' do + expect { store.publish(double(:event)) } + .to raise_error( + Gitlab::EventStore::Error, + /Event being published is not an instance of Gitlab::EventStore::Event/) + end + end + + context 'when event has subscribed workers with condition' do + let(:store) do + described_class.new do |s| + s.subscribe worker, to: event_klass, if: -> (event) { event.data[:name] == 'Bob' } + s.subscribe another_worker, to: event_klass, if: -> (event) { event.data[:name] == 'Alice' } + end + end + + let(:event) { event_klass.new(data: data) } + + it 'dispatches the event to the workers satisfying the condition' do + expect(worker).to receive(:perform_async).with('TestEvent', data) + expect(another_worker).not_to receive(:perform_async) + + store.publish(event) + end + end + end + + describe 'subscriber' do + let(:data) { { name: 'Bob', id: 123 } } + let(:event_name) { event.class.name } + let(:worker_instance) { worker.new } + + subject { worker_instance.perform(event_name, data) } + + it 'handles the event' do + expect(worker_instance).to receive(:handle_event).with(instance_of(event.class)) + + expect_any_instance_of(event.class) do |event| + expect(event).to receive(:data).and_return(data) + end + + subject + end + + context 'when the event name does not exist' do + let(:event_name) { 'UnknownClass' } + + it 'raises an error' do + expect { subject }.to raise_error(Gitlab::EventStore::InvalidEvent) + end + end + + context 'when the worker does not define handle_event method' do + let(:worker_instance) { another_worker.new } + + it 'raises an error' do + expect { subject }.to raise_error(NotImplementedError) + end + end + end +end diff --git a/spec/services/ci/create_pipeline_service_spec.rb b/spec/services/ci/create_pipeline_service_spec.rb index ef879d536c3..98ec02d59c6 100644 --- 
a/spec/services/ci/create_pipeline_service_spec.rb +++ b/spec/services/ci/create_pipeline_service_spec.rb @@ -146,138 +146,44 @@ RSpec.describe Ci::CreatePipelineService do end context 'when merge requests already exist for this source branch' do - let(:merge_request_1) do + let!(:merge_request_1) do create(:merge_request, source_branch: 'feature', target_branch: "master", source_project: project) end - let(:merge_request_2) do + let!(:merge_request_2) do create(:merge_request, source_branch: 'feature', target_branch: "v1.1.0", source_project: project) end - context 'when related merge request is already merged' do - let!(:merged_merge_request) do - create(:merge_request, source_branch: 'master', target_branch: "branch_2", source_project: project, state: 'merged') - end - - it 'does not schedule update head pipeline job' do - expect(UpdateHeadPipelineForMergeRequestWorker).not_to receive(:perform_async).with(merged_merge_request.id) - - execute_service - end - end - context 'when the head pipeline sha equals merge request sha' do it 'updates head pipeline of each merge request', :sidekiq_might_not_need_inline do - merge_request_1 - merge_request_2 - head_pipeline = execute_service(ref: 'feature', after: nil).payload expect(merge_request_1.reload.head_pipeline).to eq(head_pipeline) expect(merge_request_2.reload.head_pipeline).to eq(head_pipeline) end - end - context 'when the head pipeline sha does not equal merge request sha' do - it 'does not update the head piepeline of MRs' do - merge_request_1 - merge_request_2 + # TODO: remove after ci_publish_pipeline_events FF is removed + # https://gitlab.com/gitlab-org/gitlab/-/issues/336752 + it 'does not schedule sync update for the head pipeline of the merge request' do + expect(UpdateHeadPipelineForMergeRequestWorker) + .not_to receive(:perform_async) - allow_any_instance_of(Ci::Pipeline).to receive(:latest?).and_return(true) - - expect { execute_service(after: 'ae73cb07c9eeaf35924a10f713b364d32b2dd34f') }.not_to 
raise_error - - last_pipeline = Ci::Pipeline.last - - expect(merge_request_1.reload.head_pipeline).not_to eq(last_pipeline) - expect(merge_request_2.reload.head_pipeline).not_to eq(last_pipeline) + execute_service(ref: 'feature', after: nil) end end - context 'when there is no pipeline for source branch' do - it "does not update merge request head pipeline" do - merge_request = create(:merge_request, source_branch: 'feature', - target_branch: "branch_1", - source_project: project) - - head_pipeline = execute_service.payload - - expect(merge_request.reload.head_pipeline).not_to eq(head_pipeline) - end - end - - context 'when merge request target project is different from source project' do - let!(:project) { fork_project(target_project, nil, repository: true) } - let!(:target_project) { create(:project, :repository) } - let!(:user) { create(:user) } - + context 'when feature flag ci_publish_pipeline_events is disabled' do before do - project.add_developer(user) + stub_feature_flags(ci_publish_pipeline_events: false) end - it 'updates head pipeline for merge request', :sidekiq_might_not_need_inline do - merge_request = create(:merge_request, source_branch: 'feature', - target_branch: "master", - source_project: project, - target_project: target_project) + it 'schedules update for the head pipeline of the merge request' do + expect(UpdateHeadPipelineForMergeRequestWorker) + .to receive(:perform_async).with(merge_request_1.id) + expect(UpdateHeadPipelineForMergeRequestWorker) + .to receive(:perform_async).with(merge_request_2.id) - head_pipeline = execute_service(ref: 'feature', after: nil).payload - - expect(merge_request.reload.head_pipeline).to eq(head_pipeline) - end - end - - context 'when the pipeline is not the latest for the branch' do - it 'does not update merge request head pipeline' do - merge_request = create(:merge_request, source_branch: 'master', - target_branch: "branch_1", - source_project: project) - - allow_any_instance_of(MergeRequest) - .to 
receive(:find_actual_head_pipeline) { } - - execute_service - - expect(merge_request.reload.head_pipeline).to be_nil - end - end - - context 'when pipeline has errors' do - before do - stub_ci_pipeline_yaml_file('some invalid syntax') - end - - it 'updates merge request head pipeline reference', :sidekiq_might_not_need_inline do - merge_request = create(:merge_request, source_branch: 'master', - target_branch: 'feature', - source_project: project) - - head_pipeline = execute_service.payload - - expect(head_pipeline).to be_persisted - expect(head_pipeline.yaml_errors).to be_present - expect(head_pipeline.messages).to be_present - expect(merge_request.reload.head_pipeline).to eq head_pipeline - end - end - - context 'when pipeline has been skipped' do - before do - allow_any_instance_of(Ci::Pipeline) - .to receive(:git_commit_message) - .and_return('some commit [ci skip]') - end - - it 'updates merge request head pipeline', :sidekiq_might_not_need_inline do - merge_request = create(:merge_request, source_branch: 'master', - target_branch: 'feature', - source_project: project) - - head_pipeline = execute_service.payload - - expect(head_pipeline).to be_skipped - expect(head_pipeline).to be_persisted - expect(merge_request.reload.head_pipeline).to eq head_pipeline + execute_service(ref: 'feature', after: nil) end end end @@ -1655,7 +1561,7 @@ RSpec.describe Ci::CreatePipelineService do expect(pipeline.target_sha).to be_nil end - it 'schedules update for the head pipeline of the merge request' do + it 'schedules update for the head pipeline of the merge request', :sidekiq_inline do expect(UpdateHeadPipelineForMergeRequestWorker) .to receive(:perform_async).with(merge_request.id) diff --git a/spec/workers/merge_requests/update_head_pipeline_worker_spec.rb b/spec/workers/merge_requests/update_head_pipeline_worker_spec.rb new file mode 100644 index 00000000000..f3ea14ad539 --- /dev/null +++ b/spec/workers/merge_requests/update_head_pipeline_worker_spec.rb @@ -0,0 +1,138 @@ 
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe MergeRequests::UpdateHeadPipelineWorker do
+  include ProjectForksHelper # NOTE(review): presumably the source of fork_project used below - confirm
+
+  let_it_be(:project) { create(:project, :repository) }
+
+  let(:ref) { 'master' } # default pipeline ref; several contexts override it with 'feature'
+  let(:pipeline) { create(:ci_pipeline, project: project, ref: ref) }
+  let(:event) { Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id }) }
+
+  subject { consume_event(event) } # memoized lazily: the worker runs when an example references `subject`
+
+  def consume_event(event) # calls the worker's #perform directly with the event class name and data
+    described_class.new.perform(event.class.name, event.data)
+  end
+
+  context 'when merge requests already exist for this source branch', :sidekiq_inline do
+    let(:merge_request_1) do # lazy let: examples reference it explicitly to create the record
+      create(:merge_request, source_branch: 'feature', target_branch: "master", source_project: project)
+    end
+
+    let(:merge_request_2) do # same source branch as merge_request_1, different target branch
+      create(:merge_request, source_branch: 'feature', target_branch: "v1.1.0", source_project: project)
+    end
+
+    context 'when related merge request is already merged' do
+      let!(:merged_merge_request) do
+        create(:merge_request, source_branch: 'master', target_branch: "branch_2", source_project: project, state: 'merged')
+      end
+
+      it 'does not schedule update head pipeline job' do
+        expect(UpdateHeadPipelineForMergeRequestWorker).not_to receive(:perform_async).with(merged_merge_request.id)
+
+        subject
+      end
+    end
+
+    context 'when the head pipeline sha equals merge request sha' do
+      let(:ref) { 'feature' }
+
+      before do
+        pipeline.update!(sha: project.repository.commit(ref).id) # use the sha of the commit `ref` resolves to
+      end
+
+      it 'updates head pipeline of each merge request' do
+        merge_request_1 # force creation of both lazy lets before the worker runs
+        merge_request_2
+
+        subject
+
+        expect(merge_request_1.reload.head_pipeline).to eq(pipeline)
+        expect(merge_request_2.reload.head_pipeline).to eq(pipeline)
+      end
+    end
+
+    context 'when the head pipeline sha does not equal merge request sha' do
+      let(:ref) { 'feature' } # no sha update here: the pipeline keeps the sha the factory assigned
+
+      it 'does not update the head pipeline of MRs' do
+        merge_request_1 # force creation of both lazy lets before the worker runs
+        merge_request_2
+
+        subject
+
+        expect(merge_request_1.reload.head_pipeline).not_to eq(pipeline)
+        expect(merge_request_2.reload.head_pipeline).not_to eq(pipeline)
+      end
+    end
+
+    context 'when there is no pipeline for source branch' do
+      it "does not update merge request head pipeline" do
+        merge_request = create(:merge_request, source_branch: 'feature', # pipeline ref stays 'master'
+                                               target_branch: "branch_1",
+                                               source_project: project)
+
+        subject
+
+        expect(merge_request.reload.head_pipeline).not_to eq(pipeline)
+      end
+    end
+
+    context 'when merge request target project is different from source project' do
+      let(:project) { fork_project(target_project, nil, repository: true) } # shadows let_it_be(:project)
+      let(:target_project) { create(:project, :repository) }
+      let(:user) { create(:user) }
+      let(:ref) { 'feature' }
+
+      before do
+        project.add_developer(user)
+        pipeline.update!(sha: project.repository.commit(ref).id) # use the sha of the commit `ref` resolves to
+      end
+
+      it 'updates head pipeline for merge request' do
+        merge_request = create(:merge_request, source_branch: 'feature',
+                                               target_branch: "master",
+                                               source_project: project,
+                                               target_project: target_project)
+
+        subject
+
+        expect(merge_request.reload.head_pipeline).to eq(pipeline)
+      end
+    end
+
+    context 'when the pipeline is not the latest for the branch' do
+      it 'does not update merge request head pipeline' do
+        merge_request = create(:merge_request, source_branch: 'master',
+                                               target_branch: "branch_1",
+                                               source_project: project)
+
+        create(:ci_pipeline, project: pipeline.project, ref: pipeline.ref) # created after `pipeline`, same ref
+
+        subject
+
+        expect(merge_request.reload.head_pipeline).to be_nil
+      end
+    end
+
+    context 'when pipeline has errors' do
+      before do
+        pipeline.update!(yaml_errors: 'some errors', status: :failed) # errored pipeline is still expected as head
+      end
+
+      it 'updates merge request head pipeline reference' do
+        merge_request = create(:merge_request, source_branch: 'master',
+                                               target_branch: 'feature',
+                                               source_project: project)
+
+        subject
+
+        expect(merge_request.reload.head_pipeline).to eq(pipeline)
+      end
+    end
+  end
+end