diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index a78a66d78..84a515857 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -644,6 +644,7 @@ const VIDEO_LIVE = { SEGMENTS_LIST_SIZE: 15, // 15 maximum segments in live playlist REPLAY_DIRECTORY: 'replay', EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION: 4, + MAX_SOCKET_WAITING_DATA: 1024 * 1000 * 100, // 100MB RTMP: { CHUNK_SIZE: 60000, GOP_CACHE: true, @@ -656,7 +657,8 @@ const VIDEO_LIVE = { const MEMOIZE_TTL = { OVERVIEWS_SAMPLE: 1000 * 3600 * 4, // 4 hours INFO_HASH_EXISTS: 1000 * 3600 * 12, // 12 hours - LIVE_ABLE_TO_UPLOAD: 1000 * 60 // 1 minute + LIVE_ABLE_TO_UPLOAD: 1000 * 60, // 1 minute + LIVE_CHECK_SOCKET_HEALTH: 1000 * 60 // 1 minute } const MEMOIZE_LENGTH = { diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index 7f5fdf899..d968f05da 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts @@ -3,6 +3,7 @@ import * as Bluebird from 'bluebird' import * as chokidar from 'chokidar' import { FfmpegCommand } from 'fluent-ffmpeg' import { appendFile, ensureDir, readFile, stat } from 'fs-extra' +import { createServer, Server } from 'net' import { basename, join } from 'path' import { isTestInstance } from '@server/helpers/core-utils' import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' @@ -27,8 +28,7 @@ import { getHLSDirectory } from './video-paths' import { availableEncoders } from './video-transcoding-profiles' import memoizee = require('memoizee') - -const NodeRtmpServer = require('node-media-server/node_rtmp_server') +const NodeRtmpSession = require('node-media-server/node_rtmp_session') const context = require('node-media-server/node_core_ctx') const nodeMediaServerLogger = require('node-media-server/node_core_logger') @@ -63,7 +63,11 @@ class LiveManager { return isAbleToUploadVideo(userId, 1000) }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD }) - private rtmpServer: any + private readonly hasClientSocketsInBadHealthWithCache = memoizee((sessionId: string) => { + return this.hasClientSocketsInBadHealth(sessionId) + }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH }) + + private rtmpServer: Server private constructor () { } @@ -108,19 +112,31 @@ class LiveManager { run () { logger.info('Running RTMP server on port %d', config.rtmp.port) - this.rtmpServer = new NodeRtmpServer(config) - this.rtmpServer.tcpServer.on('error', err => { + this.rtmpServer = createServer(socket => { + const session = new NodeRtmpSession(config, socket) + + session.run() + }) + + this.rtmpServer.on('error', err => { logger.error('Cannot run RTMP server.', { err }) }) - this.rtmpServer.run() + this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) } stop () { logger.info('Stopping RTMP server.') - this.rtmpServer.stop() + this.rtmpServer.close() this.rtmpServer = undefined + + // Sessions is an object + this.getContext().sessions.forEach((session: any) => { + if (session instanceof NodeRtmpSession) { + session.stop() + } + }) } isRunning () { @@ -344,11 +360,21 @@ class LiveManager { segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] + if (this.hasClientSocketsInBadHealthWithCache(sessionId)) { + logger.error( + 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + + ' Stopping session of video %s.', videoUUID) + + this.stopSessionOf(videoLive.videoId) + return + } + // Duration constraint check if (this.isDurationConstraintValid(startStreamDateTime) !== true) { logger.info('Stopping session of %s: max duration exceeded.', videoUUID) this.stopSessionOf(videoLive.videoId) + return } // Check user quota if the user enabled replay saving @@ -517,6 +543,30 @@ class LiveManager { return now <= max } + private hasClientSocketsInBadHealth (sessionId: string) { + const rtmpSession = this.getContext().sessions.get(sessionId) + + if (!rtmpSession) { + logger.warn('Cannot get session %s to check players socket health.', sessionId) + return + } + + for (const playerSessionId of rtmpSession.players) { + const playerSession = this.getContext().sessions.get(playerSessionId) + + if (!playerSession) { + logger.error('Cannot get player session %s to check socket health.', playerSession) + continue + } + + if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) { + return true + } + } + + return false + } + private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) { if (live.saveReplay !== true) return true