diff --git a/server.ts b/server.ts index 6dff16f46..868a03ba4 100644 --- a/server.ts +++ b/server.ts @@ -94,7 +94,7 @@ import { } from './server/controllers' import { advertiseDoNotTrack } from './server/middlewares/dnt' import { Redis } from './server/lib/redis' -import { BadActorFollowScheduler } from './server/lib/schedulers/bad-actor-follow-scheduler' +import { ActorFollowScheduler } from './server/lib/schedulers/actor-follow-scheduler' import { RemoveOldJobsScheduler } from './server/lib/schedulers/remove-old-jobs-scheduler' import { UpdateVideosScheduler } from './server/lib/schedulers/update-videos-scheduler' import { YoutubeDlUpdateScheduler } from './server/lib/schedulers/youtube-dl-update-scheduler' @@ -219,7 +219,7 @@ async function startApplication () { VideosCaptionCache.Instance.init(CONFIG.CACHE.VIDEO_CAPTIONS.SIZE, CACHE.VIDEO_CAPTIONS.MAX_AGE) // Enable Schedulers - BadActorFollowScheduler.Instance.enable() + ActorFollowScheduler.Instance.enable() RemoveOldJobsScheduler.Instance.enable() UpdateVideosScheduler.Instance.enable() YoutubeDlUpdateScheduler.Instance.enable() diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index b326a6c7b..1c27a9f6b 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -144,7 +144,7 @@ const VIDEO_IMPORT_TIMEOUT = 1000 * 3600 // 1 hour // 1 hour let SCHEDULER_INTERVALS_MS = { - badActorFollow: 60000 * 60, // 1 hour + actorFollowScores: 60000 * 60, // 1 hour removeOldJobs: 60000 * 60, // 1 hour updateVideos: 60000, // 1 minute youtubeDLUpdate: 60000 * 60 * 24 // 1 day @@ -675,7 +675,7 @@ if (isTestInstance() === true) { CONSTRAINTS_FIELDS.ACTORS.AVATAR.FILE_SIZE.max = 100 * 1024 // 100KB - SCHEDULER_INTERVALS_MS.badActorFollow = 10000 + SCHEDULER_INTERVALS_MS.actorFollowScores = 1000 SCHEDULER_INTERVALS_MS.removeOldJobs = 10000 SCHEDULER_INTERVALS_MS.updateVideos = 5000 REPEAT_JOBS['videos-views'] = { every: 5000 } diff --git a/server/lib/cache/actor-follow-score-cache.ts b/server/lib/cache/actor-follow-score-cache.ts new file mode 100644 index 000000000..d070bde09 --- /dev/null +++ b/server/lib/cache/actor-follow-score-cache.ts @@ -0,0 +1,46 @@ +import { ACTOR_FOLLOW_SCORE } from '../../initializers' +import { logger } from '../../helpers/logger' + +// Cache follows scores, instead of writing them too often in database +// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores +class ActorFollowScoreCache { + + private static instance: ActorFollowScoreCache + private pendingFollowsScore: { [ url: string ]: number } = {} + + private constructor () {} + + static get Instance () { + return this.instance || (this.instance = new this()) + } + + updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) { + if (goodInboxes.length === 0 && badInboxes.length === 0) return + + logger.info('Updating %d good actor follows and %d bad actor follows scores in cache.', goodInboxes.length, badInboxes.length) + + for (const goodInbox of goodInboxes) { + if (this.pendingFollowsScore[goodInbox] === undefined) this.pendingFollowsScore[goodInbox] = 0 + + this.pendingFollowsScore[goodInbox] += ACTOR_FOLLOW_SCORE.BONUS + } + + for (const badInbox of badInboxes) { + if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 + + this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY + } + } + + getPendingFollowsScoreCopy () { + return this.pendingFollowsScore + } + + clearPendingFollowsScore () { + this.pendingFollowsScore = {} + } +} + +export { + ActorFollowScoreCache +} diff --git a/server/lib/cache/index.ts b/server/lib/cache/index.ts index 54eb983fa..e921d04a7 100644 --- a/server/lib/cache/index.ts +++ b/server/lib/cache/index.ts @@ -1,2 +1,3 @@ +export * from './actor-follow-score-cache' export * from './videos-preview-cache' export * from './videos-caption-cache' diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index abbd89b3b..9493945ff 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -5,6 +5,7 @@ import { doRequest } from '../../../helpers/requests' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' +import { ActorFollowScoreCache } from '../../cache' export type ActivitypubHttpBroadcastPayload = { uris: string[] @@ -38,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { .catch(() => badUrls.push(uri)) }, { concurrency: BROADCAST_CONCURRENCY }) - return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) + return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index d36479032..3973dcdc8 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -1,9 +1,9 @@ import * as Bull from 'bull' import { logger } from '../../../helpers/logger' import { doRequest } from '../../../helpers/requests' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' import { JOB_REQUEST_TIMEOUT } from '../../../initializers' +import { ActorFollowScoreCache } from '../../cache' export type ActivitypubHttpUnicastPayload = { uri: string @@ -31,9 +31,9 @@ async function processActivityPubHttpUnicast (job: Bull.Job) { try { await doRequest(options) - ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined) + ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) } catch (err) { - ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined) + ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) throw err } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e34be7dcd..ba9cbe0d9 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -165,10 +165,10 @@ class JobQueue { return total } - removeOldJobs () { + async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue = this.queues[key] - queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') } } diff --git a/server/lib/schedulers/abstract-scheduler.ts b/server/lib/schedulers/abstract-scheduler.ts index b9d0a4d17..86ea7aa38 100644 --- a/server/lib/schedulers/abstract-scheduler.ts +++ b/server/lib/schedulers/abstract-scheduler.ts @@ -1,8 +1,11 @@ +import { logger } from '../../helpers/logger' + export abstract class AbstractScheduler { protected abstract schedulerIntervalMs: number private interval: NodeJS.Timer + private isRunning = false enable () { if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.') @@ -14,5 +17,18 @@ export abstract class AbstractScheduler { clearInterval(this.interval) } - abstract execute () + async execute () { + if (this.isRunning === true) return + this.isRunning = true + + try { + await this.internalExecute() + } catch (err) { + logger.error('Cannot execute %s scheduler.', this.constructor.name, { err }) + } finally { + this.isRunning = false + } + } + + protected abstract internalExecute (): Promise } diff --git a/server/lib/schedulers/bad-actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts similarity index 51% rename from server/lib/schedulers/bad-actor-follow-scheduler.ts rename to server/lib/schedulers/actor-follow-scheduler.ts index 617149aaf..3967be7f8 100644 --- a/server/lib/schedulers/bad-actor-follow-scheduler.ts +++ b/server/lib/schedulers/actor-follow-scheduler.ts @@ -3,18 +3,35 @@ import { logger } from '../../helpers/logger' import { ActorFollowModel } from '../../models/activitypub/actor-follow' import { AbstractScheduler } from './abstract-scheduler' import { SCHEDULER_INTERVALS_MS } from '../../initializers' +import { ActorFollowScoreCache } from '../cache' -export class BadActorFollowScheduler extends AbstractScheduler { +export class ActorFollowScheduler extends AbstractScheduler { private static instance: AbstractScheduler - protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.badActorFollow + protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.actorFollowScores private constructor () { super() } - async execute () { + protected async internalExecute () { + await this.processPendingScores() + + await this.removeBadActorFollows() + } + + private async processPendingScores () { + const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy() + + ActorFollowScoreCache.Instance.clearPendingFollowsScore() + + for (const inbox of Object.keys(pendingScores)) { + await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox]) + } + } + + private async removeBadActorFollows () { if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).') try { diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts index a29a6b800..4a4341ba9 100644 --- a/server/lib/schedulers/remove-old-jobs-scheduler.ts +++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts @@ -14,10 +14,10 @@ export class RemoveOldJobsScheduler extends AbstractScheduler { super() } - async execute () { - if (!isTestInstance()) logger.info('Removing old jobs (scheduler).') + protected internalExecute () { + if (!isTestInstance()) logger.info('Removing old jobs in scheduler.') - JobQueue.Instance.removeOldJobs() + return JobQueue.Instance.removeOldJobs() } static get Instance () { diff --git a/server/lib/schedulers/update-videos-scheduler.ts b/server/lib/schedulers/update-videos-scheduler.ts index fd2edfd17..21f071f9e 100644 --- a/server/lib/schedulers/update-videos-scheduler.ts +++ b/server/lib/schedulers/update-videos-scheduler.ts @@ -12,23 +12,12 @@ export class UpdateVideosScheduler extends AbstractScheduler { protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos - private isRunning = false - private constructor () { super() } - async execute () { - if (this.isRunning === true) return - this.isRunning = true - - try { - await retryTransactionWrapper(this.updateVideos.bind(this)) - } catch (err) { - logger.error('Cannot execute update videos scheduler.', { err }) - } finally { - this.isRunning = false - } + protected async internalExecute () { + return retryTransactionWrapper(this.updateVideos.bind(this)) } private async updateVideos () { diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts index 15e094d39..f643ee226 100644 --- a/server/lib/schedulers/videos-redundancy-scheduler.ts +++ b/server/lib/schedulers/videos-redundancy-scheduler.ts @@ -16,7 +16,6 @@ import { getOrCreateVideoAndAccountAndChannel } from '../activitypub' export class VideosRedundancyScheduler extends AbstractScheduler { private static instance: AbstractScheduler - private executing = false protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL @@ -24,11 +23,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler { super() } - async execute () { - if (this.executing) return - - this.executing = true - + protected async internalExecute () { for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) @@ -57,8 +52,6 @@ export class VideosRedundancyScheduler extends AbstractScheduler { await this.extendsLocalExpiration() await this.purgeRemoteExpired() - - this.executing = false } static get Instance () { diff --git a/server/lib/schedulers/youtube-dl-update-scheduler.ts b/server/lib/schedulers/youtube-dl-update-scheduler.ts index 461cd045e..aa027116d 100644 --- a/server/lib/schedulers/youtube-dl-update-scheduler.ts +++ b/server/lib/schedulers/youtube-dl-update-scheduler.ts @@ -12,7 +12,7 @@ export class YoutubeDlUpdateScheduler extends AbstractScheduler { super() } - execute () { + protected internalExecute () { return updateYoutubeDLBinary() } diff --git a/server/models/activitypub/actor-follow.ts b/server/models/activitypub/actor-follow.ts index 0a6935083..994f791de 100644 --- a/server/models/activitypub/actor-follow.ts +++ b/server/models/activitypub/actor-follow.ts @@ -127,22 +127,6 @@ export class ActorFollowModel extends Model { if (numberOfActorFollowsRemoved) logger.info('Removed bad %d actor follows.', numberOfActorFollowsRemoved) } - static updateActorFollowsScore (goodInboxes: string[], badInboxes: string[], t: Sequelize.Transaction | undefined) { - if (goodInboxes.length === 0 && badInboxes.length === 0) return - - logger.info('Updating %d good actor follows and %d bad actor follows scores.', goodInboxes.length, badInboxes.length) - - if (goodInboxes.length !== 0) { - ActorFollowModel.incrementScores(goodInboxes, ACTOR_FOLLOW_SCORE.BONUS, t) - .catch(err => logger.error('Cannot increment scores of good actor follows.', { err })) - } - - if (badInboxes.length !== 0) { - ActorFollowModel.incrementScores(badInboxes, ACTOR_FOLLOW_SCORE.PENALTY, t) - .catch(err => logger.error('Cannot decrement scores of bad actor follows.', { err })) - } - } - static loadByActorAndTarget (actorId: number, targetActorId: number, t?: Sequelize.Transaction) { const query = { where: { @@ -464,6 +448,22 @@ export class ActorFollowModel extends Model { } } + static updateFollowScore (inboxUrl: string, value: number, t?: Sequelize.Transaction) { + const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` + + 'WHERE id IN (' + + 'SELECT "actorFollow"."id" FROM "actorFollow" ' + + 'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' + + `WHERE "actor"."inboxUrl" = '${inboxUrl}' OR "actor"."sharedInboxUrl" = '${inboxUrl}'` + + ')' + + const options = { + type: Sequelize.QueryTypes.BULKUPDATE, + transaction: t + } + + return ActorFollowModel.sequelize.query(query, options) + } + private static async createListAcceptedFollowForApiQuery ( type: 'followers' | 'following', actorIds: number[], @@ -518,24 +518,6 @@ export class ActorFollowModel extends Model { } } - private static incrementScores (inboxUrls: string[], value: number, t: Sequelize.Transaction | undefined) { - const inboxUrlsString = inboxUrls.map(url => `'${url}'`).join(',') - - const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` + - 'WHERE id IN (' + - 'SELECT "actorFollow"."id" FROM "actorFollow" ' + - 'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' + - 'WHERE "actor"."inboxUrl" IN (' + inboxUrlsString + ') OR "actor"."sharedInboxUrl" IN (' + inboxUrlsString + ')' + - ')' - - const options = t ? { - type: Sequelize.QueryTypes.BULKUPDATE, - transaction: t - } : undefined - - return ActorFollowModel.sequelize.query(query, options) - } - private static listBadActorFollows () { const query = { where: {