diff --git a/client/src/app/+admin/system/jobs/jobs.component.ts b/client/src/app/+admin/system/jobs/jobs.component.ts index f7e10fd04..42f503be6 100644 --- a/client/src/app/+admin/system/jobs/jobs.component.ts +++ b/client/src/app/+admin/system/jobs/jobs.component.ts @@ -25,6 +25,7 @@ export class JobsComponent extends RestTable implements OnInit { 'activitypub-follow', 'activitypub-http-broadcast', + 'activitypub-http-broadcast-parallel', 'activitypub-http-fetcher', 'activitypub-http-unicast', 'activitypub-refresher', diff --git a/scripts/simulate-many-viewers.ts b/scripts/simulate-many-viewers.ts new file mode 100644 index 000000000..fb666c318 --- /dev/null +++ b/scripts/simulate-many-viewers.ts @@ -0,0 +1,89 @@ +import Bluebird from 'bluebird' +import { wait } from '@shared/core-utils' +import { + createSingleServer, + doubleFollow, + killallServers, + PeerTubeServer, + setAccessTokensToServers, + waitJobs +} from '@shared/server-commands' + +let servers: PeerTubeServer[] +const viewers: { xForwardedFor: string }[] = [] +let videoId: string + +run() + .then(() => process.exit(0)) + .catch(err => console.error(err)) + .finally(() => killallServers(servers)) + +async function run () { + await prepare() + + while (true) { + await runViewers() + } +} + +async function prepare () { + console.log('Preparing servers...') + + const config = { + log: { + level: 'info' + }, + rates_limit: { + api: { + max: 5_000_000 + } + }, + views: { + videos: { + local_buffer_update_interval: '30 minutes', + ip_view_expiration: '1 hour' + } + } + } + + servers = await Promise.all([ + createSingleServer(1, config, { nodeArgs: [ '--inspect' ] }), + createSingleServer(2, config), + createSingleServer(3, config) + ]) + + await setAccessTokensToServers(servers) + await doubleFollow(servers[0], servers[1]) + await doubleFollow(servers[0], servers[2]) + + const { uuid } = await servers[0].videos.quickUpload({ name: 'video' }) + videoId = uuid + + await waitJobs(servers) + + const THOUSAND_VIEWERS = 2 + + for (let i = 2; i < 252; i++) { + for (let j = 2; j < 6; j++) { + for (let k = 2; k < THOUSAND_VIEWERS + 2; k++) { + viewers.push({ xForwardedFor: `0.${k}.${j}.${i},127.0.0.1` }) + } + } + } + + console.log('Servers preparation finished.') +} + +async function runViewers () { + console.log('Will run views of %d viewers.', viewers.length) + + const before = new Date().getTime() + + await Bluebird.map(viewers, viewer => { + return servers[0].views.simulateView({ id: videoId, xForwardedFor: viewer.xForwardedFor }) + }, { concurrency: 100 }) + + console.log('Finished to run views in %d seconds.', (new Date().getTime() - before) / 1000) + + await wait(5000) +} diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 2d324d1eb..75ccbc458 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -139,6 +139,7 @@ const REMOTE_SCHEME = { const JOB_ATTEMPTS: { [id in JobType]: number } = { 'activitypub-http-broadcast': 1, + 'activitypub-http-broadcast-parallel': 1, 'activitypub-http-unicast': 1, 'activitypub-http-fetcher': 2, 'activitypub-follow': 5, @@ -159,6 +160,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { // Excluded keys are jobs that can be configured by admins const JOB_CONCURRENCY: { [id in Exclude]: number } = { 'activitypub-http-broadcast': 1, + 'activitypub-http-broadcast-parallel': 30, 'activitypub-http-unicast': 10, 'activitypub-http-fetcher': 3, 'activitypub-cleaner': 1, @@ -176,6 +178,7 @@ const JOB_CONCURRENCY: { [id in Exclude ({ start: s.startTimestamp, end: s.endTimestamp - })) + })), + + transaction: t }) } diff --git a/server/lib/activitypub/send/send-view.ts b/server/lib/activitypub/send/send-view.ts index 25a20ec6d..bf3451603 100644 --- a/server/lib/activitypub/send/send-view.ts +++ b/server/lib/activitypub/send/send-view.ts @@ -26,7 +26,7 @@ async function sendView (options: { return buildViewActivity({ url, byActor, video, audience, type }) } - return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View' }) + return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true }) } // --------------------------------------------------------------------------- diff --git a/server/lib/activitypub/send/shared/audience-utils.ts b/server/lib/activitypub/send/shared/audience-utils.ts index ba4be487c..2f6b0741d 100644 --- a/server/lib/activitypub/send/shared/audience-utils.ts +++ b/server/lib/activitypub/send/shared/audience-utils.ts @@ -3,7 +3,7 @@ import { ACTIVITY_PUB } from '@server/initializers/constants' import { ActorModel } from '@server/models/actor/actor' import { VideoModel } from '@server/models/video/video' import { VideoShareModel } from '@server/models/video/video-share' -import { MActorFollowersUrl, MActorLight, MActorUrl, MCommentOwner, MCommentOwnerVideo, MVideoId } from '@server/types/models' +import { MActorFollowersUrl, MActorUrl, MCommentOwner, MCommentOwnerVideo, MVideoId } from '@server/types/models' import { ActivityAudience } from '@shared/models' function getOriginVideoAudience (accountActor: MActorUrl, actorsInvolvedInVideo: MActorFollowersUrl[] = []): ActivityAudience { @@ -51,13 +51,13 @@ function getAudienceFromFollowersOf (actorsInvolvedInObject: MActorFollowersUrl[ } async function getActorsInvolvedInVideo (video: MVideoId, t: Transaction) { - const actors: MActorLight[] = await VideoShareModel.loadActorsByShare(video.id, t) + const actors = await VideoShareModel.listActorIdsAndFollowerUrlsByShare(video.id, t) const videoAll = video as VideoModel const videoActor = videoAll.VideoChannel?.Account ? videoAll.VideoChannel.Account.Actor - : await ActorModel.loadFromAccountByVideoId(video.id, t) + : await ActorModel.loadAccountActorFollowerUrlByVideoId(video.id, t) actors.push(videoActor) diff --git a/server/lib/activitypub/send/shared/send-utils.ts b/server/lib/activitypub/send/shared/send-utils.ts index dbcde91ee..fcec63991 100644 --- a/server/lib/activitypub/send/shared/send-utils.ts +++ b/server/lib/activitypub/send/shared/send-utils.ts @@ -15,17 +15,18 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud byActor: MActorLight video: MVideoImmutable | MVideoAccountLight contextType: ContextType + parallelizable?: boolean transaction?: Transaction }) { - const { byActor, video, transaction, contextType } = options - - const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, transaction) + const { byActor, video, transaction, contextType, parallelizable } = options // Send to origin if (video.isOwned() === false) { return sendVideoActivityToOrigin(activityBuilder, options) } + const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, transaction) + // Send to followers const audience = getAudienceFromFollowersOf(actorsInvolvedInVideo) const activity = activityBuilder(audience) @@ -38,6 +39,7 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud toFollowersOf: actorsInvolvedInVideo, transaction, actorsException, + parallelizable, contextType }) } @@ -130,9 +132,10 @@ async function broadcastToFollowers (options: { transaction: Transaction contextType: ContextType + parallelizable?: boolean actorsException?: MActorWithInboxes[] }) { - const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [] } = options + const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [], parallelizable } = options const uris = await computeFollowerUris(toFollowersOf, actorsException, transaction) @@ -141,6 +144,7 @@ async function broadcastToFollowers (options: { uris, data, byActor, + parallelizable, contextType }) }) @@ -173,8 +177,9 @@ function broadcastTo (options: { data: any byActor: MActorId contextType: ContextType + parallelizable?: boolean // default to false }) { - const { uris, data, byActor, contextType } = options + const { uris, data, byActor, contextType, parallelizable } = options if (uris.length === 0) return undefined @@ -200,7 +205,13 @@ function broadcastTo (options: { contextType } - JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) + JobQueue.Instance.createJob({ + type: parallelizable + ? 'activitypub-http-broadcast-parallel' + : 'activitypub-http-broadcast', + + payload + }) } for (const unicastUri of unicastUris) { diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 61e41fb0f..ce24763f1 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -43,6 +43,7 @@ import { processVideosViewsStats } from './handlers/video-views-stats' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | + { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | { type: 'activitypub-http-cleaner', payload: {} } | @@ -68,6 +69,7 @@ export type CreateJobOptions = { const handlers: { [id in JobType]: (job: Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, + 'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-cleaner': processActivityPubCleaner, @@ -93,6 +95,7 @@ const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } const jobTypes: JobType[] = [ 'activitypub-follow', 'activitypub-http-broadcast', + 'activitypub-http-broadcast-parallel', 'activitypub-http-fetcher', 'activitypub-http-unicast', 'activitypub-cleaner', diff --git a/server/lib/views/shared/video-viewer-counters.ts b/server/lib/views/shared/video-viewer-counters.ts index 941b62ed7..999ab7d8d 100644 --- a/server/lib/views/shared/video-viewer-counters.ts +++ b/server/lib/views/shared/video-viewer-counters.ts @@ -1,4 +1,3 @@ - import { isTestInstance } from '@server/helpers/core-utils' import { logger, loggerTagsFactory } from '@server/helpers/logger' import { VIEW_LIFETIME } from '@server/initializers/constants' diff --git a/server/models/actor/actor.ts b/server/models/actor/actor.ts index 93145b8ae..943b7364f 100644 --- a/server/models/actor/actor.ts +++ b/server/models/actor/actor.ts @@ -1,5 +1,5 @@ import { values } from 'lodash' -import { literal, Op, Transaction } from 'sequelize' +import { literal, Op, QueryTypes, Transaction } from 'sequelize' import { AllowNull, BelongsTo, @@ -43,15 +43,18 @@ import { MActorAccountChannelId, MActorAPAccount, MActorAPChannel, + MActorFollowersUrl, MActorFormattable, MActorFull, MActorHost, + MActorId, MActorServer, MActorSummaryFormattable, MActorUrl, MActorWithInboxes } from '../../types/models' import { AccountModel } from '../account/account' +import { getServerActor } from '../application/application' import { ServerModel } from '../server/server' import { isOutdated, throwIfNotValid } from '../utils' import { VideoModel } from '../video/video' @@ -304,7 +307,10 @@ export class ActorModel extends Model>> { }) VideoChannel: VideoChannelModel - static load (id: number): Promise { + static async load (id: number): Promise { + const actorServer = await getServerActor() + if (id === actorServer.id) return actorServer + return ActorModel.unscoped().findByPk(id) } @@ -312,48 +318,21 @@ export class ActorModel extends Model>> { return ActorModel.scope(ScopeNames.FULL).findByPk(id) } - static loadFromAccountByVideoId (videoId: number, transaction: Transaction): Promise { - const query = { - include: [ - { - attributes: [ 'id' ], - model: AccountModel.unscoped(), - required: true, - include: [ - { - attributes: [ 'id' ], - model: VideoChannelModel.unscoped(), - required: true, - include: [ - { - attributes: [ 'id' ], - model: VideoModel.unscoped(), - required: true, - where: { - id: videoId - } - } - ] - } - ] - } - ], + static loadAccountActorFollowerUrlByVideoId (videoId: number, transaction: Transaction) { + const query = `SELECT "actor"."id" AS "id", "actor"."followersUrl" AS "followersUrl" ` + + `FROM "actor" ` + + `INNER JOIN "account" ON "actor"."id" = "account"."actorId" ` + + `INNER JOIN "videoChannel" ON "videoChannel"."accountId" = "account"."id" ` + + `INNER JOIN "video" ON "video"."channelId" = "videoChannel"."id" AND "video"."id" = :videoId` + + const options = { + type: QueryTypes.SELECT as QueryTypes.SELECT, + replacements: { videoId }, + plain: true as true, transaction } - return ActorModel.unscoped().findOne(query) - } - - static isActorUrlExist (url: string) { - const query = { - raw: true, - where: { - url - } - } - - return ActorModel.unscoped().findOne(query) - .then(a => !!a) + return ActorModel.sequelize.query(query, options) } static listByFollowersUrls (followersUrls: string[], transaction?: Transaction): Promise { diff --git a/server/models/video/video-share.ts b/server/models/video/video-share.ts index ad95dec6e..ca63bb2d9 100644 --- a/server/models/video/video-share.ts +++ b/server/models/video/video-share.ts @@ -3,7 +3,7 @@ import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Mode import { AttributesOnly } from '@shared/typescript-utils' import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc' import { CONSTRAINTS_FIELDS } from '../../initializers/constants' -import { MActorDefault } from '../../types/models' +import { MActorDefault, MActorFollowersUrl, MActorId } from '../../types/models' import { MVideoShareActor, MVideoShareFull } from '../../types/models/video' import { ActorModel } from '../actor/actor' import { buildLocalActorIdsIn, throwIfNotValid } from '../utils' @@ -107,22 +107,19 @@ export class VideoShareModel extends Model { - const query = { - where: { - videoId - }, - include: [ - { - model: ActorModel, - required: true - } - ], + static listActorIdsAndFollowerUrlsByShare (videoId: number, t: Transaction) { + const query = `SELECT "actor"."id" AS "id", "actor"."followersUrl" AS "followersUrl" ` + + `FROM "videoShare" ` + + `INNER JOIN "actor" ON "actor"."id" = "videoShare"."actorId" ` + + `WHERE "videoShare"."videoId" = :videoId` + + const options = { + type: QueryTypes.SELECT as QueryTypes.SELECT, + replacements: { videoId }, transaction: t } - return VideoShareModel.scope(ScopeNames.FULL).findAll(query) - .then((res: MVideoShareFull[]) => res.map(r => r.Actor)) + return VideoShareModel.sequelize.query(query, options) } static loadActorsWhoSharedVideosOf (actorOwnerId: number, t: Transaction): Promise { diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index 073f15872..4633ab769 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts @@ -9,6 +9,7 @@ export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' export type JobType = | 'activitypub-http-unicast' | 'activitypub-http-broadcast' + | 'activitypub-http-broadcast-parallel' | 'activitypub-http-fetcher' | 'activitypub-cleaner' | 'activitypub-follow'