1
0
Fork 0

Refactor jobs state

This commit is contained in:
Chocobozzz 2020-12-14 12:00:35 +01:00
parent c04816108e
commit 402145b863
No known key found for this signature in database
GPG key ID: 583A612D890159BE
7 changed files with 65 additions and 59 deletions

View file

@ -1,7 +1,8 @@
import * as express from 'express'
import { ResultList } from '../../../shared'
import { Job, JobType, JobState } from '../../../shared/models'
import { Job, JobState, JobType } from '../../../shared/models'
import { UserRight } from '../../../shared/models/users'
import { isArray } from '../../helpers/custom-validators/misc'
import { JobQueue } from '../../lib/job-queue'
import {
asyncMiddleware,
@ -12,13 +13,11 @@ import {
setDefaultSort
} from '../../middlewares'
import { paginationValidator } from '../../middlewares/validators'
import { listJobsStateValidator, listJobsValidator } from '../../middlewares/validators/jobs'
import { isArray } from '../../helpers/custom-validators/misc'
import { jobStates } from '@server/helpers/custom-validators/jobs'
import { listJobsValidator } from '../../middlewares/validators/jobs'
const jobsRouter = express.Router()
jobsRouter.get('/',
jobsRouter.get('/:state?',
authenticate,
ensureUserHasRight(UserRight.MANAGE_JOBS),
paginationValidator,
@ -29,18 +28,6 @@ jobsRouter.get('/',
asyncMiddleware(listJobs)
)
jobsRouter.get('/:state',
authenticate,
ensureUserHasRight(UserRight.MANAGE_JOBS),
paginationValidator,
jobsSortValidator,
setDefaultSort,
setDefaultPagination,
listJobsValidator,
listJobsStateValidator,
asyncMiddleware(listJobs)
)
// ---------------------------------------------------------------------------
export {
@ -50,7 +37,7 @@ export {
// ---------------------------------------------------------------------------
async function listJobs (req: express.Request, res: express.Response) {
const state = req.params.state as JobState || jobStates
const state = req.params.state as JobState
const asc = req.query.sort === 'createdAt'
const jobType = req.query.jobType
@ -65,17 +52,22 @@ async function listJobs (req: express.Request, res: express.Response) {
const result: ResultList<Job> = {
total,
data: Array.isArray(state)
? await Promise.all(
jobs.map(async j => formatJob(j, await j.getState() as JobState))
)
: jobs.map(j => formatJob(j, state))
data: state
? jobs.map(j => formatJob(j, state))
: await Promise.all(jobs.map(j => formatJobWithUnknownState(j)))
}
return res.json(result)
}
async function formatJobWithUnknownState (job: any) {
return formatJob(job, await job.getState())
}
function formatJob (job: any, state: JobState): Job {
const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 ? job.stacktrace[0] : null
const error = isArray(job.stacktrace) && job.stacktrace.length !== 0
? job.stacktrace[0]
: null
return {
id: job.id,

View file

@ -2,7 +2,7 @@ import { JobState } from '../../../shared/models'
import { exists } from './misc'
import { jobTypes } from '@server/lib/job-queue/job-queue'
const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused' ]
function isValidJobState (value: JobState) {
return exists(value) && jobStates.includes(value)

View file

@ -1,4 +1,6 @@
import * as Bull from 'bull'
import { jobStates } from '@server/helpers/custom-validators/jobs'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
import {
ActivitypubFollowPayload,
ActivitypubHttpBroadcastPayload,
@ -15,20 +17,19 @@ import {
VideoTranscodingPayload
} from '../../../shared/models'
import { logger } from '../../helpers/logger'
import { Redis } from '../redis'
import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
import { Redis } from '../redis'
import { processActivityPubFollow } from './handlers/activitypub-follow'
import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
import { processEmail } from './handlers/email'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processActivityPubFollow } from './handlers/activitypub-follow'
import { processVideoImport } from './handlers/video-import'
import { processVideosViews } from './handlers/video-views'
import { refreshAPObject } from './handlers/activitypub-refresher'
import { processEmail } from './handlers/email'
import { processVideoFileImport } from './handlers/video-file-import'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
import { processVideoImport } from './handlers/video-import'
import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViews } from './handlers/video-views'
type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@ -154,13 +155,15 @@ class JobQueue {
}
async listForApi (options: {
state: JobState | JobState[]
state?: JobState
start: number
count: number
asc?: boolean
jobType: JobType
}): Promise<Bull.Job[]> {
const { state = Array.isArray(options.state) ? options.state : [ options.state ], start, count, asc, jobType } = options
const { state, start, count, asc, jobType } = options
const states = state ? [ state ] : jobStates
let results: Bull.Job[] = []
const filteredJobTypes = this.filterJobTypes(jobType)
@ -172,7 +175,7 @@ class JobQueue {
continue
}
const jobs = await queue.getJobs(state as Bull.JobStatus[], 0, start + count, asc)
const jobs = await queue.getJobs(states, 0, start + count, asc)
results = results.concat(jobs)
}
@ -188,8 +191,8 @@ class JobQueue {
return results.slice(start, start + count)
}
async count (state: JobState | JobState[], jobType?: JobType): Promise<number> {
const states = Array.isArray(state) ? state : [ state ]
async count (state: JobState, jobType?: JobType): Promise<number> {
const states = state ? [ state ] : jobStates
let total = 0
const filteredJobTypes = this.filterJobTypes(jobType)

View file

@ -5,6 +5,10 @@ import { logger } from '../../helpers/logger'
import { areValidationErrors } from './utils'
const listJobsValidator = [
param('state')
.optional()
.custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
query('jobType')
.optional()
.custom(isValidJobType).withMessage('Should have a valid job state'),
@ -18,22 +22,8 @@ const listJobsValidator = [
}
]
const listJobsStateValidator = [
param('state')
.custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
(req: express.Request, res: express.Response, next: express.NextFunction) => {
logger.debug('Checking listJobsValidator parameters.', { parameters: req.params })
if (areValidationErrors(req, res)) return
return next()
}
]
// ---------------------------------------------------------------------------
export {
listJobsValidator,
listJobsStateValidator
listJobsValidator
}

View file

@ -83,6 +83,19 @@ describe('Test jobs', function () {
}
})
it('Should list all jobs', async function () {
const res = await getJobsList(servers[1].url, servers[1].accessToken)
const jobs = res.body.data as Job[]
expect(res.body.total).to.be.above(2)
expect(jobs).to.have.length.above(2)
// We know there are a least 1 delayed job (video views) and 1 completed job (broadcast)
expect(jobs.find(j => j.state === 'delayed')).to.not.be.undefined
expect(jobs.find(j => j.state === 'completed')).to.not.be.undefined
})
after(async function () {
await cleanupTests(servers)
})

View file

@ -1,12 +1,20 @@
import * as request from 'supertest'
import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
import { makeGetRequest } from '../../../shared/extra-utils'
import { Job, JobState, JobType } from '../../models'
import { wait } from '../miscs/miscs'
import { ServerInfo } from './servers'
import { makeGetRequest } from '../../../shared/extra-utils'
import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
function getJobsList (url: string, accessToken: string, state: JobState) {
const path = '/api/v1/jobs/' + state
function buildJobsUrl (state?: JobState) {
let path = '/api/v1/jobs'
if (state) path += '/' + state
return path
}
function getJobsList (url: string, accessToken: string, state?: JobState) {
const path = buildJobsUrl(state)
return request(url)
.get(path)
@ -19,14 +27,14 @@ function getJobsList (url: string, accessToken: string, state: JobState) {
function getJobsListPaginationAndSort (options: {
url: string
accessToken: string
state: JobState
start: number
count: number
sort: string
state?: JobState
jobType?: JobType
}) {
const { url, accessToken, state, start, count, sort, jobType } = options
const path = '/api/v1/jobs/' + state
const path = buildJobsUrl(state)
const query = {
start,

View file

@ -2,7 +2,7 @@ import { ContextType } from '../activitypub/context'
import { VideoResolution } from '../videos/video-resolution.enum'
import { SendEmailOptions } from './emailer.model'
export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed'
export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' | 'paused'
export type JobType =
| 'activitypub-http-unicast'