1
0
Fork 0

Don't crash on redis connection error

This commit is contained in:
Chocobozzz 2022-08-11 11:30:06 +02:00
parent aeb112edbe
commit ab08ab4e28
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
2 changed files with 18 additions and 7 deletions

View File

@ -50,6 +50,7 @@ import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetch
import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { refreshAPObject } from './handlers/activitypub-refresher'
import { processActorKeys } from './handlers/actor-keys'
import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
import { processEmail } from './handlers/email'
import { processFederateVideo } from './handlers/federate-video'
import { processManageVideoTorrent } from './handlers/manage-video-torrent'
@ -62,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoStudioEdition } from './handlers/video-studio-edition'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViewsStats } from './handlers/video-views-stats'
import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
export type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@ -186,6 +186,7 @@ class JobQueue {
connection: this.getRedisConnection(),
prefix: this.jobRedisPrefix
})
this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
this.addRepeatableJobs()
}
@ -228,9 +229,7 @@ class JobQueue {
}
})
worker.on('error', err => {
logger.error('Error in job queue %s.', handlerName, { err })
})
worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })
this.workers[handlerName] = worker
}
@ -241,7 +240,10 @@ class JobQueue {
prefix: this.jobRedisPrefix
}
this.queues[handlerName] = new Queue(handlerName, queueOptions)
const queue = new Queue(handlerName, queueOptions)
queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })
this.queues[handlerName] = queue
}
private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
@ -251,7 +253,11 @@ class JobQueue {
prefix: this.jobRedisPrefix,
maxStalledCount: 10
}
this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
this.queueSchedulers[handlerName] = queueScheduler
}
private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
@ -260,7 +266,11 @@ class JobQueue {
connection: this.getRedisConnection(),
prefix: this.jobRedisPrefix
}
this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })
this.queueEvents[handlerName] = queueEvents
}
private getRedisConnection () {

View File

@ -33,6 +33,7 @@ class Redis {
this.initialized = true
this.client = createClient(Redis.getRedisClientOptions())
this.client.on('error', err => logger.error('Redis Client Error', { err }))
logger.info('Connecting to redis...')