Fix job queue tests
This commit is contained in:
parent
cfb5edbd9e
commit
e2b2c726b1
4 changed files with 45 additions and 23 deletions
|
@ -26,7 +26,7 @@ jobsRouter.post('/pause',
|
|||
jobsRouter.post('/resume',
|
||||
authenticate,
|
||||
ensureUserHasRight(UserRight.MANAGE_JOBS),
|
||||
resumeJobQueue
|
||||
asyncMiddleware(resumeJobQueue)
|
||||
)
|
||||
|
||||
jobsRouter.get('/:state?',
|
||||
|
@ -55,8 +55,8 @@ async function pauseJobQueue (req: express.Request, res: express.Response) {
|
|||
return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
|
||||
}
|
||||
|
||||
function resumeJobQueue (req: express.Request, res: express.Response) {
|
||||
JobQueue.Instance.resume()
|
||||
async function resumeJobQueue (req: express.Request, res: express.Response) {
|
||||
await JobQueue.Instance.resume()
|
||||
|
||||
return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
|
||||
}
|
||||
|
|
|
@ -284,18 +284,22 @@ class JobQueue {
|
|||
}
|
||||
|
||||
async pause () {
|
||||
for (const handler of Object.keys(this.workers)) {
|
||||
const worker: Worker = this.workers[handler]
|
||||
for (const handlerName of Object.keys(this.workers)) {
|
||||
const worker: Worker = this.workers[handlerName]
|
||||
const queue: Queue = this.queues[handlerName]
|
||||
|
||||
await worker.pause()
|
||||
await queue.pause()
|
||||
}
|
||||
}
|
||||
|
||||
resume () {
|
||||
for (const handler of Object.keys(this.workers)) {
|
||||
const worker: Worker = this.workers[handler]
|
||||
async resume () {
|
||||
for (const handlerName of Object.keys(this.workers)) {
|
||||
const worker: Worker = this.workers[handlerName]
|
||||
const queue: Queue = this.queues[handlerName]
|
||||
|
||||
worker.resume()
|
||||
await queue.resume()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -373,10 +377,10 @@ class JobQueue {
|
|||
}): Promise<Job[]> {
|
||||
const { state, start, count, asc, jobType } = options
|
||||
|
||||
const states = state ? [ state ] : jobStates
|
||||
let results: Job[] = []
|
||||
const states = this.buildStateFilter(state)
|
||||
const filteredJobTypes = this.buildTypeFilter(jobType)
|
||||
|
||||
const filteredJobTypes = this.filterJobTypes(jobType)
|
||||
let results: Job[] = []
|
||||
|
||||
for (const jobType of filteredJobTypes) {
|
||||
const queue: Queue = this.queues[jobType]
|
||||
|
@ -404,9 +408,9 @@ class JobQueue {
|
|||
|
||||
async count (state: JobState, jobType?: JobType): Promise<number> {
|
||||
const states = state ? [ state ] : jobStates
|
||||
let total = 0
|
||||
const filteredJobTypes = this.buildTypeFilter(jobType)
|
||||
|
||||
const filteredJobTypes = this.filterJobTypes(jobType)
|
||||
let total = 0
|
||||
|
||||
for (const type of filteredJobTypes) {
|
||||
const queue = this.queues[type]
|
||||
|
@ -425,6 +429,23 @@ class JobQueue {
|
|||
return total
|
||||
}
|
||||
|
||||
private buildStateFilter (state?: JobState) {
|
||||
if (!state) return jobStates
|
||||
|
||||
const states = [ state ]
|
||||
|
||||
// Include parent if filtering on waiting
|
||||
if (state === 'waiting') states.push('waiting-children')
|
||||
|
||||
return states
|
||||
}
|
||||
|
||||
private buildTypeFilter (jobType?: JobType) {
|
||||
if (!jobType) return jobTypes
|
||||
|
||||
return jobTypes.filter(t => t === jobType)
|
||||
}
|
||||
|
||||
async getStats () {
|
||||
const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))
|
||||
|
||||
|
@ -452,12 +473,6 @@ class JobQueue {
|
|||
}
|
||||
}
|
||||
|
||||
private filterJobTypes (jobType?: JobType) {
|
||||
if (!jobType) return jobTypes
|
||||
|
||||
return jobTypes.filter(t => t === jobType)
|
||||
}
|
||||
|
||||
private getJobConcurrency (jobType: JobType) {
|
||||
if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
|
||||
if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY
|
||||
|
|
|
@ -122,7 +122,7 @@ describe('Test logs API validators', function () {
|
|||
})
|
||||
|
||||
it('Should fail with an invalid stackTrace', async function () {
|
||||
await server.logs.createLogClient({ payload: { ...base, stackTrace: 's'.repeat(10000) }, expectedStatus })
|
||||
await server.logs.createLogClient({ payload: { ...base, stackTrace: 's'.repeat(20000) }, expectedStatus })
|
||||
})
|
||||
|
||||
it('Should fail with an invalid userAgent', async function () {
|
||||
|
|
|
@ -60,7 +60,6 @@ describe('Test jobs', function () {
|
|||
if (job.type === 'videos-views-stats') job = body.data[1]
|
||||
|
||||
expect(job.state).to.equal('completed')
|
||||
expect(job.type.startsWith('activitypub-')).to.be.true
|
||||
expect(dateIsValid(job.createdAt as string)).to.be.true
|
||||
expect(dateIsValid(job.processedOn as string)).to.be.true
|
||||
expect(dateIsValid(job.finishedOn as string)).to.be.true
|
||||
|
@ -103,8 +102,16 @@ describe('Test jobs', function () {
|
|||
|
||||
await wait(5000)
|
||||
|
||||
const body = await servers[1].jobs.list({ state: 'waiting', jobType: 'video-transcoding' })
|
||||
expect(body.data).to.have.lengthOf(4)
|
||||
{
|
||||
const body = await servers[1].jobs.list({ state: 'waiting', jobType: 'video-transcoding' })
|
||||
// waiting includes waiting-children
|
||||
expect(body.data).to.have.lengthOf(4)
|
||||
}
|
||||
|
||||
{
|
||||
const body = await servers[1].jobs.list({ state: 'waiting-children', jobType: 'video-transcoding' })
|
||||
expect(body.data).to.have.lengthOf(1)
|
||||
}
|
||||
})
|
||||
|
||||
it('Should resume the job queue', async function () {
|
||||
|
|
Loading…
Reference in a new issue