diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts
index 8eff4bd2f..cb1533dc6 100644
--- a/packages/peertube-runner/server/server.ts
+++ b/packages/peertube-runner/server/server.ts
@@ -170,18 +170,29 @@ export class RunnerServer {
   private async checkAvailableJobs () {
     if (this.checkingAvailableJobs) return

-    logger.info('Checking available jobs')
-
     this.checkingAvailableJobs = true

+    let hadAvailableJob = false
+
     for (const server of this.servers) {
       try {
+        logger.info('Checking available jobs on ' + server.url)
+
         const job = await this.requestJob(server)
         if (!job) continue

+        hadAvailableJob = true
+
         await this.tryToExecuteJobAsync(server, job)
       } catch (err) {
-        if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
+        const code = (err.res?.body as PeerTubeProblemDocument)?.code
+
+        if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
+          logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
+          return
+        }
+
+        if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
           logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)

           await this.unregisterRunner({ url: server.url })
@@ -193,6 +204,11 @@ export class RunnerServer {
     }

     this.checkingAvailableJobs = false
+
+    if (hadAvailableJob && this.canProcessMoreJobs()) {
+      this.checkAvailableJobs()
+        .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
+    }
   }

   private async requestJob (server: PeerTubeServer) {
@@ -211,7 +227,7 @@ export class RunnerServer {
   }

   private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
-    if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return
+    if (!this.canProcessMoreJobs()) return

     const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })

@@ -242,6 +258,10 @@ export class RunnerServer {
     return ConfigManager.Instance.setRegisteredInstances(data)
   }

+  private canProcessMoreJobs () {
+    return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
+  }
+
   // ---------------------------------------------------------------------------

   private async cleanupTMP () {
diff --git a/server/controllers/api/runners/jobs.ts b/server/controllers/api/runners/jobs.ts
index bdeb0c6cd..140f062be 100644
--- a/server/controllers/api/runners/jobs.ts
+++ b/server/controllers/api/runners/jobs.ts
@@ -42,6 +42,7 @@ import {
   RunnerJobType,
   RunnerJobUpdateBody,
   RunnerJobUpdatePayload,
+  ServerErrorCode,
   UserRight,
   VideoStudioTranscodingSuccess,
   VODAudioMergeTranscodingSuccess,
@@ -168,6 +169,7 @@ async function acceptRunnerJob (req: express.Request, res: express.Response) {

   if (runnerJob.state !== RunnerJobState.PENDING) {
     res.fail({
+      type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
       message: 'This job is not in pending state anymore',
       status: HttpStatusCode.CONFLICT_409
     })
diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
index 3c2cf51b7..5bad34860 100644
--- a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
+++ b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
@@ -68,6 +68,8 @@ export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
   abort () {
     if (this.ended || this.errored || this.aborted) return

+    logger.debug('Killing ffmpeg after live abort of ' + this.videoUUID, this.lTags())
+
     this.ffmpegCommand.kill('SIGINT')

     this.aborted = true
@@ -95,6 +97,8 @@ export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
   private onFFmpegEnded () {
     if (this.ended || this.errored || this.aborted) return

+    logger.debug('Live ffmpeg transcoding ended for ' + this.videoUUID, this.lTags())
+
     this.ended = true
     this.emit('end')
   }
diff --git a/server/tests/api/server/open-telemetry.ts b/server/tests/api/server/open-telemetry.ts
index 49f3b520b..fd85fc514 100644
--- a/server/tests/api/server/open-telemetry.ts
+++ b/server/tests/api/server/open-telemetry.ts
@@ -31,7 +31,7 @@ describe('Open Telemetry', function () {
   it('Should enable open telemetry metrics', async function () {
     this.timeout(120000)

-    server = await createSingleServer(1, {
+    await server.run({
       open_telemetry: {
         metrics: {
           enabled: true
@@ -73,7 +73,7 @@ describe('Open Telemetry', function () {
   it('Should disable http request duration metrics', async function () {
     await server.kill()

-    server = await createSingleServer(1, {
+    await server.run({
       open_telemetry: {
         metrics: {
           enabled: true,
@@ -114,7 +114,7 @@ describe('Open Telemetry', function () {
   })

   it('Should enable open telemetry metrics', async function () {
-    server = await createSingleServer(1, {
+    await server.run({
       open_telemetry: {
         tracing: {
           enabled: true,
diff --git a/server/tests/peertube-runner/vod-transcoding.ts b/server/tests/peertube-runner/vod-transcoding.ts
index 3c0918102..d7e2df095 100644
--- a/server/tests/peertube-runner/vod-transcoding.ts
+++ b/server/tests/peertube-runner/vod-transcoding.ts
@@ -189,7 +189,7 @@ describe('Test VOD transcoding in peertube-runner program', function () {
   })

   it('Should transcode videos on manual run', async function () {
-    this.timeout(120000)
+    this.timeout(240000)

     await servers[0].config.disableTranscoding()

diff --git a/shared/models/server/server-error-code.enum.ts b/shared/models/server/server-error-code.enum.ts
index 24d3c6d21..2b093380c 100644
--- a/shared/models/server/server-error-code.enum.ts
+++ b/shared/models/server/server-error-code.enum.ts
@@ -48,6 +48,7 @@ export const enum ServerErrorCode {
   ACCOUNT_APPROVAL_REJECTED = 'account_approval_rejected',

   RUNNER_JOB_NOT_IN_PROCESSING_STATE = 'runner_job_not_in_processing_state',
+  RUNNER_JOB_NOT_IN_PENDING_STATE = 'runner_job_not_in_pending_state',

   UNKNOWN_RUNNER_TOKEN = 'unknown_runner_token'
 }