1
0
Fork 0

Add list-jobs runner command

This commit is contained in:
Chocobozzz 2025-02-20 10:31:04 +01:00
parent 1fc7e003e5
commit b9a53addc9
No known key found for this signature in database
GPG key ID: 583A612D890159BE
12 changed files with 144 additions and 16 deletions

View file

@ -0,0 +1,8 @@
# Changelog
## v0.1.0
* Requires Node 20
* Introduce `list-jobs` command to list processing jobs
* Update dependencies
* Send last chunks/playlist content to correctly end the live

View file

@ -2,7 +2,7 @@
import { Command, InvalidArgumentError } from '@commander-js/extra-typings'
import { RunnerJobType } from '@peertube/peertube-models'
import { listRegistered, registerRunner, unregisterRunner } from './register/index.js'
import { listJobs, listRegistered, registerRunner, unregisterRunner } from './register/index.js'
import { gracefulShutdown } from './register/shutdown.js'
import { RunnerServer } from './server/index.js'
import { getSupportedJobsList } from './server/shared/supported-job.js'
@ -99,6 +99,19 @@ program.command('list-registered')
}
})
program.command('list-jobs')
.description('List processing jobs')
.option('--include-payload', 'Include job payload in the output')
.action(async options => {
try {
await listJobs({ includePayload: options.includePayload })
} catch (err) {
console.error('Cannot list processing jobs.')
console.error(err)
process.exit(-1)
}
})
program.command('graceful-shutdown')
.description('Exit runner when all processing tasks are finished')
.action(async () => {

View file

@ -34,3 +34,14 @@ export async function listRegistered () {
client.stop()
}
export async function listJobs (options: {
includePayload: boolean
}) {
const client = new IPCClient()
await client.run()
await client.askListJobs(options)
client.stop()
}

View file

@ -61,11 +61,13 @@ export function scheduleTranscodingProgress (options: {
: 60000
const update = () => {
job.progress = progressGetter() || 0
server.runnerJobs.update({
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
progress: progressGetter()
progress: job.progress
}).catch(err => logger.error({ err }, 'Cannot send job progress'))
}

View file

@ -183,6 +183,19 @@ export class RunnerServer {
// ---------------------------------------------------------------------------
listJobs () {
return {
concurrency: ConfigManager.Instance.getConfig().jobs.concurrency,
processingJobs: this.processingJobs.map(j => ({
serverUrl: j.server.url,
job: pick(j.job, [ 'type', 'startedAt', 'progress', 'payload' ])
}))
}
}
// ---------------------------------------------------------------------------
requestGracefulShutdown () {
logger.info('Received graceful shutdown request')

View file

@ -1,8 +1,8 @@
import { Client as NetIPC } from '@peertube/net-ipc'
import CliTable3 from 'cli-table3'
import { ensureDir } from 'fs-extra/esm'
import { Client as NetIPC } from '@peertube/net-ipc'
import { ConfigManager } from '../config-manager.js'
import { IPCResponse, IPCResponseData, IPCRequest } from './shared/index.js'
import { IPCRequest, IPCResponse, IPCResponseListJobs, IPCResponseListRegistered } from './shared/index.js'
export class IPCClient {
private netIPC: NetIPC
@ -65,7 +65,7 @@ export class IPCClient {
type: 'list-registered'
}
const { success, error, data } = await this.netIPC.request(req) as IPCResponse<IPCResponseData>
const { success, error, data } = await this.netIPC.request(req) as IPCResponse<IPCResponseListRegistered>
if (!success) {
console.error('Could not list registered PeerTube instances', error)
return
@ -82,6 +82,39 @@ export class IPCClient {
console.log(table.toString())
}
async askListJobs (options: {
includePayload: boolean
}) {
const req: IPCRequest = {
type: 'list-jobs'
}
const { success, error, data } = await this.netIPC.request(req) as IPCResponse<IPCResponseListJobs>
if (!success) {
console.error('Could not list jobs', error)
return
}
const head = [ 'instance', 'type', 'started', 'progress' ]
if (options.includePayload) head.push('payload')
const table = new CliTable3({
head,
wordWrap: true,
wrapOnWordBoundary: false
})
for (const { serverUrl, job } of data.processingJobs) {
const row = [ serverUrl, job.type, job.startedAt.toLocaleString(), `${job.progress}%` ]
if (options.includePayload) row.push(JSON.stringify(job.payload, undefined, 2))
table.push(row)
}
console.log(`Processing ${data.processingJobs.length}/${data.concurrency} jobs`)
console.log(table.toString())
}
// ---------------------------------------------------------------------------
async askGracefulShutdown () {

View file

@ -46,6 +46,9 @@ export class IPCServer {
case 'list-registered':
return Promise.resolve(this.runnerServer.listRegistered())
case 'list-jobs':
return Promise.resolve(this.runnerServer.listJobs())
case 'graceful-shutdown':
this.runnerServer.requestGracefulShutdown()
return undefined

View file

@ -2,7 +2,8 @@ export type IPCRequest =
IPCRequestRegister |
IPCRequestUnregister |
IPCRequestListRegistered |
IPCRequestGracefulShutdown
IPCRequestGracefulShutdown |
IPCRequestListJobs
export type IPCRequestRegister = {
type: 'register'
@ -14,5 +15,6 @@ export type IPCRequestRegister = {
export type IPCRequestUnregister = { type: 'unregister', url: string, runnerName: string }
export type IPCRequestListRegistered = { type: 'list-registered' }
export type IPCRequestListJobs = { type: 'list-jobs' }
export type IPCRequestGracefulShutdown = { type: 'graceful-shutdown' }

View file

@ -1,15 +1,24 @@
import { RunnerJob } from '@peertube/peertube-models'
export type IPCResponse <T extends IPCResponseData = undefined> = {
success: boolean
error?: string
data?: T
}
export type IPCResponseData =
// list registered
{
servers: {
runnerName: string
runnerDescription: string
url: string
}[]
}
export type IPCResponseData = IPCResponseListRegistered | IPCResponseListJobs
export type IPCResponseListRegistered = {
servers: {
runnerName: string
runnerDescription: string
url: string
}[]
}
export type IPCResponseListJobs = {
concurrency: number
processingJobs: {
serverUrl: string
job: Pick<RunnerJob, 'type' | 'startedAt' | 'progress' | 'payload'>
}[]
}

View file

@ -53,6 +53,13 @@ describe('Test Live transcoding in peertube-runner program', function () {
transcoded: true
})
// Check jobs output
{
const jobsList = await peertubeRunner.listJobs()
expect(jobsList).to.contain(servers[0].url)
expect(jobsList).to.contain('live-rtmp-hls-transcoding')
}
await stopFfmpeg(ffmpegCommand)
await waitUntilLiveWaitingOnAllServers(servers, video.uuid)

View file

@ -89,6 +89,13 @@ export class PeerTubeRunnerProcess {
return stdout
}
async listJobs () {
const args = [ 'list-jobs', ...this.buildIdArg() ]
const { stdout } = await this.runCommand(this.getRunnerPath(), args)
return stdout
}
// ---------------------------------------------------------------------------
gracefulShutdown () {

View file

@ -359,6 +359,26 @@ sudo -u prunner peertube-runner list-registered
:::
### List jobs
**Runner >= 0.1.0**
To list jobs that are processed by the runner:
::: code-group
```bash [Shell]
peertube-runner list-jobs
peertube-runner list-jobs --include-payload
```
```bash [Systemd]
sudo -u prunner peertube-runner list-jobs
sudo -u prunner peertube-runner list-jobs --include-payload
```
:::
### Graceful shutdown
Ask the runner to shutdown when it has finished all of its current tasks:
@ -424,7 +444,7 @@ docker compose exec -u peertube peertube npm run parse-log -- --level info
`--level` is optional and could be `info`/`warn`/`error`
You can also remove SQL or HTTP logs using `--not-tags` (PeerTube >= 3.2):
You can also remove SQL or HTTP logs using `--not-tags`:
::: code-group