From b5042e5301e86ec7822221ee29679b0fbf5c71ca Mon Sep 17 00:00:00 2001 From: Sean McGivern Date: Fri, 20 Apr 2018 19:37:38 +0200 Subject: [PATCH] Move NotificationService calls to Sidekiq The NotificationService has to do quite a lot of work to calculate the recipients for an email. Where possible, we should try to avoid doing this in an HTTP request, because the mail are sent by Sidekiq anyway, so there's no need to schedule those emails immediately. This commit creates a generic Sidekiq worker that uses Global ID to serialise and deserialise its arguments, then forwards them to the NotificationService. The NotificationService gains an `#async` method, so you can replace: notification_service.new_issue(issue, current_user) With: notification_service.async.new_issue(issue, current_user) And have everything else work as normal, except that calculating the recipients will be done by Sidekiq, which will then schedule further Sidekiq jobs to send each email. --- app/services/issues/close_service.rb | 2 +- app/services/issues/move_service.rb | 2 +- app/services/issues/reopen_service.rb | 2 +- app/services/issues/update_service.rb | 6 +- app/services/merge_requests/close_service.rb | 2 +- app/services/merge_requests/reopen_service.rb | 2 +- ...esolved_discussion_notification_service.rb | 2 +- app/services/merge_requests/update_service.rb | 11 ++- app/services/notification_service.rb | 75 +++++++++++-------- app/workers/all_queues.yml | 1 + app/workers/concerns/mail_scheduler_queue.rb | 4 + .../mail_scheduler/issue_due_worker.rb | 2 - .../notification_service_worker.rb | 19 +++++ ...-notification-service-calls-to-sidekiq.yml | 5 ++ spec/services/notification_service_spec.rb | 42 ++++++++++- .../mail_scheduler/issue_due_worker_spec.rb | 4 +- .../notification_service_worker_spec.rb | 44 +++++++++++ 17 files changed, 173 insertions(+), 52 deletions(-) create mode 100644 app/workers/mail_scheduler/notification_service_worker.rb create mode 100644 changelogs/unreleased/move-notification-service-calls-to-sidekiq.yml create mode 100644 spec/workers/mail_scheduler/notification_service_worker_spec.rb diff --git a/app/services/issues/close_service.rb b/app/services/issues/close_service.rb index fee5bc38f7b..4a99367c575 100644 --- a/app/services/issues/close_service.rb +++ b/app/services/issues/close_service.rb @@ -26,7 +26,7 @@ module Issues issue.update(closed_by: current_user) event_service.close_issue(issue, current_user) create_note(issue, commit) if system_note - notification_service.close_issue(issue, current_user) if notifications + notification_service.async.close_issue(issue, current_user) if notifications todo_service.close_issue(issue, current_user) execute_hooks(issue, 'close') invalidate_cache_counts(issue, users: issue.assignees) diff --git a/app/services/issues/move_service.rb b/app/services/issues/move_service.rb index 7140890d201..78e79344c99 100644 --- a/app/services/issues/move_service.rb +++ b/app/services/issues/move_service.rb @@ -139,7 +139,7 @@ module Issues end def notify_participants - notification_service.issue_moved(@old_issue, @new_issue, @current_user) + notification_service.async.issue_moved(@old_issue, @new_issue, @current_user) end end end diff --git a/app/services/issues/reopen_service.rb b/app/services/issues/reopen_service.rb index 62b4b4b6a1e..02224f3357a 100644 --- a/app/services/issues/reopen_service.rb +++ b/app/services/issues/reopen_service.rb @@ -6,7 +6,7 @@ module Issues if issue.reopen event_service.reopen_issue(issue, current_user) create_note(issue, 'reopened') - notification_service.reopen_issue(issue, current_user) + notification_service.async.reopen_issue(issue, current_user) execute_hooks(issue, 'reopen') invalidate_cache_counts(issue, users: issue.assignees) issue.update_project_counter_caches diff --git a/app/services/issues/update_service.rb b/app/services/issues/update_service.rb index 1374f10c586..1000e1842b6 100644 --- a/app/services/issues/update_service.rb +++ b/app/services/issues/update_service.rb @@ -30,7 +30,7 @@ module Issues if issue.assignees != old_assignees create_assignee_note(issue, old_assignees) - notification_service.reassigned_issue(issue, current_user, old_assignees) + notification_service.async.reassigned_issue(issue, current_user, old_assignees) todo_service.reassigned_issue(issue, current_user, old_assignees) end @@ -41,13 +41,13 @@ module Issues added_labels = issue.labels - old_labels if added_labels.present? - notification_service.relabeled_issue(issue, added_labels, current_user) + notification_service.async.relabeled_issue(issue, added_labels, current_user) end added_mentions = issue.mentioned_users - old_mentioned_users if added_mentions.present? - notification_service.new_mentions_in_issue(issue, added_mentions, current_user) + notification_service.async.new_mentions_in_issue(issue, added_mentions, current_user) end end diff --git a/app/services/merge_requests/close_service.rb b/app/services/merge_requests/close_service.rb index f727ec002e7..db701c1145d 100644 --- a/app/services/merge_requests/close_service.rb +++ b/app/services/merge_requests/close_service.rb @@ -10,7 +10,7 @@ module MergeRequests if merge_request.close create_event(merge_request) create_note(merge_request) - notification_service.close_mr(merge_request, current_user) + notification_service.async.close_mr(merge_request, current_user) todo_service.close_merge_request(merge_request, current_user) execute_hooks(merge_request, 'close') invalidate_cache_counts(merge_request, users: merge_request.assignees) diff --git a/app/services/merge_requests/reopen_service.rb b/app/services/merge_requests/reopen_service.rb index 120677a7149..8f1c95ac1b7 100644 --- a/app/services/merge_requests/reopen_service.rb +++ b/app/services/merge_requests/reopen_service.rb @@ -6,7 +6,7 @@ module MergeRequests if merge_request.reopen create_event(merge_request) create_note(merge_request, 'reopened') - notification_service.reopen_mr(merge_request, current_user) + notification_service.async.reopen_mr(merge_request, current_user) execute_hooks(merge_request, 'reopen') merge_request.reload_diff(current_user) merge_request.mark_as_unchecked diff --git a/app/services/merge_requests/resolved_discussion_notification_service.rb b/app/services/merge_requests/resolved_discussion_notification_service.rb index 3a09350c847..66a0cbc81d4 100644 --- a/app/services/merge_requests/resolved_discussion_notification_service.rb +++ b/app/services/merge_requests/resolved_discussion_notification_service.rb @@ -4,7 +4,7 @@ module MergeRequests return unless merge_request.discussions_resolved? SystemNoteService.resolve_all_discussions(merge_request, project, current_user) - notification_service.resolve_all_discussions(merge_request, current_user) + notification_service.async.resolve_all_discussions(merge_request, current_user) end end end diff --git a/app/services/merge_requests/update_service.rb b/app/services/merge_requests/update_service.rb index 8a40ad88182..7350725e223 100644 --- a/app/services/merge_requests/update_service.rb +++ b/app/services/merge_requests/update_service.rb @@ -21,6 +21,7 @@ module MergeRequests update(merge_request) end + # rubocop:disable Metrics/AbcSize def handle_changes(merge_request, options) old_associations = options.fetch(:old_associations, {}) old_labels = old_associations.fetch(:labels, []) @@ -42,8 +43,11 @@ module MergeRequests end if merge_request.previous_changes.include?('assignee_id') + old_assignee_id = merge_request.previous_changes['assignee_id'].first + old_assignee = User.find(old_assignee_id) if old_assignee_id + create_assignee_note(merge_request) - notification_service.reassigned_merge_request(merge_request, current_user) + notification_service.async.reassigned_merge_request(merge_request, current_user, old_assignee) todo_service.reassigned_merge_request(merge_request, current_user) end @@ -54,7 +58,7 @@ module MergeRequests added_labels = merge_request.labels - old_labels if added_labels.present? - notification_service.relabeled_merge_request( + notification_service.async.relabeled_merge_request( merge_request, added_labels, current_user @@ -63,13 +67,14 @@ module MergeRequests added_mentions = merge_request.mentioned_users - old_mentioned_users if added_mentions.present? - notification_service.new_mentions_in_merge_request( + notification_service.async.new_mentions_in_merge_request( merge_request, added_mentions, current_user ) end end + # rubocop:enable Metrics/AbcSize def merge_from_quick_action(merge_request) last_diff_sha = params.delete(:merge) diff --git a/app/services/notification_service.rb b/app/services/notification_service.rb index 274161df946..55a1735e54b 100644 --- a/app/services/notification_service.rb +++ b/app/services/notification_service.rb @@ -7,7 +7,32 @@ # Ex. # NotificationService.new.new_issue(issue, current_user) # +# When calculating the recipients of a notification is expensive (for instance, +# in the new issue case), `#async` will make that calculation happen in Sidekiq +# instead: +# +# NotificationService.new.async.new_issue(issue, current_user) +# class NotificationService + class Async + attr_reader :parent + delegate :respond_to_missing, to: :parent + + def initialize(parent) + @parent = parent + end + + def method_missing(meth, *args) + return super unless parent.respond_to?(meth) + + MailScheduler::NotificationServiceWorker.perform_async(meth.to_s, *args) + end + end + + def async + @async ||= Async.new(self) + end + # Always notify user about ssh key added # only if ssh key is not deploy key # @@ -142,8 +167,23 @@ class NotificationService # * merge_request assignee if their notification level is not Disabled # * users with custom level checked with "reassign merge request" # - def reassigned_merge_request(merge_request, current_user) - reassign_resource_email(merge_request, current_user, :reassigned_merge_request_email) + def reassigned_merge_request(merge_request, current_user, previous_assignee) + recipients = NotificationRecipientService.build_recipients( + merge_request, + current_user, + action: "reassign", + previous_assignee: previous_assignee + ) + + recipients.each do |recipient| + mailer.reassigned_merge_request_email( + recipient.user.id, + merge_request.id, + previous_assignee&.id, + current_user.id, + recipient.reason + ).deliver_later + end end # When we add labels to a merge request we should send an email to: @@ -421,29 +461,6 @@ class NotificationService end end - def reassign_resource_email(target, current_user, method) - previous_assignee_id = previous_record(target, 'assignee_id') - previous_assignee = User.find_by(id: previous_assignee_id) if previous_assignee_id - - recipients = NotificationRecipientService.build_recipients( - target, - current_user, - action: "reassign", - previous_assignee: previous_assignee - ) - - recipients.each do |recipient| - mailer.send( - method, - recipient.user.id, - target.id, - previous_assignee_id, - current_user.id, - recipient.reason - ).deliver_later - end - end - def relabeled_resource_email(target, labels, current_user, method) recipients = labels.flat_map { |l| l.subscribers(target.project) }.uniq recipients = notifiable_users( @@ -471,14 +488,6 @@ class NotificationService Notify end - def previous_record(object, attribute) - return unless object && attribute - - if object.previous_changes.include?(attribute) - object.previous_changes[attribute].first - end - end - private def recipients_for_pages_domain(domain) diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 9aea3bad27b..c469aea7052 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -41,6 +41,7 @@ - github_importer:github_import_stage_import_repository - mail_scheduler:mail_scheduler_issue_due +- mail_scheduler:mail_scheduler_notification_service - object_storage_upload - object_storage:object_storage_background_move diff --git a/app/workers/concerns/mail_scheduler_queue.rb b/app/workers/concerns/mail_scheduler_queue.rb index 9df55ad9522..f3e9680d756 100644 --- a/app/workers/concerns/mail_scheduler_queue.rb +++ b/app/workers/concerns/mail_scheduler_queue.rb @@ -4,4 +4,8 @@ module MailSchedulerQueue included do queue_namespace :mail_scheduler end + + def notification_service + @notification_service ||= NotificationService.new + end end diff --git a/app/workers/mail_scheduler/issue_due_worker.rb b/app/workers/mail_scheduler/issue_due_worker.rb index b06079d68ca..54285884a52 100644 --- a/app/workers/mail_scheduler/issue_due_worker.rb +++ b/app/workers/mail_scheduler/issue_due_worker.rb @@ -4,8 +4,6 @@ module MailScheduler include MailSchedulerQueue def perform(project_id) - notification_service = NotificationService.new - Issue.opened.due_tomorrow.in_projects(project_id).preload(:project).find_each do |issue| notification_service.issue_due(issue) end diff --git a/app/workers/mail_scheduler/notification_service_worker.rb b/app/workers/mail_scheduler/notification_service_worker.rb new file mode 100644 index 00000000000..7cfe0aa0df1 --- /dev/null +++ b/app/workers/mail_scheduler/notification_service_worker.rb @@ -0,0 +1,19 @@ +require 'active_job/arguments' + +module MailScheduler + class NotificationServiceWorker + include ApplicationWorker + include MailSchedulerQueue + + def perform(meth, *args) + deserialized_args = ActiveJob::Arguments.deserialize(args) + + notification_service.public_send(meth, *deserialized_args) # rubocop:disable GitlabSecurity/PublicSend + rescue ActiveJob::DeserializationError + end + + def self.perform_async(*args) + super(*ActiveJob::Arguments.serialize(args)) + end + end +end diff --git a/changelogs/unreleased/move-notification-service-calls-to-sidekiq.yml b/changelogs/unreleased/move-notification-service-calls-to-sidekiq.yml new file mode 100644 index 00000000000..b2517884d3c --- /dev/null +++ b/changelogs/unreleased/move-notification-service-calls-to-sidekiq.yml @@ -0,0 +1,5 @@ +--- +title: Compute notification recipients in background jobs +merge_request: +author: +type: performance diff --git a/spec/services/notification_service_spec.rb b/spec/services/notification_service_spec.rb index 55bbe954491..48ef5f3c115 100644 --- a/spec/services/notification_service_spec.rb +++ b/spec/services/notification_service_spec.rb @@ -96,6 +96,37 @@ describe NotificationService, :mailer do it_should_behave_like 'participating by assignee notification' end + describe '#async' do + let(:async) { notification.async } + set(:key) { create(:personal_key) } + + it 'returns an Async object with the correct parent' do + expect(async).to be_a(described_class::Async) + expect(async.parent).to eq(notification) + end + + context 'when receiving a public method' do + it 'schedules a MailScheduler::NotificationServiceWorker' do + expect(MailScheduler::NotificationServiceWorker) + .to receive(:perform_async).with('new_key', key) + + async.new_key(key) + end + end + + context 'when receiving a private method' do + it 'raises NoMethodError' do + expect { async.notifiable?(key) }.to raise_error(NoMethodError) + end + end + + context 'when recieving a non-existent method' do + it 'raises NoMethodError' do + expect { async.foo(key) }.to raise_error(NoMethodError) + end + end + end + describe 'Keys' do describe '#new_key' do let(:key_options) { {} } @@ -982,6 +1013,8 @@ describe NotificationService, :mailer do let(:merge_request) { create :merge_request, source_project: project, assignee: create(:user), description: 'cc @participant' } before do + project.add_master(merge_request.author) + project.add_master(merge_request.assignee) build_team(merge_request.target_project) add_users_with_subscription(merge_request.target_project, merge_request) update_custom_notification(:new_merge_request, @u_guest_custom, resource: project) @@ -1093,15 +1126,18 @@ describe NotificationService, :mailer do end describe '#reassigned_merge_request' do + let(:current_user) { create(:user) } + before do update_custom_notification(:reassign_merge_request, @u_guest_custom, resource: project) update_custom_notification(:reassign_merge_request, @u_custom_global) end it do - notification.reassigned_merge_request(merge_request, merge_request.author) + notification.reassigned_merge_request(merge_request, current_user, merge_request.author) should_email(merge_request.assignee) + should_email(merge_request.author) should_email(@u_watcher) should_email(@u_participant_mentioned) should_email(@subscriber) @@ -1116,7 +1152,7 @@ describe NotificationService, :mailer do end it 'adds "assigned" reason for new assignee' do - notification.reassigned_merge_request(merge_request, merge_request.author) + notification.reassigned_merge_request(merge_request, current_user, merge_request.author) email = find_email_for(merge_request.assignee) @@ -1126,7 +1162,7 @@ describe NotificationService, :mailer do it_behaves_like 'participating notifications' do let(:participant) { create(:user, username: 'user-participant') } let(:issuable) { merge_request } - let(:notification_trigger) { notification.reassigned_merge_request(merge_request, @u_disabled) } + let(:notification_trigger) { notification.reassigned_merge_request(merge_request, current_user, merge_request.author) } end end diff --git a/spec/workers/mail_scheduler/issue_due_worker_spec.rb b/spec/workers/mail_scheduler/issue_due_worker_spec.rb index 48ac1b8a1a4..1026ae5b4bf 100644 --- a/spec/workers/mail_scheduler/issue_due_worker_spec.rb +++ b/spec/workers/mail_scheduler/issue_due_worker_spec.rb @@ -12,8 +12,8 @@ describe MailScheduler::IssueDueWorker do create(:issue, :opened, project: project, due_date: 2.days.from_now) # due on another day create(:issue, :opened, due_date: Date.tomorrow) # different project - expect_any_instance_of(NotificationService).to receive(:issue_due).with(issue1) - expect_any_instance_of(NotificationService).to receive(:issue_due).with(issue2) + expect(worker.notification_service).to receive(:issue_due).with(issue1) + expect(worker.notification_service).to receive(:issue_due).with(issue2) worker.perform(project.id) end diff --git a/spec/workers/mail_scheduler/notification_service_worker_spec.rb b/spec/workers/mail_scheduler/notification_service_worker_spec.rb new file mode 100644 index 00000000000..f725c8763a0 --- /dev/null +++ b/spec/workers/mail_scheduler/notification_service_worker_spec.rb @@ -0,0 +1,44 @@ +require 'spec_helper' + +describe MailScheduler::NotificationServiceWorker do + let(:worker) { described_class.new } + let(:method) { 'new_key' } + set(:key) { create(:personal_key) } + + def serialize(*args) + ActiveJob::Arguments.serialize(args) + end + + describe '#perform' do + it 'deserializes arguments from global IDs' do + expect(worker.notification_service).to receive(method).with(key) + + worker.perform(method, *serialize(key)) + end + + context 'when the arguments cannot be deserialized' do + it 'does nothing' do + expect(worker.notification_service).not_to receive(method) + + worker.perform(method, key.to_global_id.to_s.succ) + end + end + + context 'when the method is not a public method' do + it 'raises NoMethodError' do + expect { worker.perform('notifiable?', *serialize(key)) }.to raise_error(NoMethodError) + end + end + end + + describe '.perform_async' do + it 'serializes arguments as global IDs when scheduling' do + Sidekiq::Testing.fake! do + described_class.perform_async(method, key) + + expect(described_class.jobs.count).to eq(1) + expect(described_class.jobs.first).to include('args' => [method, *serialize(key)]) + end + end + end +end