From 24516aa26a6753517b379cf7b5104c1a24eccad6 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 21 Jan 2021 15:58:17 +0100 Subject: [PATCH] Refactor transcoding job handlers --- scripts/create-transcoding-job.ts | 6 +- server/helpers/video.ts | 4 +- .../job-queue/handlers/video-file-import.ts | 4 +- .../job-queue/handlers/video-live-ending.ts | 4 +- .../job-queue/handlers/video-transcoding.ts | 205 ++++++++++++------ server/lib/video-transcoding.ts | 25 ++- server/models/video/video.ts | 1 + shared/models/server/job.model.ts | 10 +- 8 files changed, 165 insertions(+), 94 deletions(-) diff --git a/scripts/create-transcoding-job.ts b/scripts/create-transcoding-job.ts index 2eed53f42..ca9e2a99a 100755 --- a/scripts/create-transcoding-job.ts +++ b/scripts/create-transcoding-job.ts @@ -47,7 +47,7 @@ async function run () { for (const resolution of resolutionsEnabled) { dataInput.push({ - type: 'hls', + type: 'new-resolution-to-hls', videoUUID: video.uuid, resolution, isPortraitMode: false, @@ -56,14 +56,14 @@ async function run () { } } else if (program.resolution !== undefined) { dataInput.push({ - type: 'new-resolution' as 'new-resolution', + type: 'new-resolution-to-webtorrent', videoUUID: video.uuid, isNewVideo: false, resolution: program.resolution }) } else { dataInput.push({ - type: 'optimize' as 'optimize', + type: 'optimize-to-webtorrent', videoUUID: video.uuid, isNewVideo: false }) diff --git a/server/helpers/video.ts b/server/helpers/video.ts index 5d1cd7de1..bfd5a9627 100644 --- a/server/helpers/video.ts +++ b/server/helpers/video.ts @@ -74,14 +74,14 @@ function addOptimizeOrMergeAudioJob (video: MVideo, videoFile: MVideoFile) { if (videoFile.isAudio()) { dataInput = { - type: 'merge-audio' as 'merge-audio', + type: 'merge-audio-to-webtorrent', resolution: DEFAULT_AUDIO_RESOLUTION, videoUUID: video.uuid, isNewVideo: true } } else { dataInput = { - type: 'optimize' as 'optimize', + type: 'optimize-to-webtorrent', videoUUID: video.uuid, isNewVideo: true } diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 18823ee9c..22e4d0cf1 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts @@ -9,7 +9,7 @@ import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprob import { logger } from '../../../helpers/logger' import { VideoModel } from '../../../models/video/video' import { VideoFileModel } from '../../../models/video/video-file' -import { publishNewResolutionIfNeeded } from './video-transcoding' +import { onNewWebTorrentFileResolution } from './video-transcoding' async function processVideoFileImport (job: Bull.Job) { const payload = job.data as VideoFileImportPayload @@ -24,7 +24,7 @@ async function processVideoFileImport (job: Bull.Job) { await updateVideoFile(video, payload.filePath) - await publishNewResolutionIfNeeded(video) + await onNewWebTorrentFileResolution(video) return video } diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 8018e2277..db6cd3682 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -7,7 +7,7 @@ import { LiveManager } from '@server/lib/live-manager' import { generateVideoMiniature } from '@server/lib/thumbnail' import { publishAndFederateIfNeeded } from '@server/lib/video' import { getHLSDirectory } from '@server/lib/video-paths' -import { generateHlsPlaylistFromTS } from '@server/lib/video-transcoding' +import { generateHlsPlaylistResolutionFromTS } from '@server/lib/video-transcoding' import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' import { VideoLiveModel } from '@server/models/video/video-live' @@ -102,7 +102,7 @@ async function saveLive (video: MVideo, live: MVideoLive) { const { videoFileResolution, isPortraitMode } = await getVideoFileResolution(concatenatedTsFilePath, probe) - const outputPath = await generateHlsPlaylistFromTS({ + const outputPath = await generateHlsPlaylistResolutionFromTS({ video: videoWithFiles, concatenatedTsFilePath, resolution: videoFileResolution, diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 083cec11a..0f6b3f753 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,8 +1,10 @@ import * as Bull from 'bull' +import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' import { publishAndFederateIfNeeded } from '@server/lib/video' import { getVideoFilePath } from '@server/lib/video-paths' import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' import { + HLSTranscodingPayload, MergeAudioTranscodingPayload, NewResolutionTranscodingPayload, OptimizeTranscodingPayload, @@ -16,9 +18,31 @@ import { sequelizeTypescript } from '../../../initializers/database' import { VideoModel } from '../../../models/video/video' import { federateVideoIfNeeded } from '../../activitypub/videos' import { Notifier } from '../../notifier' -import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' +import { + generateHlsPlaylistResolution, + mergeAudioVideofile, + optimizeOriginalVideofile, + transcodeNewWebTorrentResolution +} from '../../video-transcoding' import { JobQueue } from '../job-queue' -import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' + +const handlers: { [ id: string ]: (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight) => Promise } = { + // Deprecated, introduced in 3.1 + 'hls': handleHLSJob, + 'new-resolution-to-hls': handleHLSJob, + + // Deprecated, introduced in 3.1 + 'new-resolution': handleNewWebTorrentResolutionJob, + 'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob, + + // Deprecated, introduced in 3.1 + 'merge-audio': handleWebTorrentMergeAudioJob, + 'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob, + + // Deprecated, introduced in 3.1 + 'optimize': handleWebTorrentOptimizeJob, + 'optimize-to-webtorrent': handleWebTorrentOptimizeJob +} async function processVideoTranscoding (job: Bull.Job) { const payload = job.data as VideoTranscodingPayload @@ -31,42 +55,62 @@ async function processVideoTranscoding (job: Bull.Job) { return undefined } - if (payload.type === 'hls') { - const videoFileInput = payload.copyCodecs - ? video.getWebTorrentFile(payload.resolution) - : video.getMaxQualityFile() + const handler = handlers[payload.type] - const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() - const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput) - - await generateHlsPlaylist({ - video, - videoInputPath, - resolution: payload.resolution, - copyCodecs: payload.copyCodecs, - isPortraitMode: payload.isPortraitMode || false, - job - }) - - await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video) - } else if (payload.type === 'new-resolution') { - await transcodeNewResolution(video, payload.resolution, payload.isPortraitMode || false, job) - - await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) - } else if (payload.type === 'merge-audio') { - await mergeAudioVideofile(video, payload.resolution, job) - - await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload) - } else { - const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) - - await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload, transcodeType) + if (!handler) { + throw new Error('Cannot find transcoding handler for ' + payload.type) } + await handler(job, payload, video) + return video } -async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) { +// --------------------------------------------------------------------------- +// Job handlers +// --------------------------------------------------------------------------- + +async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { + const videoFileInput = payload.copyCodecs + ? video.getWebTorrentFile(payload.resolution) + : video.getMaxQualityFile() + + const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() + const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput) + + await generateHlsPlaylistResolution({ + video, + videoInputPath, + resolution: payload.resolution, + copyCodecs: payload.copyCodecs, + isPortraitMode: payload.isPortraitMode || false, + job + }) + + await retryTransactionWrapper(onHlsPlaylistGeneration, video) +} + +async function handleNewWebTorrentResolutionJob (job: Bull.Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight) { + await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) + + await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) +} + +async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight) { + await mergeAudioVideofile(video, payload.resolution, job) + + await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) +} + +async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight) { + const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) + + await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType) +} + +// --------------------------------------------------------------------------- + +async function onHlsPlaylistGeneration (video: MVideoFullLight) { if (video === undefined) return undefined // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it @@ -82,13 +126,7 @@ async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) { return publishAndFederateIfNeeded(video) } -async function publishNewResolutionIfNeeded (video: MVideoUUID, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) { - await publishAndFederateIfNeeded(video) - - createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true })) -} - -async function onVideoFileOptimizerSuccess ( +async function onVideoFileOptimizer ( videoArg: MVideoWithFile, payload: OptimizeTranscodingPayload, transcodeType: TranscodeOptionsType @@ -113,7 +151,7 @@ async function onVideoFileOptimizerSuccess ( let videoPublished = false - // Generate HLS version of the max quality file + // Generate HLS version of the original file const originalFileHLSPayload = Object.assign({}, payload, { isPortraitMode, resolution: videoDatabase.getMaxQualityFile().resolution, @@ -122,36 +160,11 @@ async function onVideoFileOptimizerSuccess ( }) createHlsJobIfEnabled(originalFileHLSPayload) - if (resolutionsEnabled.length !== 0) { - for (const resolution of resolutionsEnabled) { - let dataInput: VideoTranscodingPayload + const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, videoFileResolution, isPortraitMode) - if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { - dataInput = { - type: 'new-resolution' as 'new-resolution', - videoUUID: videoDatabase.uuid, - resolution, - isPortraitMode - } - } else if (CONFIG.TRANSCODING.HLS.ENABLED) { - dataInput = { - type: 'hls', - videoUUID: videoDatabase.uuid, - resolution, - isPortraitMode, - copyCodecs: false - } - } - - JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) - } - - logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) - } else { + if (!hasNewResolutions) { // No transcoding to do, it's now published videoPublished = await videoDatabase.publishIfNeededAndSave(t) - - logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) } await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) @@ -163,11 +176,20 @@ async function onVideoFileOptimizerSuccess ( if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) } +async function onNewWebTorrentFileResolution ( + video: MVideoUUID, + payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload +) { + await publishAndFederateIfNeeded(video) + + createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true })) +} + // --------------------------------------------------------------------------- export { processVideoTranscoding, - publishNewResolutionIfNeeded + onNewWebTorrentFileResolution } // --------------------------------------------------------------------------- @@ -175,8 +197,8 @@ export { function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) { // Generate HLS playlist? if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { - const hlsTranscodingPayload = { - type: 'hls' as 'hls', + const hlsTranscodingPayload: HLSTranscodingPayload = { + type: 'new-resolution-to-hls', videoUUID: payload.videoUUID, resolution: payload.resolution, isPortraitMode: payload.isPortraitMode, @@ -186,3 +208,46 @@ function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) } } + +function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution: number, isPortraitMode: boolean) { + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') + logger.info( + 'Resolutions computed for video %s and origin file resolution of %d.', video.uuid, videoFileResolution, + { resolutions: resolutionsEnabled } + ) + + if (resolutionsEnabled.length === 0) { + logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) + + return false + } + + for (const resolution of resolutionsEnabled) { + let dataInput: VideoTranscodingPayload + + if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { + // WebTorrent will create subsequent HLS job + dataInput = { + type: 'new-resolution-to-webtorrent', + videoUUID: video.uuid, + resolution, + isPortraitMode + } + } else if (CONFIG.TRANSCODING.HLS.ENABLED) { + dataInput = { + type: 'new-resolution-to-hls', + videoUUID: video.uuid, + resolution, + isPortraitMode, + copyCodecs: false + } + } + + JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) + } + + logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) + + return true +} diff --git a/server/lib/video-transcoding.ts b/server/lib/video-transcoding.ts index beef78b44..7af7a481c 100644 --- a/server/lib/video-transcoding.ts +++ b/server/lib/video-transcoding.ts @@ -60,7 +60,7 @@ async function optimizeOriginalVideofile (video: MVideoWithFile, inputVideoFile: const videoOutputPath = getVideoFilePath(video, inputVideoFile) - await onVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, videoOutputPath) + await onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, videoOutputPath) return transcodeType } catch (err) { @@ -72,7 +72,7 @@ async function optimizeOriginalVideofile (video: MVideoWithFile, inputVideoFile: } // Transcode the original video file to a lower resolution. -async function transcodeNewResolution (video: MVideoWithFile, resolution: VideoResolution, isPortrait: boolean, job: Job) { +async function transcodeNewWebTorrentResolution (video: MVideoWithFile, resolution: VideoResolution, isPortrait: boolean, job: Job) { const transcodeDirectory = CONFIG.STORAGE.TMP_DIR const extname = '.mp4' @@ -118,7 +118,7 @@ async function transcodeNewResolution (video: MVideoWithFile, resolution: VideoR await transcode(transcodeOptions) - return onVideoFileTranscoding(video, newVideoFile, videoTranscodedPath, videoOutputPath) + return onWebTorrentVideoFileTranscoding(video, newVideoFile, videoTranscodedPath, videoOutputPath) } // Merge an image with an audio file to create a video @@ -170,11 +170,11 @@ async function mergeAudioVideofile (video: MVideoWithAllFiles, resolution: Video video.duration = await getDurationFromVideoFile(videoTranscodedPath) await video.save() - return onVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, videoOutputPath) + return onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, videoOutputPath) } // Concat TS segments from a live video to a fragmented mp4 HLS playlist -async function generateHlsPlaylistFromTS (options: { +async function generateHlsPlaylistResolutionFromTS (options: { video: MVideoWithFile concatenatedTsFilePath: string resolution: VideoResolution @@ -192,7 +192,7 @@ async function generateHlsPlaylistFromTS (options: { } // Generate an HLS playlist from an input file, and update the master playlist -function generateHlsPlaylist (options: { +function generateHlsPlaylistResolution (options: { video: MVideoWithFile videoInputPath: string resolution: VideoResolution @@ -224,17 +224,22 @@ function getEnabledResolutions (type: 'vod' | 'live') { // --------------------------------------------------------------------------- export { - generateHlsPlaylist, - generateHlsPlaylistFromTS, + generateHlsPlaylistResolution, + generateHlsPlaylistResolutionFromTS, optimizeOriginalVideofile, - transcodeNewResolution, + transcodeNewWebTorrentResolution, mergeAudioVideofile, getEnabledResolutions } // --------------------------------------------------------------------------- -async function onVideoFileTranscoding (video: MVideoWithFile, videoFile: MVideoFile, transcodingPath: string, outputPath: string) { +async function onWebTorrentVideoFileTranscoding ( + video: MVideoWithFile, + videoFile: MVideoFile, + transcodingPath: string, + outputPath: string +) { const stats = await stat(transcodingPath) const fps = await getVideoFileFPS(transcodingPath) const metadata = await getMetadataFromFile(transcodingPath) diff --git a/server/models/video/video.ts b/server/models/video/video.ts index 3db6549ae..2bfa704ec 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -1735,6 +1735,7 @@ export class VideoModel extends Model { } getQualityFileBy (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) { + // We first transcode to WebTorrent format, so try this array first if (Array.isArray(this.VideoFiles) && this.VideoFiles.length !== 0) { const file = fun(this.VideoFiles, file => file.resolution) diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index 11d90c32f..d16ac1032 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts @@ -100,26 +100,26 @@ interface BaseTranscodingPayload { isNewVideo?: boolean } -interface HLSTranscodingPayload extends BaseTranscodingPayload { - type: 'hls' +export interface HLSTranscodingPayload extends BaseTranscodingPayload { + type: 'new-resolution-to-hls' isPortraitMode?: boolean resolution: VideoResolution copyCodecs: boolean } export interface NewResolutionTranscodingPayload extends BaseTranscodingPayload { - type: 'new-resolution' + type: 'new-resolution-to-webtorrent' isPortraitMode?: boolean resolution: VideoResolution } export interface MergeAudioTranscodingPayload extends BaseTranscodingPayload { - type: 'merge-audio' + type: 'merge-audio-to-webtorrent' resolution: VideoResolution } export interface OptimizeTranscodingPayload extends BaseTranscodingPayload { - type: 'optimize' + type: 'optimize-to-webtorrent' } export type VideoTranscodingPayload =