From 1061c73fde3005100ead8764eacb444f240440d6 Mon Sep 17 00:00:00 2001
From: Chocobozzz <me@florianbigard.com>
Date: Wed, 4 Dec 2019 14:49:59 +0100
Subject: [PATCH] Add ability to filter per job type

---
 .../src/app/+admin/system/jobs/job.service.ts |  7 ++-
 .../+admin/system/jobs/jobs.component.html    | 20 ++++++--
 .../+admin/system/jobs/jobs.component.scss    | 18 ++++++-
 .../app/+admin/system/jobs/jobs.component.ts  | 39 +++++++++++----
 client/src/types/job-type-client.type.ts      |  3 ++
 server/controllers/api/jobs.ts                | 11 +++-
 server/helpers/custom-validators/jobs.ts      | 10 +++-
 server/lib/job-queue/job-queue.ts             | 26 ++++++++--
 server/middlewares/validators/jobs.ts         | 12 +++--
 server/tests/api/check-params/jobs.ts         | 12 +++++
 server/tests/api/server/handle-down.ts        |  9 +++-
 server/tests/api/server/jobs.ts               | 50 ++++++++++++++-----
 server/tests/real-world/real-world.ts         |  9 +++-
 shared/extra-utils/server/jobs.ts             | 46 ++++++++++++-----
 shared/models/server/job.model.ts             |  6 +--
 15 files changed, 220 insertions(+), 58 deletions(-)
 create mode 100644 client/src/types/job-type-client.type.ts

diff --git a/client/src/app/+admin/system/jobs/job.service.ts b/client/src/app/+admin/system/jobs/job.service.ts
index 1daae8f03..120144dff 100644
--- a/client/src/app/+admin/system/jobs/job.service.ts
+++ b/client/src/app/+admin/system/jobs/job.service.ts
@@ -3,11 +3,12 @@ import { HttpClient, HttpParams } from '@angular/common/http'
 import { Injectable } from '@angular/core'
 import { SortMeta } from 'primeng/api'
 import { Observable } from 'rxjs'
-import { ResultList } from '../../../../../../shared'
+import { JobType, ResultList } from '../../../../../../shared'
 import { JobState } from '../../../../../../shared/models'
 import { Job } from '../../../../../../shared/models/server/job.model'
 import { environment } from '../../../../environments/environment'
 import { RestExtractor, RestPagination, RestService } from '../../../shared'
