diff --git a/apps/peertube-runner/src/server/process/shared/process-live.ts b/apps/peertube-runner/src/server/process/shared/process-live.ts index 0dc4e5b13..421803dc1 100644 --- a/apps/peertube-runner/src/server/process/shared/process-live.ts +++ b/apps/peertube-runner/src/server/process/shared/process-live.ts @@ -150,7 +150,7 @@ export class ProcessLiveRTMPHLSTranscoding { const type = ((err as any).res?.body as PeerTubeProblemDocument)?.code if (type === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { - logger.info({ err }, 'Stopping transcoding as the job is not in processing state anymore') + logger.info('Stopping transcoding as the job is not in processing state anymore') res() } else { diff --git a/scripts/dev/peertube-runner.sh b/scripts/dev/peertube-runner.sh index 7bd756123..fb21fc4aa 100755 --- a/scripts/dev/peertube-runner.sh +++ b/scripts/dev/peertube-runner.sh @@ -6,6 +6,8 @@ rm -rf ./apps/peertube-runner/dist cd ./apps/peertube-runner +../../node_modules/.bin/tsc -b --verbose + ../../node_modules/.bin/concurrently -k \ "../../node_modules/.bin/tsc -w --noEmit" \ "node ./scripts/watch.js" diff --git a/server/core/lib/live/live-manager.ts b/server/core/lib/live/live-manager.ts index 797b3bdfa..21781dd01 100644 --- a/server/core/lib/live/live-manager.ts +++ b/server/core/lib/live/live-manager.ts @@ -1,11 +1,11 @@ -import { readdir, readFile } from 'fs/promises' -import { createServer, Server } from 'net' -import context from 'node-media-server/src/node_core_ctx.js' -import nodeMediaServerLogger from 'node-media-server/src/node_core_logger.js' -import NodeRtmpSession from 'node-media-server/src/node_rtmp_session.js' -import { join } from 'path' -import { createServer as createServerTLS, Server as ServerTLS } from 'tls' import { pick, wait } from '@peertube/peertube-core-utils' +import { + ffprobePromise, + getVideoStreamBitrate, + getVideoStreamDimensionsInfo, + getVideoStreamFPS, + hasAudioStream +} from '@peertube/peertube-ffmpeg' import { LiveVideoError, LiveVideoErrorType, VideoState } from '@peertube/peertube-models' import { logger, loggerTagsFactory } from '@server/helpers/logger.js' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config.js' @@ -19,15 +19,16 @@ import { VideoLiveModel } from '@server/models/video/video-live.js' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist.js' import { VideoModel } from '@server/models/video/video.js' import { MUser, MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models/index.js' -import { - ffprobePromise, - getVideoStreamBitrate, - getVideoStreamDimensionsInfo, - getVideoStreamFPS, - hasAudioStream -} from '@peertube/peertube-ffmpeg' +import { readFile, readdir } from 'fs/promises' +import { Server, createServer } from 'net' +import context from 'node-media-server/src/node_core_ctx.js' +import nodeMediaServerLogger from 'node-media-server/src/node_core_logger.js' +import NodeRtmpSession from 'node-media-server/src/node_rtmp_session.js' +import { join } from 'path' +import { Server as ServerTLS, createServer as createServerTLS } from 'tls' import { federateVideoIfNeeded } from '../activitypub/videos/index.js' import { JobQueue } from '../job-queue/index.js' +import { Notifier } from '../notifier/notifier.js' import { getLiveReplayBaseDirectory } from '../paths.js' import { PeerTubeSocket } from '../peertube-socket.js' import { Hooks } from '../plugins/hooks.js' @@ -35,7 +36,6 @@ import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolu import { LiveQuotaStore } from './live-quota-store.js' import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils.js' import { MuxingSession } from './shared/index.js' -import { Notifier } from '../notifier/notifier.js' // Disable node media server logs nodeMediaServerLogger.setLogType(0) @@ -188,12 +188,14 @@ class LiveManager { return this.getContext().sessions.has(sessionId) } - stopSessionOf (options: { + stopSessionOfVideo (options: { videoUUID: string error: LiveVideoErrorType | null + + expectedSessionId?: string // Prevent stopping another session of permanent live errorOnReplay?: boolean }) { - const { videoUUID, error } = options + const { videoUUID, expectedSessionId, error } = options const sessionId = this.videoSessions.get(videoUUID) if (!sessionId) { @@ -201,6 +203,14 @@ class LiveManager { return } + if (expectedSessionId && expectedSessionId !== sessionId) { + logger.debug( + `No live session ${expectedSessionId} to stop for video ${videoUUID} (current session: ${sessionId})`, + lTags(sessionId, videoUUID) + ) + return + } + logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) }) this.saveEndingSession(options) @@ -354,23 +364,23 @@ class LiveManager { localLTags ) - this.stopSessionOf({ videoUUID, error: LiveVideoError.BAD_SOCKET_HEALTH }) + this.stopSessionOfVideo({ videoUUID, error: LiveVideoError.BAD_SOCKET_HEALTH }) }) muxingSession.on('duration-exceeded', ({ videoUUID }) => { logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) - this.stopSessionOf({ videoUUID, error: LiveVideoError.DURATION_EXCEEDED }) + this.stopSessionOfVideo({ videoUUID, error: LiveVideoError.DURATION_EXCEEDED }) }) muxingSession.on('quota-exceeded', ({ videoUUID }) => { logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) - this.stopSessionOf({ videoUUID, error: LiveVideoError.QUOTA_EXCEEDED }) + this.stopSessionOfVideo({ videoUUID, error: LiveVideoError.QUOTA_EXCEEDED }) }) muxingSession.on('transcoding-error', ({ videoUUID }) => { - this.stopSessionOf({ videoUUID, error: LiveVideoError.FFMPEG_ERROR }) + this.stopSessionOfVideo({ videoUUID, error: LiveVideoError.FFMPEG_ERROR }) }) muxingSession.on('transcoding-end', ({ videoUUID }) => { @@ -397,7 +407,7 @@ class LiveManager { this.muxingSessions.delete(sessionId) muxingSession.destroy() - this.stopSessionOf({ + this.stopSessionOfVideo({ videoUUID, error: err.liveVideoErrorCode || LiveVideoError.UNKNOWN_ERROR, errorOnReplay: true // Replay cannot be processed as muxing session failed directly diff --git a/server/core/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/core/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts index f35d2f586..e0fd1f78d 100644 --- a/server/core/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts +++ b/server/core/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts @@ -7,6 +7,7 @@ import { RunnerJobStateType } from '@peertube/peertube-models' import { buildUUID } from '@peertube/peertube-node-utils' +import { tryAtomicMove } from '@server/helpers/fs.js' import { logger } from '@server/helpers/logger.js' import { JOB_PRIORITY } from '@server/initializers/constants.js' import { LiveManager } from '@server/lib/live/index.js' @@ -15,7 +16,6 @@ import { MRunnerJob } from '@server/types/models/runners/index.js' import { remove } from 'fs-extra/esm' import { join } from 'path' import { AbstractJobHandler } from './abstract-job-handler.js' -import { tryAtomicMove } from '@server/helpers/fs.js' type CreateOptions = { video: MVideo @@ -165,7 +165,11 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler { logger.info('Stopping live of video %s after video deletion.', instance.uuid) - LiveManager.Instance.stopSessionOf({ videoUUID: instance.uuid, error: null }) + LiveManager.Instance.stopSessionOfVideo({ videoUUID: instance.uuid, error: null }) } @BeforeDestroy