1
0
Fork 0

Update bullmq

Requires redis >= 6.2
This commit is contained in:
Chocobozzz 2023-02-16 14:10:11 +01:00
parent e65ef81cf5
commit 182082f52d
No known key found for this signature in database
GPG key ID: 583A612D890159BE
4 changed files with 13 additions and 47 deletions

View file

@ -107,7 +107,7 @@
"bencode": "^2.0.2", "bencode": "^2.0.2",
"bittorrent-tracker": "^9.0.0", "bittorrent-tracker": "^9.0.0",
"bluebird": "^3.5.0", "bluebird": "^3.5.0",
"bullmq": "^1.87.0", "bullmq": "^3.6.6",
"bytes": "^3.0.0", "bytes": "^3.0.0",
"chokidar": "^3.4.2", "chokidar": "^3.4.2",
"commander": "^10.0.0", "commander": "^10.0.0",

View file

@ -212,10 +212,10 @@ const JOB_TTL: { [id in JobType]: number } = {
} }
const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = { const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = {
'videos-views-stats': { 'videos-views-stats': {
cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour pattern: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
}, },
'activitypub-cleaner': { 'activitypub-cleaner': {
cron: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM pattern: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM
} }
} }
const JOB_PRIORITY = { const JOB_PRIORITY = {

View file

@ -7,11 +7,10 @@ import {
QueueEvents, QueueEvents,
QueueEventsOptions, QueueEventsOptions,
QueueOptions, QueueOptions,
QueueScheduler,
QueueSchedulerOptions,
Worker, Worker,
WorkerOptions WorkerOptions
} from 'bullmq' } from 'bullmq'
import { parseDurationToMs } from '@server/helpers/core-utils'
import { jobStates } from '@server/helpers/custom-validators/jobs' import { jobStates } from '@server/helpers/custom-validators/jobs'
import { CONFIG } from '@server/initializers/config' import { CONFIG } from '@server/initializers/config'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
@ -41,14 +40,7 @@ import {
VideoTranscodingPayload VideoTranscodingPayload
} from '../../../shared/models' } from '../../../shared/models'
import { logger } from '../../helpers/logger' import { logger } from '../../helpers/logger'
import { import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
JOB_ATTEMPTS,
JOB_CONCURRENCY,
JOB_REMOVAL_OPTIONS,
JOB_TTL,
REPEAT_JOBS,
WEBSERVER
} from '../../initializers/constants'
import { Hooks } from '../plugins/hooks' import { Hooks } from '../plugins/hooks'
import { Redis } from '../redis' import { Redis } from '../redis'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner' import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
@ -71,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoStudioEdition } from './handlers/video-studio-edition'
import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViewsStats } from './handlers/video-views-stats' import { processVideosViewsStats } from './handlers/video-views-stats'
import { parseDurationToMs } from '@server/helpers/core-utils'
export type CreateJobArgument = export type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@ -166,7 +157,6 @@ class JobQueue {
private workers: { [id in JobType]?: Worker } = {} private workers: { [id in JobType]?: Worker } = {}
private queues: { [id in JobType]?: Queue } = {} private queues: { [id in JobType]?: Queue } = {}
private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
private queueEvents: { [id in JobType]?: QueueEvents } = {} private queueEvents: { [id in JobType]?: QueueEvents } = {}
private flowProducer: FlowProducer private flowProducer: FlowProducer
@ -187,7 +177,6 @@ class JobQueue {
for (const handlerName of Object.keys(handlers)) { for (const handlerName of Object.keys(handlers)) {
this.buildWorker(handlerName) this.buildWorker(handlerName)
this.buildQueue(handlerName) this.buildQueue(handlerName)
this.buildQueueScheduler(handlerName)
this.buildQueueEvent(handlerName) this.buildQueueEvent(handlerName)
} }
@ -205,7 +194,8 @@ class JobQueue {
autorun: false, autorun: false,
concurrency: this.getJobConcurrency(handlerName), concurrency: this.getJobConcurrency(handlerName),
prefix: this.jobRedisPrefix, prefix: this.jobRedisPrefix,
connection: Redis.getRedisClientOptions('Worker') connection: Redis.getRedisClientOptions('Worker'),
maxStalledCount: 10
} }
const handler = function (job: Job) { const handler = function (job: Job) {
@ -255,20 +245,6 @@ class JobQueue {
this.queues[handlerName] = queue this.queues[handlerName] = queue
} }
private buildQueueScheduler (handlerName: JobType) {
const queueSchedulerOptions: QueueSchedulerOptions = {
autorun: false,
connection: Redis.getRedisClientOptions('QueueScheduler'),
prefix: this.jobRedisPrefix,
maxStalledCount: 10
}
const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
this.queueSchedulers[handlerName] = queueScheduler
}
private buildQueueEvent (handlerName: JobType) { private buildQueueEvent (handlerName: JobType) {
const queueEventsOptions: QueueEventsOptions = { const queueEventsOptions: QueueEventsOptions = {
autorun: false, autorun: false,
@ -289,13 +265,11 @@ class JobQueue {
.map(handlerName => { .map(handlerName => {
const worker: Worker = this.workers[handlerName] const worker: Worker = this.workers[handlerName]
const queue: Queue = this.queues[handlerName] const queue: Queue = this.queues[handlerName]
const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
const queueEvent: QueueEvents = this.queueEvents[handlerName] const queueEvent: QueueEvents = this.queueEvents[handlerName]
return Promise.all([ return Promise.all([
worker.close(false), worker.close(false),
queue.close(), queue.close(),
queueScheduler.close(),
queueEvent.close() queueEvent.close()
]) ])
}) })
@ -307,12 +281,10 @@ class JobQueue {
const promises = Object.keys(this.workers) const promises = Object.keys(this.workers)
.map(handlerName => { .map(handlerName => {
const worker: Worker = this.workers[handlerName] const worker: Worker = this.workers[handlerName]
const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
const queueEvent: QueueEvents = this.queueEvents[handlerName] const queueEvent: QueueEvents = this.queueEvents[handlerName]
return Promise.all([ return Promise.all([
worker.run(), worker.run(),
queueScheduler.run(),
queueEvent.run() queueEvent.run()
]) ])
}) })

View file

@ -3205,15 +3205,14 @@ builtins@^5.0.1:
dependencies: dependencies:
semver "^7.0.0" semver "^7.0.0"
bullmq@^1.87.0: bullmq@^3.6.6:
version "1.91.1" version "3.6.6"
resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-1.91.1.tgz#ed17cfd4e314afa398fd099a32d365046b1ed4bc" resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-3.6.6.tgz#de3c407021eff2eb283fb2aca66336ebeee9d5c5"
integrity sha512-u7dat9I8ZwouZ651AMZkBSvB6NVUPpnAjd4iokd9DM41whqIBnDjuL11h7+kEjcpiDKj6E+wxZiER00FqirZQg== integrity sha512-W71jXrcTdcT3Y5tzMyTx22Cd8O3dTML7vl6KG3YdGVGrO3+UmKRLYfGLn1QwIhIoTQJVvIrSB4qfGs1hgqYRVw==
dependencies: dependencies:
cron-parser "^4.6.0" cron-parser "^4.6.0"
get-port "6.1.2"
glob "^8.0.3" glob "^8.0.3"
ioredis "^5.2.2" ioredis "^5.3.0"
lodash "^4.17.21" lodash "^4.17.21"
msgpackr "^1.6.2" msgpackr "^1.6.2"
semver "^7.3.7" semver "^7.3.7"
@ -5181,11 +5180,6 @@ get-port@5.1.1:
resolved "https://registry.yarnpkg.com/get-port/-/get-port-5.1.1.tgz#0469ed07563479de6efb986baf053dcd7d4e3193" resolved "https://registry.yarnpkg.com/get-port/-/get-port-5.1.1.tgz#0469ed07563479de6efb986baf053dcd7d4e3193"
integrity sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ== integrity sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==
get-port@6.1.2:
version "6.1.2"
resolved "https://registry.yarnpkg.com/get-port/-/get-port-6.1.2.tgz#c1228abb67ba0e17fb346da33b15187833b9c08a"
integrity sha512-BrGGraKm2uPqurfGVj/z97/zv8dPleC6x9JBNRTrDNtCkkRF4rPwrQXFgL7+I+q8QSdU4ntLQX2D7KIxSy8nGw==
get-stdin@^8.0.0: get-stdin@^8.0.0:
version "8.0.0" version "8.0.0"
resolved "https://registry.yarnpkg.com/get-stdin/-/get-stdin-8.0.0.tgz#cbad6a73feb75f6eeb22ba9e01f89aa28aa97a53" resolved "https://registry.yarnpkg.com/get-stdin/-/get-stdin-8.0.0.tgz#cbad6a73feb75f6eeb22ba9e01f89aa28aa97a53"
@ -5704,7 +5698,7 @@ invariant@2.2.4:
dependencies: dependencies:
loose-envify "^1.0.0" loose-envify "^1.0.0"
ioredis@^5.2.2, ioredis@^5.2.3: ioredis@^5.2.3, ioredis@^5.3.0:
version "5.3.1" version "5.3.1"
resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.3.1.tgz#55d394a51258cee3af9e96c21c863b1a97bf951f" resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.3.1.tgz#55d394a51258cee3af9e96c21c863b1a97bf951f"
integrity sha512-C+IBcMysM6v52pTLItYMeV4Hz7uriGtoJdz7SSBDX6u+zwSYGirLdQh3L7t/OItWITcw3gTFMjJReYUwS4zihg== integrity sha512-C+IBcMysM6v52pTLItYMeV4Hz7uriGtoJdz7SSBDX6u+zwSYGirLdQh3L7t/OItWITcw3gTFMjJReYUwS4zihg==