1
0
Fork 0
peertube/server/lib/job-queue/job-queue.ts
Chocobozzz 51353d9a03 Refactor video views
Introduce viewers attribute for live videos
Count views for live videos
Reduce delay to see the viewer update for lives
Add ability to configure video views buffer interval and view ip
expiration
2021-11-09 15:00:31 +01:00

284 lines
9.1 KiB
TypeScript

import Bull, { Job, JobOptions, Queue } from 'bull'
import { jobStates } from '@server/helpers/custom-validators/jobs'
import { CONFIG } from '@server/initializers/config'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
import {
ActivitypubFollowPayload,
ActivitypubHttpBroadcastPayload,
ActivitypubHttpFetcherPayload,
ActivitypubHttpUnicastPayload,
ActorKeysPayload,
DeleteResumableUploadMetaFilePayload,
EmailPayload,
JobState,
JobType,
MoveObjectStoragePayload,
RefreshPayload,
VideoFileImportPayload,
VideoImportPayload,
VideoLiveEndingPayload,
VideoRedundancyPayload,
VideoTranscodingPayload
} from '../../../shared/models'
import { logger } from '../../helpers/logger'
import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
import { Redis } from '../redis'
import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
import { processActivityPubFollow } from './handlers/activitypub-follow'
import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { refreshAPObject } from './handlers/activitypub-refresher'
import { processActorKeys } from './handlers/actor-keys'
import { processEmail } from './handlers/email'
import { processMoveToObjectStorage } from './handlers/move-to-object-storage'
import { processVideoFileImport } from './handlers/video-file-import'
import { processVideoImport } from './handlers/video-import'
import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViewsStats } from './handlers/video-views-stats'
type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
{ type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
{ type: 'activitypub-http-cleaner', payload: {} } |
{ type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
{ type: 'video-file-import', payload: VideoFileImportPayload } |
{ type: 'video-transcoding', payload: VideoTranscodingPayload } |
{ type: 'email', payload: EmailPayload } |
{ type: 'video-import', payload: VideoImportPayload } |
{ type: 'activitypub-refresher', payload: RefreshPayload } |
{ type: 'videos-views-stats', payload: {} } |
{ type: 'video-live-ending', payload: VideoLiveEndingPayload } |
{ type: 'actor-keys', payload: ActorKeysPayload } |
{ type: 'video-redundancy', payload: VideoRedundancyPayload } |
{ type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
{ type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
export type CreateJobOptions = {
delay?: number
priority?: number
}
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
'activitypub-http-unicast': processActivityPubHttpUnicast,
'activitypub-http-fetcher': processActivityPubHttpFetcher,
'activitypub-cleaner': processActivityPubCleaner,
'activitypub-follow': processActivityPubFollow,
'video-file-import': processVideoFileImport,
'video-transcoding': processVideoTranscoding,
'email': processEmail,
'video-import': processVideoImport,
'videos-views-stats': processVideosViewsStats,
'activitypub-refresher': refreshAPObject,
'video-live-ending': processVideoLiveEnding,
'actor-keys': processActorKeys,
'video-redundancy': processVideoRedundancy,
'move-to-object-storage': processMoveToObjectStorage
}
const jobTypes: JobType[] = [
'activitypub-follow',
'activitypub-http-broadcast',
'activitypub-http-fetcher',
'activitypub-http-unicast',
'activitypub-cleaner',
'email',
'video-transcoding',
'video-file-import',
'video-import',
'videos-views-stats',
'activitypub-refresher',
'video-redundancy',
'actor-keys',
'video-live-ending',
'move-to-object-storage'
]
class JobQueue {
private static instance: JobQueue
private queues: { [id in JobType]?: Queue } = {}
private initialized = false
private jobRedisPrefix: string
private constructor () {
}
init (produceOnly = false) {
// Already initialized
if (this.initialized === true) return
this.initialized = true
this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
const queueOptions = {
prefix: this.jobRedisPrefix,
redis: Redis.getRedisClientOptions(),
settings: {
maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts
}
}
for (const handlerName of (Object.keys(handlers) as JobType[])) {
const queue = new Bull(handlerName, queueOptions)
if (produceOnly) {
queue.pause(true)
.catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err }))
}
const handler = handlers[handlerName]
queue.process(this.getJobConcurrency(handlerName), handler)
.catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
queue.on('failed', (job, err) => {
logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err })
})
queue.on('error', err => {
logger.error('Error in job queue %s.', handlerName, { err })
})
this.queues[handlerName] = queue
}
this.addRepeatableJobs()
}
terminate () {
for (const queueName of Object.keys(this.queues)) {
const queue = this.queues[queueName]
queue.close()
}
}
createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
this.createJobWithPromise(obj, options)
.catch(err => logger.error('Cannot create job.', { err, obj }))
}
createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
const queue = this.queues[obj.type]
if (queue === undefined) {
logger.error('Unknown queue %s: cannot create job.', obj.type)
return
}
const jobArgs: JobOptions = {
backoff: { delay: 60 * 1000, type: 'exponential' },
attempts: JOB_ATTEMPTS[obj.type],
timeout: JOB_TTL[obj.type],
priority: options.priority,
delay: options.delay
}
return queue.add(obj.payload, jobArgs)
}
async listForApi (options: {
state?: JobState
start: number
count: number
asc?: boolean
jobType: JobType
}): Promise<Job[]> {
const { state, start, count, asc, jobType } = options
const states = state ? [ state ] : jobStates
let results: Job[] = []
const filteredJobTypes = this.filterJobTypes(jobType)
for (const jobType of filteredJobTypes) {
const queue = this.queues[jobType]
if (queue === undefined) {
logger.error('Unknown queue %s to list jobs.', jobType)
continue
}
const jobs = await queue.getJobs(states, 0, start + count, asc)
results = results.concat(jobs)
}
results.sort((j1: any, j2: any) => {
if (j1.timestamp < j2.timestamp) return -1
else if (j1.timestamp === j2.timestamp) return 0
return 1
})
if (asc === false) results.reverse()
return results.slice(start, start + count)
}
async count (state: JobState, jobType?: JobType): Promise<number> {
const states = state ? [ state ] : jobStates
let total = 0
const filteredJobTypes = this.filterJobTypes(jobType)
for (const type of filteredJobTypes) {
const queue = this.queues[type]
if (queue === undefined) {
logger.error('Unknown queue %s to count jobs.', type)
continue
}
const counts = await queue.getJobCounts()
for (const s of states) {
total += counts[s]
}
}
return total
}
async removeOldJobs () {
for (const key of Object.keys(this.queues)) {
const queue = this.queues[key]
await queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
}
}
private addRepeatableJobs () {
this.queues['videos-views-stats'].add({}, {
repeat: REPEAT_JOBS['videos-views-stats']
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
this.queues['activitypub-cleaner'].add({}, {
repeat: REPEAT_JOBS['activitypub-cleaner']
}).catch(err => logger.error('Cannot add repeatable job.', { err }))
}
}
private filterJobTypes (jobType?: JobType) {
if (!jobType) return jobTypes
return jobTypes.filter(t => t === jobType)
}
private getJobConcurrency (jobType: JobType) {
if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY
return JOB_CONCURRENCY[jobType]
}
static get Instance () {
return this.instance || (this.instance = new this())
}
}
// ---------------------------------------------------------------------------
export {
jobTypes,
JobQueue
}