+import { JobTypeClient } from '../../../../types/job-type-client.type'
 
 @Injectable()
 export class JobService {
@@ -19,10 +20,12 @@ export class JobService {
     private restExtractor: RestExtractor
   ) {}
 
-  getJobs (state: JobState, pagination: RestPagination, sort: SortMeta): Observable<ResultList<Job>> {
+  getJobs (state: JobState, jobType: JobTypeClient, pagination: RestPagination, sort: SortMeta): Observable<ResultList<Job>> {
     let params = new HttpParams()
     params = this.restService.addRestGetParams(params, pagination, sort)
 
+    if (jobType !== 'all') params = params.append('jobType', jobType)
+
     return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params })
                .pipe(
                  map(res => {
diff --git a/client/src/app/+admin/system/jobs/jobs.component.html b/client/src/app/+admin/system/jobs/jobs.component.html
index 7ed1888e2..cd26257dd 100644
--- a/client/src/app/+admin/system/jobs/jobs.component.html
+++ b/client/src/app/+admin/system/jobs/jobs.component.html
@@ -1,10 +1,22 @@
 <div class="admin-sub-header">
   <div i18n class="form-sub-title">Jobs list</div>
 
-  <div class="peertube-select-container">
-    <select [(ngModel)]="jobState" (ngModelChange)="onJobStateChanged()">
-      <option *ngFor="let state of jobStates" [value]="state">{{ state }}</option>
-    </select>
+  <div class="select-filter-block">
+    <label for="jobType">Job type</label>
+    <div class="peertube-select-container">
+      <select id="jobType" name="jobType" [(ngModel)]="jobType" (ngModelChange)="onJobStateOrTypeChanged()">
+        <option *ngFor="let jobType of jobTypes" [value]="jobType">{{ jobType }}</option>
+      </select>
+    </div>
+  </div>
+
+  <div class="select-filter-block">
+    <label for="jobState">Job state</label>
+    <div class="peertube-select-container">
+      <select id="jobState" name="jobState" [(ngModel)]="jobState" (ngModelChange)="onJobStateOrTypeChanged()">
+        <option *ngFor="let state of jobStates" [value]="state">{{ state }}</option>
+      </select>
+    </div>
   </div>
 </div>
 
diff --git a/client/src/app/+admin/system/jobs/jobs.component.scss b/client/src/app/+admin/system/jobs/jobs.component.scss
index ab05f1982..ccc0b35ca 100644
--- a/client/src/app/+admin/system/jobs/jobs.component.scss
+++ b/client/src/app/+admin/system/jobs/jobs.component.scss
@@ -1,8 +1,22 @@
 @import '_variables';
 @import '_mixins';
 
-.peertube-select-container {
-  @include peertube-select-container(auto);
+.admin-sub-header {
+  align-items: flex-end;
+
+  .select-filter-block {
+    &:not(:last-child) {
+      margin-right: 10px;
+    }
+
+    label {
+      margin-bottom: 2px;
+    }
+
+    .peertube-select-container {
+      @include peertube-select-container(auto);
+    }
+  }
 }
 
 pre {
diff --git a/client/src/app/+admin/system/jobs/jobs.component.ts b/client/src/app/+admin/system/jobs/jobs.component.ts
index b24353ca6..95ee17023 100644
--- a/client/src/app/+admin/system/jobs/jobs.component.ts
+++ b/client/src/app/+admin/system/jobs/jobs.component.ts
@@ -2,11 +2,12 @@ import { Component, OnInit } from '@angular/core'
 import { peertubeLocalStorage } from '@app/shared/misc/peertube-local-storage'
 import { Notifier } from '@app/core'
 import { SortMeta } from 'primeng/api'
-import { Job } from '../../../../../../shared/index'
+import { Job, JobType } from '../../../../../../shared/index'
 import { JobState } from '../../../../../../shared/models'
 import { RestPagination, RestTable } from '../../../shared'
 import { JobService } from './job.service'
 import { I18n } from '@ngx-translate/i18n-polyfill'
+import { JobTypeClient } from '../../../../types/job-type-client.type'
 
 @Component({
   selector: 'my-jobs',
@@ -15,9 +16,26 @@ import { I18n } from '@ngx-translate/i18n-polyfill'
 })
 export class JobsComponent extends RestTable implements OnInit {
   private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state'
+  private static JOB_STATE_LOCAL_STORAGE_TYPE = 'jobs-list-type'
 
   jobState: JobState = 'waiting'
   jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
+
+  jobType: JobTypeClient = 'all'
+  jobTypes: JobTypeClient[] = [
+    'all',
+    'activitypub-follow',
+    'activitypub-http-broadcast',
+    'activitypub-http-fetcher',
+    'activitypub-http-unicast',
+    'email',
+    'video-transcoding',
+    'video-file-import',
+    'video-import',
+    'videos-views',
+    'activitypub-refresher'
+  ]
+
   jobs: Job[] = []
   totalRecords: number
   rowsPerPage = 10
@@ -33,20 +51,20 @@ export class JobsComponent extends RestTable implements OnInit {
   }
 
   ngOnInit () {
-    this.loadJobState()
+    this.loadJobStateAndType()
     this.initialize()
   }
 
-  onJobStateChanged () {
+  onJobStateOrTypeChanged () {
     this.pagination.start = 0
 
     this.loadData()
-    this.saveJobState()
+    this.saveJobStateAndType()
   }
 
   protected loadData () {
     this.jobsService
-      .getJobs(this.jobState, this.pagination, this.sort)
+      .getJobs(this.jobState, this.jobType, this.pagination, this.sort)
       .subscribe(
         resultList => {
           this.jobs = resultList.data
@@ -57,13 +75,16 @@ export class JobsComponent extends RestTable implements OnInit {
       )
   }
 
-  private loadJobState () {
-    const result = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE)
+  private loadJobStateAndType () {
+    const state = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE)
+    if (state) this.jobState = state as JobState
 
-    if (result) this.jobState = result as JobState
+    const type = peertubeLocalStorage.getItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_TYPE)
+    if (type) this.jobType = type as JobType
   }
 
-  private saveJobState () {
+  private saveJobStateAndType () {
     peertubeLocalStorage.setItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_STATE, this.jobState)
+    peertubeLocalStorage.setItem(JobsComponent.JOB_STATE_LOCAL_STORAGE_TYPE, this.jobType)
   }
 }
diff --git a/client/src/types/job-type-client.type.ts b/client/src/types/job-type-client.type.ts
new file mode 100644
index 000000000..7d51f1db2
--- /dev/null
+++ b/client/src/types/job-type-client.type.ts
@@ -0,0 +1,3 @@
+import { JobType } from '@shared/models'
+
+export type JobTypeClient = 'all' | JobType
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts
index 1fa662349..05320311e 100644
--- a/server/controllers/api/jobs.ts
+++ b/server/controllers/api/jobs.ts
@@ -24,7 +24,7 @@ jobsRouter.get('/:state',
   jobsSortValidator,
   setDefaultSort,
   setDefaultPagination,
-  asyncMiddleware(listJobsValidator),
+  listJobsValidator,
   asyncMiddleware(listJobs)
 )
 
@@ -39,8 +39,15 @@ export {
 async function listJobs (req: express.Request, res: express.Response) {
   const state = req.params.state as JobState
   const asc = req.query.sort === 'createdAt'
+  const jobType = req.query.jobType
 
-  const jobs = await JobQueue.Instance.listForApi(state, req.query.start, req.query.count, asc)
+  const jobs = await JobQueue.Instance.listForApi({
+    state,
+    start: req.query.start,
+    count: req.query.count,
+    asc,
+    jobType
+  })
   const total = await JobQueue.Instance.count(state)
 
   const result: ResultList<any> = {
diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts
index 1cc6e6912..dd33e85a3 100644
--- a/server/helpers/custom-validators/jobs.ts
+++ b/server/helpers/custom-validators/jobs.ts
@@ -1,14 +1,20 @@
 import { JobState } from '../../../shared/models'
 import { exists } from './misc'
+import { jobTypes } from '@server/lib/job-queue/job-queue'
 
 const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
 
 function isValidJobState (value: JobState) {
-  return exists(value) && jobStates.indexOf(value) !== -1
+  return exists(value) && jobStates.includes(value)
+}
+
+function isValidJobType (value: any) {
+  return exists(value) && jobTypes.includes(value)
 }
 
 // ---------------------------------------------------------------------------
 
 export {
-  isValidJobState
+  isValidJobState,
+  isValidJobType
 }
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 3c810da98..ec601e9ea 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -121,11 +121,20 @@ class JobQueue {
     return queue.add(obj.payload, jobArgs)
   }
 
-  async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> {
+  async listForApi (options: {
+    state: JobState,
+    start: number,
+    count: number,
+    asc?: boolean,
+    jobType: JobType
+  }): Promise<Bull.Job[]> {
+    const { state, start, count, asc, jobType } = options
     let results: Bull.Job[] = []
 
+    const filteredJobTypes = this.filterJobTypes(jobType)
+
     // TODO: optimize
-    for (const jobType of jobTypes) {
+    for (const jobType of filteredJobTypes) {
       const queue = this.queues[ jobType ]
       if (queue === undefined) {
         logger.error('Unknown queue %s to list jobs.', jobType)
@@ -149,10 +158,12 @@ class JobQueue {
     return results.slice(start, start + count)
   }
 
-  async count (state: JobState): Promise<number> {
+  async count (state: JobState, jobType?: JobType): Promise<number> {
     let total = 0
 
-    for (const type of jobTypes) {
+    const filteredJobTypes = this.filterJobTypes(jobType)
+
+    for (const type of filteredJobTypes) {
       const queue = this.queues[ type ]
       if (queue === undefined) {
         logger.error('Unknown queue %s to count jobs.', type)
@@ -180,6 +191,12 @@ class JobQueue {
     })
   }
 
+  private filterJobTypes (jobType?: JobType) {
+    if (!jobType) return jobTypes
+
+    return jobTypes.filter(t => t === jobType)
+  }
+
   static get Instance () {
     return this.instance || (this.instance = new this())
   }
@@ -188,5 +205,6 @@ class JobQueue {
 // ---------------------------------------------------------------------------
 
 export {
+  jobTypes,
   JobQueue
 }
diff --git a/server/middlewares/validators/jobs.ts b/server/middlewares/validators/jobs.ts
index 41a8d6899..b57615dbc 100644
--- a/server/middlewares/validators/jobs.ts
+++ b/server/middlewares/validators/jobs.ts
@@ -1,13 +1,17 @@
 import * as express from 'express'
-import { param } from 'express-validator'
-import { isValidJobState } from '../../helpers/custom-validators/jobs'
+import { param, query } from 'express-validator'
+import { isValidJobState, isValidJobType } from '../../helpers/custom-validators/jobs'
 import { logger } from '../../helpers/logger'
 import { areValidationErrors } from './utils'
 
 const listJobsValidator = [
-  param('state').custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
+  param('state')
+    .custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
+  query('jobType')
+    .optional()
+    .custom(isValidJobType).withMessage('Should have a valid job state'),
 
-  async (req: express.Request, res: express.Response, next: express.NextFunction) => {
+  (req: express.Request, res: express.Response, next: express.NextFunction) => {
     logger.debug('Checking listJobsValidator parameters.', { parameters: req.params })
 
     if (areValidationErrors(req, res)) return
diff --git a/server/tests/api/check-params/jobs.ts b/server/tests/api/check-params/jobs.ts
index c70139514..22e237964 100644
--- a/server/tests/api/check-params/jobs.ts
+++ b/server/tests/api/check-params/jobs.ts
@@ -51,6 +51,17 @@ describe('Test jobs API validators', function () {
       })
     })
 
+    it('Should fail with an incorrect job type', async function () {
+      await makeGetRequest({
+        url: server.url,
+        token: server.accessToken,
+        path,
+        query: {
+          jobType: 'toto'
+        }
+      })
+    })
+
     it('Should fail with a bad start pagination', async function () {
       await checkBadStartPagination(server.url, path, server.accessToken)
     })
@@ -79,6 +90,7 @@ describe('Test jobs API validators', function () {
         statusCodeExpected: 403
       })
     })
+
   })
 
   after(async function () {
diff --git a/server/tests/api/server/handle-down.ts b/server/tests/api/server/handle-down.ts
index a0f505474..7e36067f1 100644
--- a/server/tests/api/server/handle-down.ts
+++ b/server/tests/api/server/handle-down.ts
@@ -184,7 +184,14 @@ describe('Test handle downs', function () {
     const states: JobState[] = [ 'waiting', 'active' ]
 
     for (const state of states) {
-      const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt')
+      const res = await getJobsListPaginationAndSort({
+        url: servers[ 0 ].url,
+        accessToken: servers[ 0 ].accessToken,
+        state: state,
+        start: 0,
+        count: 50,
+        sort: '-createdAt'
+      })
       expect(res.body.data).to.have.length(0)
     }
   })
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts
index ceea47a85..58d8c8c10 100644
--- a/server/tests/api/server/jobs.ts
+++ b/server/tests/api/server/jobs.ts
@@ -41,20 +41,46 @@ describe('Test jobs', function () {
     expect(res.body.data).to.have.length.above(2)
   })
 
-  it('Should list jobs with sort and pagination', async function () {
-    const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'completed', 1, 2, 'createdAt')
-    expect(res.body.total).to.be.above(2)
-    expect(res.body.data).to.have.lengthOf(2)
+  it('Should list jobs with sort, pagination and job type', async function () {
+    {
+      const res = await getJobsListPaginationAndSort({
+        url: servers[ 1 ].url,
+        accessToken: servers[ 1 ].accessToken,
+        state: 'completed',
+        start: 1,
+        count: 2,
+        sort: 'createdAt'
+      })
+      expect(res.body.total).to.be.above(2)
+      expect(res.body.data).to.have.lengthOf(2)
 
-    let job = res.body.data[0]
-    // Skip repeat jobs
-    if (job.type === 'videos-views') job = res.body.data[1]
+      let job: Job = res.body.data[ 0 ]
+      // Skip repeat jobs
+      if (job.type === 'videos-views') job = res.body.data[ 1 ]
 
-    expect(job.state).to.equal('completed')
-    expect(job.type.startsWith('activitypub-')).to.be.true
-    expect(dateIsValid(job.createdAt)).to.be.true
-    expect(dateIsValid(job.processedOn)).to.be.true
-    expect(dateIsValid(job.finishedOn)).to.be.true
+      expect(job.state).to.equal('completed')
+      expect(job.type.startsWith('activitypub-')).to.be.true
+      expect(dateIsValid(job.createdAt as string)).to.be.true
+      expect(dateIsValid(job.processedOn as string)).to.be.true
+      expect(dateIsValid(job.finishedOn as string)).to.be.true
+    }
+
+    {
+      const res = await getJobsListPaginationAndSort({
+        url: servers[ 1 ].url,
+        accessToken: servers[ 1 ].accessToken,
+        state: 'completed',
+        start: 0,
+        count: 100,
+        sort: 'createdAt',
+        jobType: 'activitypub-http-broadcast'
+      })
+      expect(res.body.total).to.be.above(2)
+
+      for (const j of res.body.data as Job[]) {
+        expect(j.type).to.equal('activitypub-http-broadcast')
+      }
+    }
   })
 
   after(async function () {
diff --git a/server/tests/real-world/real-world.ts b/server/tests/real-world/real-world.ts
index 8b070004d..cba5ac311 100644
--- a/server/tests/real-world/real-world.ts
+++ b/server/tests/real-world/real-world.ts
@@ -354,7 +354,14 @@ async function isTherePendingRequests (servers: ServerInfo[]) {
   // Check if each server has pending request
   for (const server of servers) {
     for (const state of states) {
-      const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt')
+      const p = getJobsListPaginationAndSort({
+        url: server.url,
+        accessToken: server.accessToken,
+        state: state,
+        start: 0,
+        count: 10,
+        sort: '-createdAt'
+      })
         .then(res => {
           if (res.body.total > 0) pendingRequests = true
         })
diff --git a/shared/extra-utils/server/jobs.ts b/shared/extra-utils/server/jobs.ts
index b3db885e8..cc1352e14 100644
--- a/shared/extra-utils/server/jobs.ts
+++ b/shared/extra-utils/server/jobs.ts
@@ -1,7 +1,8 @@
 import * as request from 'supertest'
-import { Job, JobState } from '../../models'
+import { Job, JobState, JobType } from '../../models'
 import { wait } from '../miscs/miscs'
 import { ServerInfo } from './servers'
+import { makeGetRequest } from '@shared/extra-utils'
 
 function getJobsList (url: string, accessToken: string, state: JobState) {
   const path = '/api/v1/jobs/' + state
@@ -14,18 +15,32 @@ function getJobsList (url: string, accessToken: string, state: JobState) {
           .expect('Content-Type', /json/)
 }
 
-function getJobsListPaginationAndSort (url: string, accessToken: string, state: JobState, start: number, count: number, sort: string) {
+function getJobsListPaginationAndSort (options: {
+  url: string,
+  accessToken: string,
+  state: JobState,
+  start: number,
+  count: number,
+  sort: string,
+  jobType?: JobType
+}) {
+  const { url, accessToken, state, start, count, sort, jobType } = options
   const path = '/api/v1/jobs/' + state
 
-  return request(url)
-          .get(path)
-          .query({ start })
-          .query({ count })
-          .query({ sort })
-          .set('Accept', 'application/json')
-          .set('Authorization', 'Bearer ' + accessToken)
-          .expect(200)
-          .expect('Content-Type', /json/)
+  const query = {
+    start,
+    count,
+    sort,
+    jobType
+  }
+
+  return makeGetRequest({
+    url,
+    path,
+    token: accessToken,
+    statusCodeExpected: 200,
+    query
+  })
 }
 
 async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
@@ -44,7 +59,14 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
     // Check if each server has pending request
     for (const server of servers) {
       for (const state of states) {
-        const p = getJobsListPaginationAndSort(server.url, server.accessToken, state, 0, 10, '-createdAt')
+        const p = getJobsListPaginationAndSort({
+          url: server.url,
+          accessToken: server.accessToken,
+          state: state,
+          start: 0,
+          count: 10,
+          sort: '-createdAt'
+        })
           .then(res => res.body.data)
           .then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views'))
           .then(jobs => {
diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts
index 1b9aa8a07..b82a633b2 100644
--- a/shared/models/server/job.model.ts
+++ b/shared/models/server/job.model.ts
@@ -17,7 +17,7 @@ export interface Job {
   type: JobType
   data: any,
   error: any,
-  createdAt: Date
-  finishedOn: Date
-  processedOn: Date
+  createdAt: Date | string
+  finishedOn: Date | string
+  processedOn: Date | string
 }