1
0
Fork 0

Add check constraints live tests

This commit is contained in:
Chocobozzz 2020-11-03 15:33:30 +01:00 committed by Chocobozzz
parent af4ae64f6f
commit 97969c4edf
13 changed files with 229 additions and 37 deletions

View file

@ -33,7 +33,8 @@ export class JobsComponent extends RestTable implements OnInit {
'videos-views', 'videos-views',
'activitypub-refresher', 'activitypub-refresher',
'video-live-ending', 'video-live-ending',
'video-redundancy' 'video-redundancy',
'video-live-ending'
] ]
jobs: Job[] = [] jobs: Job[] = []

View file

@ -733,6 +733,7 @@ if (isTestInstance() === true) {
FILES_CACHE.VIDEO_CAPTIONS.MAX_AGE = 3000 FILES_CACHE.VIDEO_CAPTIONS.MAX_AGE = 3000
MEMOIZE_TTL.OVERVIEWS_SAMPLE = 3000 MEMOIZE_TTL.OVERVIEWS_SAMPLE = 3000
MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD = 3000
OVERVIEWS.VIDEOS.SAMPLE_THRESHOLD = 2 OVERVIEWS.VIDEOS.SAMPLE_THRESHOLD = 2
PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000 PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000

View file

@ -8,9 +8,10 @@ import { generateHlsPlaylist } from '@server/lib/video-transcoding'
import { VideoModel } from '@server/models/video/video' import { VideoModel } from '@server/models/video/video'
import { VideoLiveModel } from '@server/models/video/video-live' import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models' import { MStreamingPlaylist, MVideo, MVideoLive, MVideoWithFile } from '@server/types/models'
import { VideoLiveEndingPayload, VideoState } from '@shared/models' import { VideoLiveEndingPayload, VideoState } from '@shared/models'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { VideoFileModel } from '@server/models/video/video-file'
async function processVideoLiveEnding (job: Bull.Job) { async function processVideoLiveEnding (job: Bull.Job) {
const payload = job.data as VideoLiveEndingPayload const payload = job.data as VideoLiveEndingPayload
@ -60,6 +61,10 @@ async function saveLive (video: MVideo, live: MVideoLive) {
const segmentFiles = files.filter(f => f.startsWith(shouldStartWith) && f.endsWith('.ts')) const segmentFiles = files.filter(f => f.startsWith(shouldStartWith) && f.endsWith('.ts'))
await hlsPlaylistToFragmentedMP4(hlsDirectory, segmentFiles, mp4TmpName) await hlsPlaylistToFragmentedMP4(hlsDirectory, segmentFiles, mp4TmpName)
for (const file of segmentFiles) {
await remove(join(hlsDirectory, file))
}
if (!duration) { if (!duration) {
duration = await getDurationFromVideoFile(mp4TmpName) duration = await getDurationFromVideoFile(mp4TmpName)
} }
@ -77,8 +82,13 @@ async function saveLive (video: MVideo, live: MVideoLive) {
await video.save() await video.save()
// Remove old HLS playlist video files
const videoWithFiles = await VideoModel.loadWithFiles(video.id) const videoWithFiles = await VideoModel.loadWithFiles(video.id)
const hlsPlaylist = videoWithFiles.getHLSPlaylist()
await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id)
hlsPlaylist.VideoFiles = []
for (const resolution of resolutions) { for (const resolution of resolutions) {
const videoInputPath = buildMP4TmpName(resolution) const videoInputPath = buildMP4TmpName(resolution)
const { isPortraitMode } = await getVideoFileResolution(videoInputPath) const { isPortraitMode } = await getVideoFileResolution(videoInputPath)
@ -90,12 +100,11 @@ async function saveLive (video: MVideo, live: MVideoLive) {
copyCodecs: true, copyCodecs: true,
isPortraitMode isPortraitMode
}) })
await remove(join(hlsDirectory, videoInputPath))
} }
video.state = VideoState.PUBLISHED await publishAndFederateIfNeeded(video, true)
await video.save()
await publishAndFederateIfNeeded(video)
} }
async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {

View file

@ -133,10 +133,8 @@ class LiveManager {
const sessionId = this.videoSessions.get(videoId) const sessionId = this.videoSessions.get(videoId)
if (!sessionId) return if (!sessionId) return
this.videoSessions.delete(videoId)
this.abortSession(sessionId) this.abortSession(sessionId)
this.onEndTransmuxing(videoId, true)
.catch(err => logger.error('Cannot end transmuxing of video %d.', videoId, { err }))
} }
private getContext () { private getContext () {
@ -259,9 +257,12 @@ class LiveManager {
updateSegment(segmentPath) updateSegment(segmentPath)
if (this.isDurationConstraintValid(startStreamDateTime) !== true) { if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
logger.info('Stopping session of %s: max duration exceeded.', videoUUID)
this.stopSessionOf(videoLive.videoId) this.stopSessionOf(videoLive.videoId)
} }
// Check user quota if the user enabled replay saving
if (videoLive.saveReplay === true) { if (videoLive.saveReplay === true) {
stat(segmentPath) stat(segmentPath)
.then(segmentStat => { .then(segmentStat => {
@ -270,6 +271,8 @@ class LiveManager {
.then(() => this.isQuotaConstraintValid(user, videoLive)) .then(() => this.isQuotaConstraintValid(user, videoLive))
.then(quotaValid => { .then(quotaValid => {
if (quotaValid !== true) { if (quotaValid !== true) {
logger.info('Stopping session of %s: user quota exceeded.', videoUUID)
this.stopSessionOf(videoLive.videoId) this.stopSessionOf(videoLive.videoId)
} }
}) })
@ -319,7 +322,7 @@ class LiveManager {
onFFmpegEnded() onFFmpegEnded()
// Don't care that we killed the ffmpeg process // Don't care that we killed the ffmpeg process
if (err?.message?.includes('SIGINT')) return if (err?.message?.includes('Exiting normally')) return
logger.error('Live transcoding error.', { err, stdout, stderr }) logger.error('Live transcoding error.', { err, stdout, stderr })
@ -348,8 +351,7 @@ class LiveManager {
} }
}, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
// FIXME: use end fullVideo.state = VideoState.LIVE_ENDED
fullVideo.state = VideoState.WAITING_FOR_LIVE
await fullVideo.save() await fullVideo.save()
PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)

View file

@ -18,8 +18,6 @@ import { Redis } from './redis'
import { createLocalVideoChannel } from './video-channel' import { createLocalVideoChannel } from './video-channel'
import { createWatchLaterPlaylist } from './video-playlist' import { createWatchLaterPlaylist } from './video-playlist'
import memoizee = require('memoizee')
type ChannelNames = { name: string, displayName: string } type ChannelNames = { name: string, displayName: string }
async function createUserAccountAndChannelAndPlaylist (parameters: { async function createUserAccountAndChannelAndPlaylist (parameters: {
@ -152,8 +150,8 @@ async function isAbleToUploadVideo (userId: number, size: number) {
if (user.videoQuota === -1 && user.videoQuotaDaily === -1) return Promise.resolve(true) if (user.videoQuota === -1 && user.videoQuotaDaily === -1) return Promise.resolve(true)
const [ totalBytes, totalBytesDaily ] = await Promise.all([ const [ totalBytes, totalBytesDaily ] = await Promise.all([
getOriginalVideoFileTotalFromUser(user.id), getOriginalVideoFileTotalFromUser(user),
getOriginalVideoFileTotalDailyFromUser(user.id) getOriginalVideoFileTotalDailyFromUser(user)
]) ])
const uploadedTotal = size + totalBytes const uploadedTotal = size + totalBytes

View file

@ -4,7 +4,7 @@ import { TagModel } from '@server/models/video/tag'
import { VideoModel } from '@server/models/video/video' import { VideoModel } from '@server/models/video/video'
import { FilteredModelAttributes } from '@server/types' import { FilteredModelAttributes } from '@server/types'
import { MTag, MThumbnail, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' import { MTag, MThumbnail, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models'
import { ThumbnailType, VideoCreate, VideoPrivacy } from '@shared/models' import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState } from '@shared/models'
import { federateVideoIfNeeded } from './activitypub/videos' import { federateVideoIfNeeded } from './activitypub/videos'
import { Notifier } from './notifier' import { Notifier } from './notifier'
import { createVideoMiniatureFromExisting } from './thumbnail' import { createVideoMiniatureFromExisting } from './thumbnail'
@ -81,8 +81,8 @@ async function setVideoTags (options: {
} }
} }
async function publishAndFederateIfNeeded (video: MVideoUUID) { async function publishAndFederateIfNeeded (video: MVideoUUID, wasLive = false) {
const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { const result = await sequelizeTypescript.transaction(async t => {
// Maybe the video changed in database, refresh it // Maybe the video changed in database, refresh it
const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
// Video does not exist anymore // Video does not exist anymore
@ -92,14 +92,15 @@ async function publishAndFederateIfNeeded (video: MVideoUUID) {
const videoPublished = await videoDatabase.publishIfNeededAndSave(t) const videoPublished = await videoDatabase.publishIfNeededAndSave(t)
// If the video was not published, we consider it is a new one for other instances // If the video was not published, we consider it is a new one for other instances
await federateVideoIfNeeded(videoDatabase, videoPublished, t) // Live videos are always federated, so it's not a new video
await federateVideoIfNeeded(videoDatabase, !wasLive && videoPublished, t)
return { videoDatabase, videoPublished } return { videoDatabase, videoPublished }
}) })
if (videoPublished) { if (result?.videoPublished) {
Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase) Notifier.Instance.notifyOnNewVideoIfNeeded(result.videoDatabase)
Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(result.videoDatabase)
} }
} }

View file

@ -26,7 +26,6 @@ import {
MUser, MUser,
MUserDefault, MUserDefault,
MUserFormattable, MUserFormattable,
MUserId,
MUserNotifSettingChannelDefault, MUserNotifSettingChannelDefault,
MUserWithNotificationSetting, MUserWithNotificationSetting,
MVideoFullLight MVideoFullLight
@ -68,10 +67,10 @@ import { getSort, throwIfNotValid } from '../utils'
import { VideoModel } from '../video/video' import { VideoModel } from '../video/video'
import { VideoChannelModel } from '../video/video-channel' import { VideoChannelModel } from '../video/video-channel'
import { VideoImportModel } from '../video/video-import' import { VideoImportModel } from '../video/video-import'
import { VideoLiveModel } from '../video/video-live'
import { VideoPlaylistModel } from '../video/video-playlist' import { VideoPlaylistModel } from '../video/video-playlist'
import { AccountModel } from './account' import { AccountModel } from './account'
import { UserNotificationSettingModel } from './user-notification-setting' import { UserNotificationSettingModel } from './user-notification-setting'
import { VideoLiveModel } from '../video/video-live'
enum ScopeNames { enum ScopeNames {
FOR_ME_API = 'FOR_ME_API', FOR_ME_API = 'FOR_ME_API',

View file

@ -311,6 +311,14 @@ export class VideoFileModel extends Model<VideoFileModel> {
return element.save({ transaction }) return element.save({ transaction })
} }
static removeHLSFilesOfVideoId (videoStreamingPlaylistId: number) {
const options = {
where: { videoStreamingPlaylistId }
}
return VideoFileModel.destroy(options)
}
getVideoOrStreamingPlaylist (this: MVideoFileVideo | MVideoFileStreamingPlaylistVideo): MVideo | MStreamingPlaylistVideo { getVideoOrStreamingPlaylist (this: MVideoFileVideo | MVideoFileStreamingPlaylistVideo): MVideo | MStreamingPlaylistVideo {
if (this.videoId) return (this as MVideoFileVideo).Video if (this.videoId) return (this as MVideoFileVideo).Video

View file

@ -249,7 +249,7 @@ export type AvailableForListIDsOptions = {
[ScopeNames.WITH_LIVE]: { [ScopeNames.WITH_LIVE]: {
include: [ include: [
{ {
model: VideoLiveModel, model: VideoLiveModel.unscoped(),
required: false required: false
} }
] ]

View file

@ -18,6 +18,7 @@ import {
ServerInfo, ServerInfo,
setAccessTokensToServers, setAccessTokensToServers,
stopFfmpeg, stopFfmpeg,
testFfmpegStreamError,
updateCustomSubConfig, updateCustomSubConfig,
updateLive, updateLive,
uploadVideoAndGetId, uploadVideoAndGetId,
@ -402,6 +403,21 @@ describe('Test video lives API validator', function () {
await stopFfmpeg(command) await stopFfmpeg(command)
}) })
it('Should fail to stream twice in the save live', async function () {
this.timeout(30000)
const resLive = await getLive(server.url, server.accessToken, videoId)
const live: LiveVideo = resLive.body
const command = sendRTMPStream(live.rtmpUrl, live.streamKey)
await waitUntilLiveStarts(server.url, server.accessToken, videoId)
await testFfmpegStreamError(server.url, server.accessToken, videoId, true)
await stopFfmpeg(command)
})
}) })
after(async function () { after(async function () {

View file

@ -2,14 +2,15 @@
import 'mocha' import 'mocha'
import * as chai from 'chai' import * as chai from 'chai'
import { LiveVideo, LiveVideoCreate, VideoDetails, VideoPrivacy } from '@shared/models' import { LiveVideo, LiveVideoCreate, User, VideoDetails, VideoPrivacy } from '@shared/models'
import { import {
acceptChangeOwnership,
cleanupTests, cleanupTests,
createLive, createLive,
createUser,
doubleFollow, doubleFollow,
flushAndRunMultipleServers, flushAndRunMultipleServers,
getLive, getLive,
getMyUserInformation,
getVideo, getVideo,
getVideosList, getVideosList,
makeRawRequest, makeRawRequest,
@ -17,9 +18,13 @@ import {
ServerInfo, ServerInfo,
setAccessTokensToServers, setAccessTokensToServers,
setDefaultVideoChannel, setDefaultVideoChannel,
testFfmpegStreamError,
testImage, testImage,
updateCustomSubConfig, updateCustomSubConfig,
updateLive, updateLive,
updateUser,
userLogin,
wait,
waitJobs waitJobs
} from '../../../../shared/extra-utils' } from '../../../../shared/extra-utils'
@ -28,6 +33,9 @@ const expect = chai.expect
describe('Test live', function () { describe('Test live', function () {
let servers: ServerInfo[] = [] let servers: ServerInfo[] = []
let liveVideoUUID: string let liveVideoUUID: string
let userId: number
let userAccessToken: string
let userChannelId: number
before(async function () { before(async function () {
this.timeout(120000) this.timeout(120000)
@ -45,6 +53,22 @@ describe('Test live', function () {
} }
}) })
{
const user = { username: 'user1', password: 'superpassword' }
const res = await createUser({
url: servers[0].url,
accessToken: servers[0].accessToken,
username: user.username,
password: user.password
})
userId = res.body.user.id
userAccessToken = await userLogin(servers[0], user)
const resMe = await getMyUserInformation(servers[0].url, userAccessToken)
userChannelId = (resMe.body as User).videoChannels[0].id
}
// Server 1 and server 2 follow each other // Server 1 and server 2 follow each other
await doubleFollow(servers[0], servers[1]) await doubleFollow(servers[0], servers[1])
}) })
@ -198,17 +222,111 @@ describe('Test live', function () {
describe('Test live constraints', function () { describe('Test live constraints', function () {
it('Should not have size limit if save replay is disabled', async function () { async function createLiveWrapper (saveReplay: boolean) {
const liveAttributes = {
name: 'user live',
channelId: userChannelId,
privacy: VideoPrivacy.PUBLIC,
saveReplay
}
const res = await createLive(servers[0].url, userAccessToken, liveAttributes)
return res.body.video.uuid as string
}
before(async function () {
await updateCustomSubConfig(servers[0].url, servers[0].accessToken, {
live: {
enabled: true,
allowReplay: true
}
}) })
it('Should have size limit if save replay is enabled', async function () { await updateUser({
// daily quota + total quota url: servers[0].url,
userId,
accessToken: servers[0].accessToken,
videoQuota: 1,
videoQuotaDaily: -1
})
})
it('Should not have size limit if save replay is disabled', async function () {
this.timeout(30000)
const userVideoLiveoId = await createLiveWrapper(false)
await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, false)
})
it('Should have size limit depending on user global quota if save replay is enabled', async function () {
this.timeout(30000)
const userVideoLiveoId = await createLiveWrapper(true)
await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true)
await waitJobs(servers)
for (const server of servers) {
const res = await getVideo(server.url, userVideoLiveoId)
const video: VideoDetails = res.body
expect(video.isLive).to.be.false
expect(video.duration).to.be.greaterThan(0)
}
// TODO: check stream correctly saved + cleaned
})
it('Should have size limit depending on user daily quota if save replay is enabled', async function () {
this.timeout(30000)
await updateUser({
url: servers[0].url,
userId,
accessToken: servers[0].accessToken,
videoQuota: -1,
videoQuotaDaily: 1
})
const userVideoLiveoId = await createLiveWrapper(true)
await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true)
// TODO: check stream correctly saved + cleaned
})
it('Should succeed without quota limit', async function () {
this.timeout(30000)
// Wait for user quota memoize cache invalidation
await wait(5000)
await updateUser({
url: servers[0].url,
userId,
accessToken: servers[0].accessToken,
videoQuota: 10 * 1000 * 1000,
videoQuotaDaily: -1
})
const userVideoLiveoId = await createLiveWrapper(true)
await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, false)
}) })
it('Should have max duration limit', async function () { it('Should have max duration limit', async function () {
this.timeout(30000)
await updateCustomSubConfig(servers[0].url, servers[0].accessToken, {
live: {
enabled: true,
allowReplay: true,
maxDuration: 1
}
})
const userVideoLiveoId = await createLiveWrapper(true)
await testFfmpegStreamError(servers[0].url, userAccessToken, userVideoLiveoId, true)
// TODO: check stream correctly saved + cleaned
}) })
}) })

View file

@ -1,9 +1,9 @@
import * as ffmpeg from 'fluent-ffmpeg' import * as ffmpeg from 'fluent-ffmpeg'
import { LiveVideoCreate, LiveVideoUpdate, VideoDetails, VideoState } from '@shared/models' import { omit } from 'lodash'
import { LiveVideo, LiveVideoCreate, LiveVideoUpdate, VideoDetails, VideoState } from '@shared/models'
import { buildAbsoluteFixturePath, wait } from '../miscs/miscs' import { buildAbsoluteFixturePath, wait } from '../miscs/miscs'
import { makeGetRequest, makePutBodyRequest, makeUploadRequest } from '../requests/requests' import { makeGetRequest, makePutBodyRequest, makeUploadRequest } from '../requests/requests'
import { getVideoWithToken } from './videos' import { getVideoWithToken } from './videos'
import { omit } from 'lodash'
function getLive (url: string, token: string, videoId: number | string, statusCodeExpected = 200) { function getLive (url: string, token: string, videoId: number | string, statusCodeExpected = 200) {
const path = '/api/v1/videos/live' const path = '/api/v1/videos/live'
@ -47,7 +47,14 @@ function createLive (url: string, token: string, fields: LiveVideoCreate, status
}) })
} }
function sendRTMPStream (rtmpBaseUrl: string, streamKey: string) { async function sendRTMPStreamInVideo (url: string, token: string, videoId: number | string, onErrorCb?: Function) {
const res = await getLive(url, token, videoId)
const videoLive = res.body as LiveVideo
return sendRTMPStream(videoLive.rtmpUrl, videoLive.streamKey, onErrorCb)
}
function sendRTMPStream (rtmpBaseUrl: string, streamKey: string, onErrorCb?: Function) {
const fixture = buildAbsoluteFixturePath('video_short.mp4') const fixture = buildAbsoluteFixturePath('video_short.mp4')
const command = ffmpeg(fixture) const command = ffmpeg(fixture)
@ -63,7 +70,7 @@ function sendRTMPStream (rtmpBaseUrl: string, streamKey: string) {
command.on('error', err => { command.on('error', err => {
if (err?.message?.includes('Exiting normally')) return if (err?.message?.includes('Exiting normally')) return
console.error('Cannot send RTMP stream.', { err }) if (onErrorCb) onErrorCb(err)
}) })
if (process.env.DEBUG) { if (process.env.DEBUG) {
@ -75,6 +82,34 @@ function sendRTMPStream (rtmpBaseUrl: string, streamKey: string) {
return command return command
} }
function waitFfmpegUntilError (command: ffmpeg.FfmpegCommand, successAfterMS = 10000) {
return new Promise((res, rej) => {
command.on('error', err => {
return rej(err)
})
setTimeout(() => {
res()
}, successAfterMS)
})
}
async function testFfmpegStreamError (url: string, token: string, videoId: number | string, shouldHaveError: boolean) {
const command = await sendRTMPStreamInVideo(url, token, videoId)
let error: Error
try {
await waitFfmpegUntilError(command, 10000)
} catch (err) {
error = err
}
await stopFfmpeg(command)
if (shouldHaveError && !error) throw new Error('Ffmpeg did not have an error')
if (!shouldHaveError && error) throw error
}
async function stopFfmpeg (command: ffmpeg.FfmpegCommand) { async function stopFfmpeg (command: ffmpeg.FfmpegCommand) {
command.kill('SIGINT') command.kill('SIGINT')
@ -99,6 +134,9 @@ export {
updateLive, updateLive,
waitUntilLiveStarts, waitUntilLiveStarts,
createLive, createLive,
testFfmpegStreamError,
stopFfmpeg, stopFfmpeg,
sendRTMPStreamInVideo,
waitFfmpegUntilError,
sendRTMPStream sendRTMPStream
} }

View file

@ -2,15 +2,16 @@ import { VideoPrivacy } from './video-privacy.enum'
import { VideoScheduleUpdate } from './video-schedule-update.model' import { VideoScheduleUpdate } from './video-schedule-update.model'
export interface VideoCreate { export interface VideoCreate {
name: string
channelId: number
category?: number category?: number
licence?: number licence?: number
language?: string language?: string
description?: string description?: string
support?: string support?: string
channelId: number
nsfw?: boolean nsfw?: boolean
waitTranscoding?: boolean waitTranscoding?: boolean
name: string
tags?: string[] tags?: string[]
commentsEnabled?: boolean commentsEnabled?: boolean
downloadEnabled?: boolean downloadEnabled?: boolean