diff --git a/client/src/app/+videos/+video-edit/video-add-components/video-go-live.component.ts b/client/src/app/+videos/+video-edit/video-add-components/video-go-live.component.ts index d81f577b2..80e5a73da 100644 --- a/client/src/app/+videos/+video-edit/video-add-components/video-go-live.component.ts +++ b/client/src/app/+videos/+video-edit/video-add-components/video-go-live.component.ts @@ -7,7 +7,7 @@ import { FormValidatorService } from '@app/shared/shared-forms' import { Video, VideoCaptionService, VideoEdit, VideoService } from '@app/shared/shared-main' import { LiveVideoService } from '@app/shared/shared-video-live' import { LoadingBarService } from '@ngx-loading-bar/core' -import { LiveVideo, LiveVideoCreate, LiveVideoUpdate, PeerTubeProblemDocument, ServerErrorCode } from '@shared/models' +import { LiveVideo, LiveVideoCreate, LiveVideoLatencyMode, LiveVideoUpdate, PeerTubeProblemDocument, ServerErrorCode } from '@shared/models' import { VideoSend } from './video-send' @Component({ @@ -71,12 +71,13 @@ export class VideoGoLiveComponent extends VideoSend implements OnInit, AfterView nsfw: this.serverConfig.instance.isNSFW, waitTranscoding: true, permanentLive: this.firstStepPermanentLive, + latencyMode: LiveVideoLatencyMode.DEFAULT, saveReplay: this.isReplayAllowed(), channelId: this.firstStepChannelId } // Go live in private mode, but correctly fill the update form with the first user choice - const toPatch = Object.assign({}, video, { privacy: this.firstStepPrivacyId }) + const toPatch = { ...video, privacy: this.firstStepPrivacyId } this.form.patchValue(toPatch) this.liveVideoService.goLive(video) @@ -121,6 +122,7 @@ export class VideoGoLiveComponent extends VideoSend implements OnInit, AfterView const liveVideoUpdate: LiveVideoUpdate = { saveReplay: this.form.value.saveReplay, + latencyMode: this.form.value.latencyMode, permanentLive: this.form.value.permanentLive } diff --git a/client/src/assets/player/shared/common/utils.ts b/client/src/assets/player/shared/common/utils.ts index da7dda0c7..a010d9184 100644 --- a/client/src/assets/player/shared/common/utils.ts +++ b/client/src/assets/player/shared/common/utils.ts @@ -4,17 +4,15 @@ function toTitleCase (str: string) { return str.charAt(0).toUpperCase() + str.slice(1) } -// https://github.com/danrevah/ngx-pipes/blob/master/src/pipes/math/bytes.ts -// Don't import all Angular stuff, just copy the code with shame -const dictionaryBytes: Array<{max: number, type: string}> = [ - { max: 1024, type: 'B' }, - { max: 1048576, type: 'KB' }, - { max: 1073741824, type: 'MB' }, - { max: 1.0995116e12, type: 'GB' } +const dictionaryBytes = [ + { max: 1024, type: 'B', decimals: 0 }, + { max: 1048576, type: 'KB', decimals: 0 }, + { max: 1073741824, type: 'MB', decimals: 0 }, + { max: 1.0995116e12, type: 'GB', decimals: 1 } ] function bytes (value: number) { const format = dictionaryBytes.find(d => value < d.max) || dictionaryBytes[dictionaryBytes.length - 1] - const calc = Math.floor(value / (format.max / 1024)).toString() + const calc = (value / (format.max / 1024)).toFixed(format.decimals) return [ calc, format.type ] } diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 0d7e7077d..5329d75f8 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -861,7 +861,7 @@ if (isTestInstance() === true && process.env.PRODUCTION_CONSTANTS !== 'true') { PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000 - VIDEO_LIVE.CLEANUP_DELAY = 5000 + VIDEO_LIVE.CLEANUP_DELAY = getIntEnv('PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY') ?? 5000 VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY = 2 VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY = 1 VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION = 1 @@ -1169,3 +1169,9 @@ function buildLanguages () { function generateContentHash () { return randomBytes(20).toString('hex') } + +function getIntEnv (path: string) { + if (process.env[path]) return parseInt(process.env[path]) + + return undefined +} diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index feec257fc..450bda2fd 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -4,7 +4,7 @@ import { join } from 'path' import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' -import { cleanupNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live' +import { cleanupUnsavedNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, @@ -22,15 +22,17 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' -import { logger } from '../../../helpers/logger' +import { logger, loggerTagsFactory } from '../../../helpers/logger' + +const lTags = loggerTagsFactory('live', 'job') async function processVideoLiveEnding (job: Job) { const payload = job.data as VideoLiveEndingPayload - logger.info('Processing video live ending for %s.', payload.videoId, { payload }) + logger.info('Processing video live ending for %s.', payload.videoId, { payload, ...lTags() }) function logError () { - logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId) + logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId, lTags()) } const liveVideo = await VideoModel.load(payload.videoId) @@ -73,8 +75,6 @@ async function saveReplayToExternalVideo (options: { }) { const { liveVideo, liveSession, publishedAt, replayDirectory } = options - await cleanupTMPLiveFiles(getLiveDirectory(liveVideo)) - const video = new VideoModel({ name: `${liveVideo.name} - ${new Date(publishedAt).toLocaleString()}`, isLive: false, @@ -243,7 +243,7 @@ async function cleanupLiveAndFederate (options: { if (live.permanentLive) { await cleanupPermanentLive(video, streamingPlaylist) } else { - await cleanupNormalLive(video, streamingPlaylist) + await cleanupUnsavedNormalLive(video, streamingPlaylist) } } diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts index 6365e23db..6305a97a8 100644 --- a/server/lib/live/live-utils.ts +++ b/server/lib/live/live-utils.ts @@ -10,20 +10,20 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) { return 'concat-' + num[1] + '.ts' } -async function cleanupPermanentLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { +async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { const hlsDirectory = getLiveDirectory(video) await cleanupTMPLiveFiles(hlsDirectory) - if (streamingPlaylist) await streamingPlaylist.destroy() + await streamingPlaylist.destroy() } -async function cleanupNormalLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { +async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { const hlsDirectory = getLiveDirectory(video) await remove(hlsDirectory) - if (streamingPlaylist) await streamingPlaylist.destroy() + await streamingPlaylist.destroy() } async function cleanupTMPLiveFiles (hlsDirectory: string) { @@ -49,7 +49,7 @@ async function cleanupTMPLiveFiles (hlsDirectory: string) { export { cleanupPermanentLive, - cleanupNormalLive, + cleanupUnsavedNormalLive, cleanupTMPLiveFiles, buildConcatenatedName } diff --git a/server/tests/api/live/index.ts b/server/tests/api/live/index.ts index 71bc150d8..c88943f65 100644 --- a/server/tests/api/live/index.ts +++ b/server/tests/api/live/index.ts @@ -1,4 +1,5 @@ import './live-constraints' +import './live-fast-restream' import './live-socket-messages' import './live-permanent' import './live-rtmps' diff --git a/server/tests/api/live/live-fast-restream.ts b/server/tests/api/live/live-fast-restream.ts new file mode 100644 index 000000000..4b5d041ec --- /dev/null +++ b/server/tests/api/live/live-fast-restream.ts @@ -0,0 +1,128 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import 'mocha' +import * as chai from 'chai' +import { wait } from '@shared/core-utils' +import { HttpStatusCode, LiveVideoCreate, VideoPrivacy } from '@shared/models' +import { + cleanupTests, + createSingleServer, + makeRawRequest, + PeerTubeServer, + setAccessTokensToServers, + setDefaultVideoChannel, + stopFfmpeg, + waitJobs +} from '@shared/server-commands' + +const expect = chai.expect + +describe('Fast restream in live', function () { + let server: PeerTubeServer + + async function createLiveWrapper (options: { permanent: boolean, replay: boolean }) { + const attributes: LiveVideoCreate = { + channelId: server.store.channel.id, + privacy: VideoPrivacy.PUBLIC, + name: 'my super live', + saveReplay: options.replay, + permanentLive: options.permanent + } + + const { uuid } = await server.live.create({ fields: attributes }) + return uuid + } + + async function fastRestreamWrapper ({ replay }: { replay: boolean }) { + const liveVideoUUID = await createLiveWrapper({ permanent: true, replay }) + await waitJobs([ server ]) + + const rtmpOptions = { + videoId: liveVideoUUID, + copyCodecs: true, + fixtureName: 'video_short.mp4' + } + + // Streaming session #1 + let ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions) + await server.live.waitUntilPublished({ videoId: liveVideoUUID }) + await stopFfmpeg(ffmpegCommand) + await server.live.waitUntilWaiting({ videoId: liveVideoUUID }) + + // Streaming session #2 + ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions) + await server.live.waitUntilSegmentGeneration({ videoUUID: liveVideoUUID, segment: 0, playlistNumber: 0, totalSessions: 2 }) + + return { ffmpegCommand, liveVideoUUID } + } + + async function ensureLastLiveWorks (liveId: string) { + // Equivalent to PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY + for (let i = 0; i < 100; i++) { + const video = await server.videos.get({ id: liveId }) + expect(video.streamingPlaylists).to.have.lengthOf(1) + + await server.live.getSegment({ videoUUID: liveId, segment: 0, playlistNumber: 0 }) + await makeRawRequest(video.streamingPlaylists[0].playlistUrl, HttpStatusCode.OK_200) + + await wait(100) + } + } + + async function runTest (replay: boolean) { + const { ffmpegCommand, liveVideoUUID } = await fastRestreamWrapper({ replay }) + + await ensureLastLiveWorks(liveVideoUUID) + + await stopFfmpeg(ffmpegCommand) + await server.live.waitUntilWaiting({ videoId: liveVideoUUID }) + + // Wait for replays + await waitJobs([ server ]) + + const { total, data: sessions } = await server.live.listSessions({ videoId: liveVideoUUID }) + + expect(total).to.equal(2) + expect(sessions).to.have.lengthOf(2) + + for (const session of sessions) { + expect(session.error).to.be.null + + if (replay) { + expect(session.replayVideo).to.exist + + await server.videos.get({ id: session.replayVideo.uuid }) + } else { + expect(session.replayVideo).to.not.exist + } + } + } + + before(async function () { + this.timeout(120000) + + const env = { 'PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY': '10000' } + server = await createSingleServer(1, {}, { env }) + + // Get the access tokens + await setAccessTokensToServers([ server ]) + await setDefaultVideoChannel([ server ]) + + await server.config.enableMinimumTranscoding(false, true) + await server.config.enableLive({ allowReplay: true, transcoding: true, resolutions: 'min' }) + }) + + it('Should correctly fast reastream in a permanent live with and without save replay', async function () { + this.timeout(240000) + + // A test can take a long time, so prefer to run them in parallel + await Promise.all([ + runTest(true), + runTest(false) + ]) + }) + + after(async function () { + await cleanupTests([ server ]) + }) +}) diff --git a/server/tests/api/live/live-save-replay.ts b/server/tests/api/live/live-save-replay.ts index 99d500711..7ddcb04ef 100644 --- a/server/tests/api/live/live-save-replay.ts +++ b/server/tests/api/live/live-save-replay.ts @@ -12,7 +12,6 @@ import { createMultipleServers, doubleFollow, findExternalSavedVideo, - makeRawRequest, PeerTubeServer, setAccessTokensToServers, setDefaultVideoChannel, @@ -442,46 +441,6 @@ describe('Save replay setting', function () { await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404) await checkLiveCleanup(servers[0], liveVideoUUID, []) }) - - it('Should correctly save replays with multiple sessions', async function () { - this.timeout(120000) - - liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true }) - await waitJobs(servers) - - // Streaming session #1 - ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) - await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) - await stopFfmpeg(ffmpegCommand) - await servers[0].live.waitUntilWaiting({ videoId: liveVideoUUID }) - - // Streaming session #2 - ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) - await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) - - await wait(5000) - const video = await servers[0].videos.get({ id: liveVideoUUID }) - expect(video.streamingPlaylists).to.have.lengthOf(1) - await makeRawRequest(video.streamingPlaylists[0].playlistUrl) - - await stopFfmpeg(ffmpegCommand) - await waitUntilLiveWaitingOnAllServers(servers, liveVideoUUID) - - // Wait for replays - await waitJobs(servers) - - const { total, data: sessions } = await servers[0].live.listSessions({ videoId: liveVideoUUID }) - - expect(total).to.equal(2) - expect(sessions).to.have.lengthOf(2) - - for (const session of sessions) { - expect(session.error).to.be.null - expect(session.replayVideo).to.exist - - await servers[0].videos.get({ id: session.replayVideo.uuid }) - } - }) }) after(async function () { diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts index b9caf394d..2d47c131b 100644 --- a/server/tests/api/live/live.ts +++ b/server/tests/api/live/live.ts @@ -395,7 +395,7 @@ describe('Test live', function () { for (let i = 0; i < resolutions.length; i++) { const segmentNum = 3 const segmentName = `${i}-00000${segmentNum}.ts` - await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, resolution: i, segment: segmentNum }) + await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum }) const subPlaylist = await servers[0].streamingPlaylists.get({ url: `${servers[0].url}/static/streaming-playlists/hls/${video.uuid}/${i}.m3u8` @@ -628,9 +628,9 @@ describe('Test live', function () { commands[0].waitUntilPublished({ videoId: liveVideoReplayId }) ]) - await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, resolution: 0, segment: 2 }) - await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, resolution: 0, segment: 2 }) - await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, resolution: 0, segment: 2 }) + await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, playlistNumber: 0, segment: 2 }) + await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, playlistNumber: 0, segment: 2 }) + await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, playlistNumber: 0, segment: 2 }) { const video = await servers[0].videos.get({ id: permanentLiveVideoReplayId }) diff --git a/shared/server-commands/server/config-command.ts b/shared/server-commands/server/config-command.ts index 5320dead4..3803aaf95 100644 --- a/shared/server-commands/server/config-command.ts +++ b/shared/server-commands/server/config-command.ts @@ -39,15 +39,18 @@ export class ConfigCommand extends AbstractCommand { enableLive (options: { allowReplay?: boolean transcoding?: boolean + resolutions?: 'min' | 'max' // Default max } = {}) { + const { allowReplay, transcoding, resolutions = 'max' } = options + return this.updateExistingSubConfig({ newConfig: { live: { enabled: true, - allowReplay: options.allowReplay ?? true, + allowReplay: allowReplay ?? true, transcoding: { - enabled: options.transcoding ?? true, - resolutions: ConfigCommand.getCustomConfigResolutions(true) + enabled: transcoding ?? true, + resolutions: ConfigCommand.getCustomConfigResolutions(resolutions === 'max') } } } diff --git a/shared/server-commands/videos/live-command.ts b/shared/server-commands/videos/live-command.ts index 2ff65881b..3df47ed4d 100644 --- a/shared/server-commands/videos/live-command.ts +++ b/shared/server-commands/videos/live-command.ts @@ -154,13 +154,33 @@ export class LiveCommand extends AbstractCommand { waitUntilSegmentGeneration (options: OverrideCommandOptions & { videoUUID: string - resolution: number + playlistNumber: number + segment: number + totalSessions?: number + }) { + const { playlistNumber, segment, videoUUID, totalSessions = 1 } = options + const segmentName = `${playlistNumber}-00000${segment}.ts` + + return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, totalSessions * 2, false) + } + + getSegment (options: OverrideCommandOptions & { + videoUUID: string + playlistNumber: number segment: number }) { - const { resolution, segment, videoUUID } = options - const segmentName = `${resolution}-00000${segment}.ts` + const { playlistNumber, segment, videoUUID } = options - return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, 2, false) + const segmentName = `${playlistNumber}-00000${segment}.ts` + const url = `${this.server.url}/static/streaming-playlists/hls/${videoUUID}/${segmentName}` + + return this.getRawRequest({ + ...options, + + url, + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.OK_200 + }) } async waitUntilReplacedByReplay (options: OverrideCommandOptions & {