More robust chunk handler
This commit is contained in:
parent
6403a6bd01
commit
def4ea4f38
3 changed files with 48 additions and 21 deletions
|
@ -21,6 +21,9 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
private readonly outputPath: string
|
private readonly outputPath: string
|
||||||
private readonly fsWatchers: FSWatcher[] = []
|
private readonly fsWatchers: FSWatcher[] = []
|
||||||
|
|
||||||
|
// Playlist name -> chunks
|
||||||
|
private readonly pendingChunksPerPlaylist = new Map<string, string[]>()
|
||||||
|
|
||||||
private readonly playlistsCreated = new Set<string>()
|
private readonly playlistsCreated = new Set<string>()
|
||||||
private allPlaylistsCreated = false
|
private allPlaylistsCreated = false
|
||||||
|
|
||||||
|
@ -68,9 +71,19 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
tsWatcher.on('add', p => {
|
tsWatcher.on('add', async p => {
|
||||||
this.sendAddedChunkUpdate(p)
|
try {
|
||||||
.catch(err => this.onUpdateError(err, rej))
|
await this.sendPendingChunks()
|
||||||
|
} catch (err) {
|
||||||
|
this.onUpdateError(err, rej)
|
||||||
|
}
|
||||||
|
|
||||||
|
const playlistName = this.getPlaylistIdFromTS(p)
|
||||||
|
|
||||||
|
const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || []
|
||||||
|
pendingChunks.push(p)
|
||||||
|
|
||||||
|
this.pendingChunksPerPlaylist.set(playlistName, pendingChunks)
|
||||||
})
|
})
|
||||||
|
|
||||||
tsWatcher.on('unlink', p => {
|
tsWatcher.on('unlink', p => {
|
||||||
|
@ -230,31 +243,38 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
return this.updateWithRetry(payload)
|
return this.updateWithRetry(payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
private sendAddedChunkUpdate (addedChunk: string): Promise<any> {
|
private async sendPendingChunks (): Promise<any> {
|
||||||
if (this.ended) return Promise.resolve()
|
if (this.ended) return Promise.resolve()
|
||||||
|
|
||||||
logger.debug(`Sending added live chunk ${addedChunk} update`)
|
for (const playlist of this.pendingChunksPerPlaylist.keys()) {
|
||||||
|
for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) {
|
||||||
|
logger.debug(`Sending added live chunk ${chunk} update`)
|
||||||
|
|
||||||
const videoChunkFilename = basename(addedChunk)
|
const videoChunkFilename = basename(chunk)
|
||||||
|
|
||||||
let payload: LiveRTMPHLSTranscodingUpdatePayload = {
|
let payload: LiveRTMPHLSTranscodingUpdatePayload = {
|
||||||
type: 'add-chunk',
|
type: 'add-chunk',
|
||||||
videoChunkFilename,
|
videoChunkFilename,
|
||||||
videoChunkFile: addedChunk
|
videoChunkFile: chunk
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.allPlaylistsCreated) {
|
if (this.allPlaylistsCreated) {
|
||||||
const playlistName = this.getPlaylistName(videoChunkFilename)
|
const playlistName = this.getPlaylistName(videoChunkFilename)
|
||||||
|
|
||||||
payload = {
|
payload = {
|
||||||
...payload,
|
...payload,
|
||||||
masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
|
masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
|
||||||
resolutionPlaylistFilename: playlistName,
|
resolutionPlaylistFilename: playlistName,
|
||||||
resolutionPlaylistFile: join(this.outputPath, playlistName)
|
resolutionPlaylistFile: join(this.outputPath, playlistName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.updateWithRetry(payload)
|
||||||
|
.catch(err => logger.error({ err }, 'Cannot update with retry'))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return this.updateWithRetry(payload)
|
this.pendingChunksPerPlaylist.set(playlist, [])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
|
private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
|
||||||
|
@ -281,6 +301,12 @@ export class ProcessLiveRTMPHLSTranscoding {
|
||||||
return `${videoChunkFilename.split('-')[0]}.m3u8`
|
return `${videoChunkFilename.split('-')[0]}.m3u8`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private getPlaylistIdFromTS (segmentPath: string) {
|
||||||
|
const playlistIdMatcher = /^([\d+])-/
|
||||||
|
|
||||||
|
return basename(segmentPath).match(playlistIdMatcher)[1]
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
private cleanup () {
|
private cleanup () {
|
||||||
|
|
|
@ -103,7 +103,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO
|
||||||
await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true })
|
await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true })
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(
|
logger.debug(
|
||||||
'Runner live RTMP to HLS job %s for %s updated.',
|
'Runner live RTMP to HLS job %s for %s updated.',
|
||||||
runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) }
|
runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) }
|
||||||
)
|
)
|
||||||
|
|
|
@ -373,6 +373,7 @@ peertube-runner [commands] --id instance-2
|
||||||
peertube-runner [commands] --id instance-3
|
peertube-runner [commands] --id instance-3
|
||||||
```
|
```
|
||||||
|
|
||||||
|
You can change the runner configuration (ffmpeg threads, ffmpeg nice etc) by editing `~/.config/peertube-runner-nodejs/[id]/config.toml`.
|
||||||
|
|
||||||
### Run the server
|
### Run the server
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue