1
0
Fork 0

Add ability to save replay of permanent lives

This commit is contained in:
Chocobozzz 2022-04-21 09:06:52 +02:00
parent 2024a3b933
commit 4ec52d04dc
No known key found for this signature in database
GPG key ID: 583A612D890159BE
20 changed files with 426 additions and 175 deletions

View file

@ -1,25 +1,33 @@
import { Job } from 'bull'
import { pathExists, readdir, remove } from 'fs-extra'
import { join } from 'path'
import { ffprobePromise, getAudioStream, getVideoStreamDuration, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg'
import { VIDEO_LIVE } from '@server/initializers/constants'
import { buildConcatenatedName, cleanupLive, LiveSegmentShaStore } from '@server/lib/live'
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveDirectory } from '@server/lib/paths'
import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
import { cleanupLive, LiveSegmentShaStore } from '@server/lib/live'
import {
generateHLSMasterPlaylistFilename,
generateHlsSha256SegmentsFilename,
getLiveDirectory,
getLiveReplayBaseDirectory
} from '@server/lib/paths'
import { generateVideoMiniature } from '@server/lib/thumbnail'
import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding'
import { VideoPathManager } from '@server/lib/video-path-manager'
import { moveToNextState } from '@server/lib/video-state'
import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models'
import { MVideo, MVideoLive, MVideoWithAllFiles } from '@server/types/models'
import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
import { logger } from '../../../helpers/logger'
import { VideoBlacklistModel } from '@server/models/video/video-blacklist'
async function processVideoLiveEnding (job: Job) {
const payload = job.data as VideoLiveEndingPayload
logger.info('Processing video live ending for %s.', payload.videoId, { payload })
function logError () {
logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId)
}
@ -32,19 +40,19 @@ async function processVideoLiveEnding (job: Job) {
return
}
const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
if (!streamingPlaylist) {
logError()
return
}
LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid)
if (live.saveReplay !== true) {
return cleanupLive(video, streamingPlaylist)
return cleanupLiveAndFederate(video)
}
return saveLive(video, live, streamingPlaylist)
if (live.permanentLive) {
await saveReplayToExternalVideo(video, payload.publishedAt, payload.replayDirectory)
return cleanupLiveAndFederate(video)
}
return replaceLiveByReplay(video, live, payload.replayDirectory)
}
// ---------------------------------------------------------------------------
@ -55,22 +63,66 @@ export {
// ---------------------------------------------------------------------------
async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MStreamingPlaylist) {
const replayDirectory = VideoPathManager.Instance.getFSHLSOutputPath(video, VIDEO_LIVE.REPLAY_DIRECTORY)
async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string, replayDirectory: string) {
await cleanupTMPLiveFiles(getLiveDirectory(liveVideo))
const rootFiles = await readdir(getLiveDirectory(video))
const video = new VideoModel({
name: `${liveVideo.name} - ${new Date(publishedAt).toLocaleString()}`,
isLive: false,
state: VideoState.TO_TRANSCODE,
duration: 0,
const playlistFiles = rootFiles.filter(file => {
return file.endsWith('.m3u8') && file !== streamingPlaylist.playlistFilename
})
remote: liveVideo.remote,
category: liveVideo.category,
licence: liveVideo.licence,
language: liveVideo.language,
commentsEnabled: liveVideo.commentsEnabled,
downloadEnabled: liveVideo.downloadEnabled,
waitTranscoding: liveVideo.waitTranscoding,
nsfw: liveVideo.nsfw,
description: liveVideo.description,
support: liveVideo.support,
privacy: liveVideo.privacy,
channelId: liveVideo.channelId
}) as MVideoWithAllFiles
video.Thumbnails = []
video.VideoFiles = []
video.VideoStreamingPlaylists = []
video.url = getLocalVideoActivityPubUrl(video)
await video.save()
// If live is blacklisted, also blacklist the replay
const blacklist = await VideoBlacklistModel.loadByVideoId(liveVideo.id)
if (blacklist) {
await VideoBlacklistModel.create({
videoId: video.id,
unfederated: blacklist.unfederated,
reason: blacklist.reason,
type: blacklist.type
})
}
await assignReplaysToVideo(video, replayDirectory)
await remove(replayDirectory)
for (const type of [ ThumbnailType.MINIATURE, ThumbnailType.PREVIEW ]) {
const image = await generateVideoMiniature({ video, videoFile: video.getMaxQualityFile(), type })
await video.addAndSaveThumbnail(image)
}
await moveToNextState({ video, isNewVideo: true })
}
async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirectory: string) {
await cleanupTMPLiveFiles(getLiveDirectory(video))
await live.destroy()
video.isLive = false
// Reinit views
video.views = 0
video.state = VideoState.TO_TRANSCODE
await video.save()
@ -87,10 +139,38 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt
hlsPlaylist.segmentsSha256Filename = generateHlsSha256SegmentsFilename()
await hlsPlaylist.save()
await assignReplaysToVideo(videoWithFiles, replayDirectory)
await remove(getLiveReplayBaseDirectory(videoWithFiles))
// Regenerate the thumbnail & preview?
if (videoWithFiles.getMiniature().automaticallyGenerated === true) {
const miniature = await generateVideoMiniature({
video: videoWithFiles,
videoFile: videoWithFiles.getMaxQualityFile(),
type: ThumbnailType.MINIATURE
})
await video.addAndSaveThumbnail(miniature)
}
if (videoWithFiles.getPreview().automaticallyGenerated === true) {
const preview = await generateVideoMiniature({
video: videoWithFiles,
videoFile: videoWithFiles.getMaxQualityFile(),
type: ThumbnailType.PREVIEW
})
await video.addAndSaveThumbnail(preview)
}
await moveToNextState({ video: videoWithFiles, isNewVideo: false })
}
async function assignReplaysToVideo (video: MVideo, replayDirectory: string) {
let durationDone = false
for (const playlistFile of playlistFiles) {
const concatenatedTsFile = buildConcatenatedName(playlistFile)
const concatenatedTsFiles = await readdir(replayDirectory)
for (const concatenatedTsFile of concatenatedTsFiles) {
const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile)
const probe = await ffprobePromise(concatenatedTsFilePath)
@ -99,7 +179,7 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt
const { resolution, isPortraitMode } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe)
const { resolutionPlaylistPath: outputPath } = await generateHlsPlaylistResolutionFromTS({
video: videoWithFiles,
video,
concatenatedTsFilePath,
resolution,
isPortraitMode,
@ -107,33 +187,22 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt
})
if (!durationDone) {
videoWithFiles.duration = await getVideoStreamDuration(outputPath)
await videoWithFiles.save()
video.duration = await getVideoStreamDuration(outputPath)
await video.save()
durationDone = true
}
}
await remove(replayDirectory)
return video
}
// Regenerate the thumbnail & preview?
if (videoWithFiles.getMiniature().automaticallyGenerated === true) {
await generateVideoMiniature({
video: videoWithFiles,
videoFile: videoWithFiles.getMaxQualityFile(),
type: ThumbnailType.MINIATURE
})
}
async function cleanupLiveAndFederate (video: MVideo) {
const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
await cleanupLive(video, streamingPlaylist)
if (videoWithFiles.getPreview().automaticallyGenerated === true) {
await generateVideoMiniature({
video: videoWithFiles,
videoFile: videoWithFiles.getMaxQualityFile(),
type: ThumbnailType.PREVIEW
})
}
await moveToNextState({ video: videoWithFiles, isNewVideo: false })
const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id)
return federateVideoIfNeeded(fullVideo, false, undefined)
}
async function cleanupTMPLiveFiles (hlsDirectory: string) {

View file

@ -1,6 +1,7 @@
import { readFile } from 'fs-extra'
import { readdir, readFile } from 'fs-extra'
import { createServer, Server } from 'net'
import { join } from 'path'
import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
import {
computeLowerResolutionsToTranscode,
@ -18,10 +19,11 @@ import { VideoModel } from '@server/models/video/video'
import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models'
import { wait } from '@shared/core-utils'
import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
import { federateVideoIfNeeded } from '../activitypub/videos'
import { JobQueue } from '../job-queue'
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths'
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
import { PeerTubeSocket } from '../peertube-socket'
import { LiveQuotaStore } from './live-quota-store'
import { LiveSegmentShaStore } from './live-segment-sha-store'
@ -322,7 +324,7 @@ class LiveManager {
muxingSession.destroy()
return this.onAfterMuxingCleanup(videoId)
return this.onAfterMuxingCleanup({ videoId })
.catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
})
@ -349,12 +351,15 @@ class LiveManager {
live.Video = video
setTimeout(() => {
federateVideoIfNeeded(video, false)
.catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }))
await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
PeerTubeSocket.Instance.sendVideoLiveNewState(video)
}, getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
try {
await federateVideoIfNeeded(video, false)
} catch (err) {
logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })
}
PeerTubeSocket.Instance.sendVideoLiveNewState(video)
} catch (err) {
logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
}
@ -364,25 +369,32 @@ class LiveManager {
this.videoSessions.delete(videoId)
}
private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) {
private async onAfterMuxingCleanup (options: {
videoId: number | string
cleanupNow?: boolean // Default false
}) {
const { videoId, cleanupNow = false } = options
try {
const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID)
const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
if (!fullVideo) return
const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
if (!live.permanentLive) {
JobQueue.Instance.createJob({
type: 'video-live-ending',
payload: {
videoId: fullVideo.id
}
}, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
JobQueue.Instance.createJob({
type: 'video-live-ending',
payload: {
videoId: fullVideo.id,
replayDirectory: live.saveReplay
? await this.findReplayDirectory(fullVideo)
: undefined,
publishedAt: fullVideo.publishedAt.toISOString()
}
}, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
fullVideo.state = VideoState.LIVE_ENDED
} else {
fullVideo.state = VideoState.WAITING_FOR_LIVE
}
fullVideo.state = live.permanentLive
? VideoState.WAITING_FOR_LIVE
: VideoState.LIVE_ENDED
await fullVideo.save()
@ -390,7 +402,7 @@ class LiveManager {
await federateVideoIfNeeded(fullVideo, false)
} catch (err) {
logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) })
logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') })
}
}
@ -398,10 +410,19 @@ class LiveManager {
const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
for (const uuid of videoUUIDs) {
await this.onAfterMuxingCleanup(uuid, true)
await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true })
}
}
private async findReplayDirectory (video: MVideo) {
const directory = getLiveReplayBaseDirectory(video)
const files = await readdir(directory)
if (files.length === 0) return undefined
return join(directory, files.sort().reverse()[0])
}
private buildAllResolutionsToTranscode (originResolution: number) {
const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
? computeLowerResolutionsToTranscode(originResolution, 'live')

View file

@ -9,12 +9,12 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
return 'concat-' + num[1] + '.ts'
}
async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
async function cleanupLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
const hlsDirectory = getLiveDirectory(video)
await remove(hlsDirectory)
await streamingPlaylist.destroy()
if (streamingPlaylist) await streamingPlaylist.destroy()
}
export {

View file

@ -11,7 +11,7 @@ import { CONFIG } from '@server/initializers/config'
import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
import { VideoFileModel } from '@server/models/video/video-file'
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
import { getLiveDirectory } from '../../paths'
import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
import { isAbleToUploadVideo } from '../../user'
import { LiveQuotaStore } from '../live-quota-store'
@ -63,6 +63,9 @@ class MuxingSession extends EventEmitter {
private readonly videoUUID: string
private readonly saveReplay: boolean
private readonly outDirectory: string
private readonly replayDirectory: string
private readonly lTags: LoggerTagsFn
private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
@ -110,19 +113,22 @@ class MuxingSession extends EventEmitter {
this.saveReplay = this.videoLive.saveReplay
this.outDirectory = getLiveDirectory(this.videoLive.Video)
this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
}
async runMuxing () {
this.createFiles()
const outPath = await this.prepareDirectories()
await this.prepareDirectories()
this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
? await getLiveTranscodingCommand({
inputUrl: this.inputUrl,
outPath,
outPath: this.outDirectory,
masterPlaylistName: this.streamingPlaylist.playlistFilename,
latencyMode: this.videoLive.latencyMode,
@ -137,15 +143,15 @@ class MuxingSession extends EventEmitter {
})
: getLiveMuxingCommand({
inputUrl: this.inputUrl,
outPath,
outPath: this.outDirectory,
masterPlaylistName: this.streamingPlaylist.playlistFilename,
latencyMode: this.videoLive.latencyMode
})
logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
this.watchTSFiles(outPath)
this.watchMasterFile(outPath)
this.watchTSFiles(this.outDirectory)
this.watchMasterFile(this.outDirectory)
let ffmpegShellCommand: string
this.ffmpegCommand.on('start', cmdline => {
@ -155,10 +161,10 @@ class MuxingSession extends EventEmitter {
})
this.ffmpegCommand.on('error', (err, stdout, stderr) => {
this.onFFmpegError({ err, stdout, stderr, outPath, ffmpegShellCommand })
this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand })
})
this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath))
this.ffmpegCommand.on('end', () => this.onFFmpegEnded(this.outDirectory))
this.ffmpegCommand.run()
}
@ -304,16 +310,11 @@ class MuxingSession extends EventEmitter {
}
private async prepareDirectories () {
const outPath = getLiveDirectory(this.videoLive.Video)
await ensureDir(outPath)
const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)
await ensureDir(this.outDirectory)
if (this.videoLive.saveReplay === true) {
await ensureDir(replayDirectory)
await ensureDir(this.replayDirectory)
}
return outPath
}
private isDurationConstraintValid (streamingStartTime: number) {
@ -364,7 +365,7 @@ class MuxingSession extends EventEmitter {
private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
const segmentName = basename(segmentPath)
const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName))
const dest = join(this.replayDirectory, buildConcatenatedName(segmentName))
try {
const data = await readFile(segmentPath)

View file

@ -1,6 +1,6 @@
import { join } from 'path'
import { CONFIG } from '@server/initializers/config'
import { HLS_REDUNDANCY_DIRECTORY, HLS_STREAMING_PLAYLIST_DIRECTORY } from '@server/initializers/constants'
import { HLS_REDUNDANCY_DIRECTORY, HLS_STREAMING_PLAYLIST_DIRECTORY, VIDEO_LIVE } from '@server/initializers/constants'
import { isStreamingPlaylist, MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoUUID } from '@server/types/models'
import { removeFragmentedMP4Ext } from '@shared/core-utils'
import { buildUUID } from '@shared/extra-utils'
@ -21,6 +21,10 @@ function getLiveDirectory (video: MVideoUUID) {
return getHLSDirectory(video)
}
function getLiveReplayBaseDirectory (video: MVideoUUID) {
return join(getLiveDirectory(video), VIDEO_LIVE.REPLAY_DIRECTORY)
}
function getHLSDirectory (video: MVideoUUID) {
return join(HLS_STREAMING_PLAYLIST_DIRECTORY, video.uuid)
}
@ -74,6 +78,7 @@ export {
getHLSDirectory,
getLiveDirectory,
getLiveReplayBaseDirectory,
getHLSRedundancyDirectory,
generateHLSMasterPlaylistFilename,

View file

@ -3,13 +3,13 @@ import { copyFile, ensureDir, move, remove, stat } from 'fs-extra'
import { basename, extname as extnameUtil, join } from 'path'
import { toEven } from '@server/helpers/core-utils'
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
import { MStreamingPlaylistFilesVideo, MVideoFile, MVideoFullLight } from '@server/types/models'
import { MStreamingPlaylistFilesVideo, MVideo, MVideoFile, MVideoFullLight } from '@server/types/models'
import { VideoResolution, VideoStorage } from '../../../shared/models/videos'
import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type'
import {
buildFileMetadata,
canDoQuickTranscode,
getVideoStreamDuration,
buildFileMetadata,
getVideoStreamFPS,
transcodeVOD,
TranscodeVODOptions,
@ -191,7 +191,7 @@ function mergeAudioVideofile (video: MVideoFullLight, resolution: VideoResolutio
// Concat TS segments from a live video to a fragmented mp4 HLS playlist
async function generateHlsPlaylistResolutionFromTS (options: {
video: MVideoFullLight
video: MVideo
concatenatedTsFilePath: string
resolution: VideoResolution
isPortraitMode: boolean
@ -209,7 +209,7 @@ async function generateHlsPlaylistResolutionFromTS (options: {
// Generate an HLS playlist from an input file, and update the master playlist
function generateHlsPlaylistResolution (options: {
video: MVideoFullLight
video: MVideo
videoInputPath: string
resolution: VideoResolution
copyCodecs: boolean
@ -265,7 +265,7 @@ async function onWebTorrentVideoFileTranscoding (
async function generateHlsPlaylistCommon (options: {
type: 'hls' | 'hls-from-ts'
video: MVideoFullLight
video: MVideo
inputPath: string
resolution: VideoResolution
copyCodecs?: boolean

View file

@ -73,8 +73,7 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video
unfederated: options.unfederate === true,
reason: options.reason,
type: VideoBlacklistType.MANUAL
}
)
})
blacklist.Video = videoInstance
if (options.unfederate === true) {

View file

@ -118,12 +118,6 @@ const videoLiveAddValidator = getCommonVideoEditAttributes().concat([
})
}
if (body.permanentLive && body.saveReplay) {
cleanUpReqFiles(req)
return res.fail({ message: 'Cannot set this live as permanent while saving its replay' })
}
const user = res.locals.oauth.token.User
if (!await doesVideoChannelOfAccountExist(body.channelId, user, res)) return cleanUpReqFiles(req)

View file

@ -2,7 +2,7 @@ import { AllowNull, BelongsTo, Column, CreatedAt, DataType, DefaultScope, Foreig
import { CONFIG } from '@server/initializers/config'
import { WEBSERVER } from '@server/initializers/constants'
import { MVideoLive, MVideoLiveVideo } from '@server/types/models'
import { LiveVideo, LiveVideoLatencyMode, VideoState } from '@shared/models'
import { LiveVideo, LiveVideoLatencyMode, VideoPrivacy, VideoState } from '@shared/models'
import { AttributesOnly } from '@shared/typescript-utils'
import { VideoModel } from './video'
import { VideoBlacklistModel } from './video-blacklist'

View file

@ -212,12 +212,6 @@ describe('Test video lives API validator', function () {
await makeUploadRequest({ url: server.url, path, token: server.accessToken, fields, attaches })
})
it('Should fail with save replay and permanent live set to true', async function () {
const fields = { ...baseCorrectParams, saveReplay: true, permanentLive: true }
await makePostBodyRequest({ url: server.url, path, token: server.accessToken, fields })
})
it('Should fail with bad latency setting', async function () {
const fields = { ...baseCorrectParams, latencyMode: 42 }

View file

@ -14,7 +14,7 @@ import {
setDefaultVideoChannel,
waitJobs
} from '@shared/server-commands'
import { checkLiveCleanupAfterSave } from '../../shared'
import { checkLiveCleanup } from '../../shared'
const expect = chai.expect
@ -43,7 +43,7 @@ describe('Test live constraints', function () {
expect(video.duration).to.be.greaterThan(0)
}
await checkLiveCleanupAfterSave(servers[0], videoId, resolutions)
await checkLiveCleanup(servers[0], videoId, resolutions)
}
async function waitUntilLivePublishedOnAllServers (videoId: string) {

View file

@ -121,7 +121,7 @@ describe('Permanent live', function () {
await waitJobs(servers)
})
it('Should not have cleaned up this live', async function () {
it('Should have cleaned up this live', async function () {
this.timeout(40000)
await wait(5000)
@ -129,7 +129,8 @@ describe('Permanent live', function () {
for (const server of servers) {
const videoDetails = await server.videos.get({ id: videoUUID })
expect(videoDetails.streamingPlaylists).to.have.lengthOf(1)
expect(videoDetails.streamingPlaylists).to.have.lengthOf(0)
}
})

View file

@ -3,7 +3,7 @@
import 'mocha'
import * as chai from 'chai'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { checkLiveCleanupAfterSave } from '@server/tests/shared'
import { checkLiveCleanup } from '@server/tests/shared'
import { wait } from '@shared/core-utils'
import { HttpStatusCode, LiveVideoCreate, VideoPrivacy, VideoState } from '@shared/models'
import {
@ -11,6 +11,7 @@ import {
ConfigCommand,
createMultipleServers,
doubleFollow,
findExternalSavedVideo,
PeerTubeServer,
setAccessTokensToServers,
setDefaultVideoChannel,
@ -18,7 +19,8 @@ import {
testFfmpegStreamError,
waitJobs,
waitUntilLivePublishedOnAllServers,
waitUntilLiveSavedOnAllServers
waitUntilLiveReplacedByReplayOnAllServers,
waitUntilLiveWaitingOnAllServers
} from '@shared/server-commands'
const expect = chai.expect
@ -28,7 +30,7 @@ describe('Save replay setting', function () {
let liveVideoUUID: string
let ffmpegCommand: FfmpegCommand
async function createLiveWrapper (saveReplay: boolean) {
async function createLiveWrapper (options: { permanent: boolean, replay: boolean }) {
if (liveVideoUUID) {
try {
await servers[0].videos.remove({ id: liveVideoUUID })
@ -40,7 +42,8 @@ describe('Save replay setting', function () {
channelId: servers[0].store.channel.id,
privacy: VideoPrivacy.PUBLIC,
name: 'my super live',
saveReplay
saveReplay: options.replay,
permanentLive: options.permanent
}
const { uuid } = await servers[0].live.create({ fields: attributes })
@ -104,7 +107,7 @@ describe('Save replay setting', function () {
it('Should correctly create and federate the "waiting for stream" live', async function () {
this.timeout(20000)
liveVideoUUID = await createLiveWrapper(false)
liveVideoUUID = await createLiveWrapper({ permanent: false, replay: false })
await waitJobs(servers)
@ -140,13 +143,13 @@ describe('Save replay setting', function () {
await checkVideoState(liveVideoUUID, VideoState.LIVE_ENDED)
// No resolutions saved since we did not save replay
await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [])
await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
it('Should correctly terminate the stream on blacklist and delete the live', async function () {
this.timeout(40000)
liveVideoUUID = await createLiveWrapper(false)
liveVideoUUID = await createLiveWrapper({ permanent: false, replay: false })
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
@ -169,13 +172,13 @@ describe('Save replay setting', function () {
await wait(5000)
await waitJobs(servers)
await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [])
await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
it('Should correctly terminate the stream on delete and delete the video', async function () {
this.timeout(40000)
liveVideoUUID = await createLiveWrapper(false)
liveVideoUUID = await createLiveWrapper({ permanent: false, replay: false })
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
@ -193,16 +196,16 @@ describe('Save replay setting', function () {
await waitJobs(servers)
await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [])
await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
})
describe('With save replay enabled', function () {
describe('With save replay enabled on non permanent live', function () {
it('Should correctly create and federate the "waiting for stream" live', async function () {
this.timeout(20000)
liveVideoUUID = await createLiveWrapper(true)
liveVideoUUID = await createLiveWrapper({ permanent: false, replay: true })
await waitJobs(servers)
@ -227,7 +230,7 @@ describe('Save replay setting', function () {
await stopFfmpeg(ffmpegCommand)
await waitUntilLiveSavedOnAllServers(servers, liveVideoUUID)
await waitUntilLiveReplacedByReplayOnAllServers(servers, liveVideoUUID)
await waitJobs(servers)
// Live has been transcoded
@ -249,13 +252,13 @@ describe('Save replay setting', function () {
})
it('Should have cleaned up the live files', async function () {
await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [ 720 ])
await checkLiveCleanup(servers[0], liveVideoUUID, [ 720 ])
})
it('Should correctly terminate the stream on blacklist and blacklist the saved replay video', async function () {
this.timeout(40000)
liveVideoUUID = await createLiveWrapper(true)
liveVideoUUID = await createLiveWrapper({ permanent: false, replay: true })
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
@ -277,13 +280,13 @@ describe('Save replay setting', function () {
await wait(5000)
await waitJobs(servers)
await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [ 720 ])
await checkLiveCleanup(servers[0], liveVideoUUID, [ 720 ])
})
it('Should correctly terminate the stream on delete and delete the video', async function () {
this.timeout(40000)
liveVideoUUID = await createLiveWrapper(true)
liveVideoUUID = await createLiveWrapper({ permanent: false, replay: true })
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
@ -300,7 +303,123 @@ describe('Save replay setting', function () {
await waitJobs(servers)
await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [])
await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
})
describe('With save replay enabled on permanent live', function () {
let lastReplayUUID: string
it('Should correctly create and federate the "waiting for stream" live', async function () {
this.timeout(20000)
liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
await waitJobs(servers)
await checkVideosExist(liveVideoUUID, false, HttpStatusCode.OK_200)
await checkVideoState(liveVideoUUID, VideoState.WAITING_FOR_LIVE)
})
it('Should correctly have updated the live and federated it when streaming in the live', async function () {
this.timeout(20000)
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
await waitJobs(servers)
await checkVideosExist(liveVideoUUID, true, HttpStatusCode.OK_200)
await checkVideoState(liveVideoUUID, VideoState.PUBLISHED)
})
it('Should correctly have saved the live and federated it after the streaming', async function () {
this.timeout(30000)
const liveDetails = await servers[0].videos.get({ id: liveVideoUUID })
await stopFfmpeg(ffmpegCommand)
await waitUntilLiveWaitingOnAllServers(servers, liveVideoUUID)
await waitJobs(servers)
const video = await findExternalSavedVideo(servers[0], liveDetails)
expect(video).to.exist
for (const server of servers) {
await server.videos.get({ id: video.uuid })
}
lastReplayUUID = video.uuid
})
it('Should have cleaned up the live files', async function () {
await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
it('Should correctly terminate the stream on blacklist and blacklist the saved replay video', async function () {
this.timeout(60000)
await servers[0].videos.remove({ id: lastReplayUUID })
liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
const liveDetails = await servers[0].videos.get({ id: liveVideoUUID })
await waitJobs(servers)
await checkVideosExist(liveVideoUUID, true, HttpStatusCode.OK_200)
await Promise.all([
servers[0].blacklist.add({ videoId: liveVideoUUID, reason: 'bad live', unfederate: true }),
testFfmpegStreamError(ffmpegCommand, true)
])
await waitJobs(servers)
await wait(5000)
await waitJobs(servers)
const replay = await findExternalSavedVideo(servers[0], liveDetails)
expect(replay).to.exist
for (const videoId of [ liveVideoUUID, replay.uuid ]) {
await checkVideosExist(videoId, false)
await servers[0].videos.get({ id: videoId, expectedStatus: HttpStatusCode.UNAUTHORIZED_401 })
await servers[1].videos.get({ id: videoId, expectedStatus: HttpStatusCode.NOT_FOUND_404 })
}
await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
it('Should correctly terminate the stream on delete and not save the video', async function () {
this.timeout(40000)
liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
const liveDetails = await servers[0].videos.get({ id: liveVideoUUID })
await waitJobs(servers)
await checkVideosExist(liveVideoUUID, true, HttpStatusCode.OK_200)
await Promise.all([
servers[0].videos.remove({ id: liveVideoUUID }),
testFfmpegStreamError(ffmpegCommand, true)
])
await wait(5000)
await waitJobs(servers)
const replay = await findExternalSavedVideo(servers[0], liveDetails)
expect(replay).to.not.exist
await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
})

View file

@ -4,7 +4,7 @@ import 'mocha'
import * as chai from 'chai'
import { basename, join } from 'path'
import { ffprobePromise, getVideoStream } from '@server/helpers/ffmpeg'
import { checkLiveCleanupAfterSave, checkLiveSegmentHash, checkResolutionsInMasterPlaylist, testImage } from '@server/tests/shared'
import { checkLiveCleanup, checkLiveSegmentHash, checkResolutionsInMasterPlaylist, testImage } from '@server/tests/shared'
import { wait } from '@shared/core-utils'
import {
HttpStatusCode,
@ -583,7 +583,7 @@ describe('Test live', function () {
it('Should correctly have cleaned up the live files', async function () {
this.timeout(30000)
await checkLiveCleanupAfterSave(servers[0], liveVideoId, [ 240, 360, 720 ])
await checkLiveCleanup(servers[0], liveVideoId, [ 240, 360, 720 ])
})
})

View file

@ -2,13 +2,13 @@
import 'mocha'
import * as chai from 'chai'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { expectStartWith } from '@server/tests/shared'
import { areObjectStorageTestsDisabled } from '@shared/core-utils'
import { HttpStatusCode, LiveVideoCreate, VideoFile, VideoPrivacy } from '@shared/models'
import {
createMultipleServers,
doubleFollow,
findExternalSavedVideo,
killallServers,
makeRawRequest,
ObjectStorageCommand,
@ -18,17 +18,19 @@ import {
stopFfmpeg,
waitJobs,
waitUntilLivePublishedOnAllServers,
waitUntilLiveSavedOnAllServers
waitUntilLiveReplacedByReplayOnAllServers,
waitUntilLiveWaitingOnAllServers
} from '@shared/server-commands'
const expect = chai.expect
async function createLive (server: PeerTubeServer) {
async function createLive (server: PeerTubeServer, permanent: boolean) {
const attributes: LiveVideoCreate = {
channelId: server.store.channel.id,
privacy: VideoPrivacy.PUBLIC,
name: 'my super live',
saveReplay: true
saveReplay: true,
permanentLive: permanent
}
const { uuid } = await server.live.create({ fields: attributes })
@ -44,12 +46,39 @@ async function checkFiles (files: VideoFile[]) {
}
}
async function getFiles (server: PeerTubeServer, videoUUID: string) {
const video = await server.videos.get({ id: videoUUID })
expect(video.files).to.have.lengthOf(0)
expect(video.streamingPlaylists).to.have.lengthOf(1)
return video.streamingPlaylists[0].files
}
async function streamAndEnd (servers: PeerTubeServer[], liveUUID: string) {
const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveUUID })
await waitUntilLivePublishedOnAllServers(servers, liveUUID)
const videoLiveDetails = await servers[0].videos.get({ id: liveUUID })
const liveDetails = await servers[0].live.get({ videoId: liveUUID })
await stopFfmpeg(ffmpegCommand)
if (liveDetails.permanentLive) {
await waitUntilLiveWaitingOnAllServers(servers, liveUUID)
} else {
await waitUntilLiveReplacedByReplayOnAllServers(servers, liveUUID)
}
await waitJobs(servers)
return { videoLiveDetails, liveDetails }
}
describe('Object storage for lives', function () {
if (areObjectStorageTestsDisabled()) return
let ffmpegCommand: FfmpegCommand
let servers: PeerTubeServer[]
let videoUUID: string
before(async function () {
this.timeout(120000)
@ -66,31 +95,22 @@ describe('Object storage for lives', function () {
})
describe('Without live transcoding', async function () {
let videoUUID: string
before(async function () {
await servers[0].config.enableLive({ transcoding: false })
videoUUID = await createLive(servers[0])
videoUUID = await createLive(servers[0], false)
})
it('Should create a live and save the replay on object storage', async function () {
this.timeout(220000)
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID })
await waitUntilLivePublishedOnAllServers(servers, videoUUID)
await stopFfmpeg(ffmpegCommand)
await waitUntilLiveSavedOnAllServers(servers, videoUUID)
await waitJobs(servers)
await streamAndEnd(servers, videoUUID)
for (const server of servers) {
const video = await server.videos.get({ id: videoUUID })
expect(video.files).to.have.lengthOf(0)
expect(video.streamingPlaylists).to.have.lengthOf(1)
const files = video.streamingPlaylists[0].files
const files = await getFiles(server, videoUUID)
expect(files).to.have.lengthOf(1)
await checkFiles(files)
}
@ -98,31 +118,38 @@ describe('Object storage for lives', function () {
})
describe('With live transcoding', async function () {
let videoUUIDPermanent: string
let videoUUIDNonPermanent: string
before(async function () {
await servers[0].config.enableLive({ transcoding: true })
videoUUID = await createLive(servers[0])
videoUUIDPermanent = await createLive(servers[0], true)
videoUUIDNonPermanent = await createLive(servers[0], false)
})
it('Should import a video and have sent it to object storage', async function () {
it('Should create a live and save the replay on object storage', async function () {
this.timeout(240000)
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID })
await waitUntilLivePublishedOnAllServers(servers, videoUUID)
await stopFfmpeg(ffmpegCommand)
await waitUntilLiveSavedOnAllServers(servers, videoUUID)
await waitJobs(servers)
await streamAndEnd(servers, videoUUIDNonPermanent)
for (const server of servers) {
const video = await server.videos.get({ id: videoUUID })
const files = await getFiles(server, videoUUIDNonPermanent)
expect(files).to.have.lengthOf(5)
expect(video.files).to.have.lengthOf(0)
expect(video.streamingPlaylists).to.have.lengthOf(1)
await checkFiles(files)
}
})
const files = video.streamingPlaylists[0].files
it('Should create a live and save the replay of permanent live on object storage', async function () {
this.timeout(240000)
const { videoLiveDetails } = await streamAndEnd(servers, videoUUIDPermanent)
const replay = await findExternalSavedVideo(servers[0], videoLiveDetails)
for (const server of servers) {
const files = await getFiles(server, replay.uuid)
expect(files).to.have.lengthOf(5)
await checkFiles(files)

View file

@ -5,11 +5,11 @@ import { pathExists, readdir } from 'fs-extra'
import { join } from 'path'
import { PeerTubeServer } from '@shared/server-commands'
async function checkLiveCleanupAfterSave (server: PeerTubeServer, videoUUID: string, resolutions: number[] = []) {
async function checkLiveCleanup (server: PeerTubeServer, videoUUID: string, savedResolutions: number[] = []) {
const basePath = server.servers.buildDirectory('streaming-playlists')
const hlsPath = join(basePath, 'hls', videoUUID)
if (resolutions.length === 0) {
if (savedResolutions.length === 0) {
const result = await pathExists(hlsPath)
expect(result).to.be.false
@ -19,9 +19,9 @@ async function checkLiveCleanupAfterSave (server: PeerTubeServer, videoUUID: str
const files = await readdir(hlsPath)
// fragmented file and playlist per resolution + master playlist + segments sha256 json file
expect(files).to.have.lengthOf(resolutions.length * 2 + 2)
expect(files).to.have.lengthOf(savedResolutions.length * 2 + 2)
for (const resolution of resolutions) {
for (const resolution of savedResolutions) {
const fragmentedFile = files.find(f => f.endsWith(`-${resolution}-fragmented.mp4`))
expect(fragmentedFile).to.exist
@ -37,5 +37,5 @@ async function checkLiveCleanupAfterSave (server: PeerTubeServer, videoUUID: str
}
export {
checkLiveCleanupAfterSave
checkLiveCleanup
}

View file

@ -159,6 +159,9 @@ export type VideoTranscodingPayload =
export interface VideoLiveEndingPayload {
videoId: number
publishedAt: string
replayDirectory?: string
}
export interface ActorKeysPayload {

View file

@ -1,8 +1,9 @@
import { LiveVideoLatencyMode } from '.'
import { VideoCreate } from '../video-create.model'
import { LiveVideoLatencyMode } from './live-video-latency-mode.enum'
export interface LiveVideoCreate extends VideoCreate {
saveReplay?: boolean
permanentLive?: boolean
latencyMode?: LiveVideoLatencyMode
saveReplay?: boolean
}

View file

@ -117,7 +117,7 @@ export class LiveCommand extends AbstractCommand {
return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, 2, false)
}
async waitUntilSaved (options: OverrideCommandOptions & {
async waitUntilReplacedByReplay (options: OverrideCommandOptions & {
videoId: number | string
}) {
let video: VideoDetails

View file

@ -1,6 +1,7 @@
import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg'
import { buildAbsoluteFixturePath, wait } from '@shared/core-utils'
import { PeerTubeServer } from '../server/server'
import { VideoDetails, VideoInclude } from '@shared/models'
function sendRTMPStream (options: {
rtmpBaseUrl: string
@ -84,17 +85,33 @@ async function waitUntilLivePublishedOnAllServers (servers: PeerTubeServer[], vi
}
}
async function waitUntilLiveSavedOnAllServers (servers: PeerTubeServer[], videoId: string) {
async function waitUntilLiveWaitingOnAllServers (servers: PeerTubeServer[], videoId: string) {
for (const server of servers) {
await server.live.waitUntilSaved({ videoId })
await server.live.waitUntilWaiting({ videoId })
}
}
async function waitUntilLiveReplacedByReplayOnAllServers (servers: PeerTubeServer[], videoId: string) {
for (const server of servers) {
await server.live.waitUntilReplacedByReplay({ videoId })
}
}
async function findExternalSavedVideo (server: PeerTubeServer, liveDetails: VideoDetails) {
const { data } = await server.videos.list({ token: server.accessToken, sort: '-publishedAt', include: VideoInclude.BLACKLISTED })
return data.find(v => v.name === liveDetails.name + ' - ' + new Date(liveDetails.publishedAt).toLocaleString())
}
export {
sendRTMPStream,
waitFfmpegUntilError,
testFfmpegStreamError,
stopFfmpeg,
waitUntilLivePublishedOnAllServers,
waitUntilLiveSavedOnAllServers
waitUntilLiveReplacedByReplayOnAllServers,
waitUntilLiveWaitingOnAllServers,
findExternalSavedVideo
}