1
0
Fork 0

Improve viewer counter

More precise, avoid weird decrease, reuse an id to federate viewers
This commit is contained in:
Chocobozzz 2022-04-06 08:50:43 +02:00 committed by Chocobozzz
parent dfbcefc20d
commit ac907dc7c1
17 changed files with 235 additions and 156 deletions

View File

@ -43,7 +43,7 @@ async function runCommand (req: express.Request, res: express.Response) {
const processors: { [id in SendDebugCommand['command']]: () => Promise<any> } = {
'remove-dandling-resumable-uploads': () => RemoveDanglingResumableUploadsScheduler.Instance.execute(),
'process-video-views-buffer': () => VideoViewsBufferScheduler.Instance.execute(),
'process-video-viewers': () => VideoViewsManager.Instance.processViewers()
'process-video-viewers': () => VideoViewsManager.Instance.processViewerStats()
}
await processors[body.command]()

View File

@ -1,8 +1,6 @@
import express from 'express'
import { sendView } from '@server/lib/activitypub/send/send-view'
import { Hooks } from '@server/lib/plugins/hooks'
import { VideoViewsManager } from '@server/lib/views/video-views-manager'
import { getServerActor } from '@server/models/application/application'
import { MVideoId } from '@server/types/models'
import { HttpStatusCode, VideoView } from '@shared/models'
import { asyncMiddleware, methodsValidator, openapiOperationDoc, optionalAuthenticate, videoViewValidator } from '../../../middlewares'
@ -33,7 +31,7 @@ async function viewVideo (req: express.Request, res: express.Response) {
const body = req.body as VideoView
const ip = req.ip
const { successView, successViewer } = await VideoViewsManager.Instance.processLocalView({
const { successView } = await VideoViewsManager.Instance.processLocalView({
video,
ip,
currentTime: body.currentTime,
@ -41,15 +39,9 @@ async function viewVideo (req: express.Request, res: express.Response) {
})
if (successView) {
await sendView({ byActor: await getServerActor(), video, type: 'view' })
Hooks.runAction('action:api.video.viewed', { video: video, ip, req, res })
}
if (successViewer) {
await sendView({ byActor: await getServerActor(), video, type: 'viewer' })
}
await updateUserHistoryIfNeeded(body, video, res)
return res.status(HttpStatusCode.NO_CONTENT_204).end()

View File

@ -367,7 +367,7 @@ const CONSTRAINTS_FIELDS = {
const VIEW_LIFETIME = {
VIEW: CONFIG.VIEWS.VIDEOS.IP_VIEW_EXPIRATION,
VIEWER_COUNTER: 60000 * 5, // 5 minutes
VIEWER_COUNTER: 60000 * 1, // 1 minute
VIEWER_STATS: 60000 * 60 // 1 hour
}

View File

@ -32,7 +32,7 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu
? new Date(activity.expires)
: undefined
await VideoViewsManager.Instance.processRemoteView({ video, viewerExpires })
await VideoViewsManager.Instance.processRemoteView({ video, viewerId: activity.id, viewerExpires })
if (video.isOwned()) {
// Forward the view but don't resend the activity to the sender

View File

@ -13,14 +13,15 @@ async function sendView (options: {
byActor: MActorLight
type: ViewType
video: MVideoImmutable
viewerIdentifier: string
transaction?: Transaction
}) {
const { byActor, type, video, transaction } = options
const { byActor, type, video, viewerIdentifier, transaction } = options
logger.info('Creating job to send %s of %s.', type, video.url)
const activityBuilder = (audience: ActivityAudience) => {
const url = getLocalVideoViewActivityPubUrl(byActor, video)
const url = getLocalVideoViewActivityPubUrl(byActor, video, viewerIdentifier)
return buildViewActivity({ url, byActor, video, audience, type })
}

View File

@ -56,8 +56,8 @@ function getLocalAbuseActivityPubUrl (abuse: MAbuseId) {
return WEBSERVER.URL + '/admin/abuses/' + abuse.id
}
function getLocalVideoViewActivityPubUrl (byActor: MActorUrl, video: MVideoId) {
return byActor.url + '/views/videos/' + video.id + '/' + new Date().toISOString()
function getLocalVideoViewActivityPubUrl (byActor: MActorUrl, video: MVideoId, viewerIdentifier: string) {
return byActor.url + '/views/videos/' + video.id + '/' + viewerIdentifier
}
function getLocalVideoViewerActivityPubUrl (stats: MLocalVideoViewer) {

View File

@ -145,18 +145,10 @@ class Redis {
return this.setValue(this.generateIPViewKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEW)
}
setIPVideoViewer (ip: string, videoUUID: string) {
return this.setValue(this.generateIPViewerKey(ip, videoUUID), '1', VIEW_LIFETIME.VIEWER_COUNTER)
}
async doesVideoIPViewExist (ip: string, videoUUID: string) {
return this.exists(this.generateIPViewKey(ip, videoUUID))
}
async doesVideoIPViewerExist (ip: string, videoUUID: string) {
return this.exists(this.generateIPViewerKey(ip, videoUUID))
}
/* ************ Tracker IP block ************ */
setTrackerBlockIP (ip: string) {
@ -361,10 +353,6 @@ class Redis {
return `views-${videoUUID}-${ip}`
}
private generateIPViewerKey (ip: string, videoUUID: string) {
return `viewer-${videoUUID}-${ip}`
}
private generateTrackerBlockIPKey (ip: string) {
return `tracker-block-ip-${ip}`
}

View File

@ -1,2 +1,3 @@
export * from './video-viewers'
export * from './video-viewer-counters'
export * from './video-viewer-stats'
export * from './video-views'

View File

@ -0,0 +1,176 @@
import { isTestInstance } from '@server/helpers/core-utils'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { VIEW_LIFETIME } from '@server/initializers/constants'
import { sendView } from '@server/lib/activitypub/send/send-view'
import { PeerTubeSocket } from '@server/lib/peertube-socket'
import { getServerActor } from '@server/models/application/application'
import { VideoModel } from '@server/models/video/video'
import { MVideo } from '@server/types/models'
import { buildUUID, sha256 } from '@shared/extra-utils'
const lTags = loggerTagsFactory('views')
type Viewer = {
expires: number
id: string
lastFederation?: number
}
export class VideoViewerCounters {
// expires is new Date().getTime()
private readonly viewersPerVideo = new Map<number, Viewer[]>()
private readonly idToViewer = new Map<string, Viewer>()
private readonly salt = buildUUID()
private processingViewerCounters = false
constructor () {
setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER)
}
// ---------------------------------------------------------------------------
async addLocalViewer (options: {
video: MVideo
ip: string
}) {
const { video, ip } = options
logger.debug('Adding local viewer to video viewers counter %s.', video.uuid, { ...lTags(video.uuid) })
const viewerId = this.generateViewerId(ip, video.uuid)
const viewer = this.idToViewer.get(viewerId)
if (viewer) {
viewer.expires = this.buildViewerExpireTime()
await this.federateViewerIfNeeded(video, viewer)
return false
}
const newViewer = await this.addViewerToVideo({ viewerId, video })
await this.federateViewerIfNeeded(video, newViewer)
return true
}
async addRemoteViewer (options: {
video: MVideo
viewerId: string
viewerExpires: Date
}) {
const { video, viewerExpires, viewerId } = options
logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) })
await this.addViewerToVideo({ video, viewerExpires, viewerId })
return true
}
// ---------------------------------------------------------------------------
getViewers (video: MVideo) {
const viewers = this.viewersPerVideo.get(video.id)
if (!viewers) return 0
return viewers.length
}
buildViewerExpireTime () {
return new Date().getTime() + VIEW_LIFETIME.VIEWER_COUNTER
}
// ---------------------------------------------------------------------------
private async addViewerToVideo (options: {
video: MVideo
viewerId: string
viewerExpires?: Date
}) {
const { video, viewerExpires, viewerId } = options
let watchers = this.viewersPerVideo.get(video.id)
if (!watchers) {
watchers = []
this.viewersPerVideo.set(video.id, watchers)
}
const expires = viewerExpires
? viewerExpires.getTime()
: this.buildViewerExpireTime()
const viewer = { id: viewerId, expires }
watchers.push(viewer)
this.idToViewer.set(viewerId, viewer)
await this.notifyClients(video.id, watchers.length)
return viewer
}
private async cleanViewerCounters () {
if (this.processingViewerCounters) return
this.processingViewerCounters = true
if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags())
try {
for (const videoId of this.viewersPerVideo.keys()) {
const notBefore = new Date().getTime()
const viewers = this.viewersPerVideo.get(videoId)
// Only keep not expired viewers
const newViewers: Viewer[] = []
// Filter new viewers
for (const viewer of viewers) {
if (viewer.expires > notBefore) {
newViewers.push(viewer)
} else {
this.idToViewer.delete(viewer.id)
}
}
if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
else this.viewersPerVideo.set(videoId, newViewers)
await this.notifyClients(videoId, newViewers.length)
}
} catch (err) {
logger.error('Error in video clean viewers scheduler.', { err, ...lTags() })
}
this.processingViewerCounters = false
}
private async notifyClients (videoId: string | number, viewersLength: number) {
const video = await VideoModel.loadImmutableAttributes(videoId)
if (!video) return
PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags())
}
private generateViewerId (ip: string, videoUUID: string) {
return sha256(this.salt + '-' + ip + '-' + videoUUID)
}
private async federateViewerIfNeeded (video: MVideo, viewer: Viewer) {
// Federate the viewer if it's been a "long" time we did not
const now = new Date().getTime()
const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER / 2)
if (viewer.lastFederation && viewer.lastFederation > federationLimit) return
await sendView({ byActor: await getServerActor(), video, type: 'viewer', viewerIdentifier: viewer.id })
viewer.lastFederation = now
}
}

View File

@ -6,7 +6,6 @@ import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initiali
import { sequelizeTypescript } from '@server/initializers/database'
import { sendCreateWatchAction } from '@server/lib/activitypub/send'
import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url'
import { PeerTubeSocket } from '@server/lib/peertube-socket'
import { Redis } from '@server/lib/redis'
import { VideoModel } from '@server/models/video/video'
import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer'
@ -32,39 +31,15 @@ type LocalViewerStats = {
videoId: number
}
export class VideoViewers {
// Values are Date().getTime()
private readonly viewersPerVideo = new Map<number, number[]>()
private processingViewerCounters = false
private processingViewerStats = false
export class VideoViewerStats {
private processingViewersStats = false
constructor () {
setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER)
setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
}
// ---------------------------------------------------------------------------
getViewers (video: MVideo) {
const viewers = this.viewersPerVideo.get(video.id)
if (!viewers) return 0
return viewers.length
}
buildViewerExpireTime () {
return new Date().getTime() + VIEW_LIFETIME.VIEWER_COUNTER
}
async getWatchTime (videoId: number, ip: string) {
const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId })
return stats?.watchTime || 0
}
async addLocalViewer (options: {
video: MVideo
currentTime: number
@ -73,51 +48,20 @@ export class VideoViewers {
}) {
const { video, ip, viewEvent, currentTime } = options
logger.debug('Adding local viewer to video %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) })
logger.debug('Adding local viewer to video stats %s.', video.uuid, { currentTime, viewEvent, ...lTags(video.uuid) })
await this.updateLocalViewerStats({ video, viewEvent, currentTime, ip })
const viewExists = await Redis.Instance.doesVideoIPViewerExist(ip, video.uuid)
if (viewExists) return false
await Redis.Instance.setIPVideoViewer(ip, video.uuid)
return this.addViewerToVideo({ video })
return this.updateLocalViewerStats({ video, viewEvent, currentTime, ip })
}
async addRemoteViewer (options: {
video: MVideo
viewerExpires: Date
}) {
const { video, viewerExpires } = options
// ---------------------------------------------------------------------------
logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) })
async getWatchTime (videoId: number, ip: string) {
const stats: LocalViewerStats = await Redis.Instance.getLocalVideoViewer({ ip, videoId })
return this.addViewerToVideo({ video, viewerExpires })
return stats?.watchTime || 0
}
private async addViewerToVideo (options: {
video: MVideo
viewerExpires?: Date
}) {
const { video, viewerExpires } = options
let watchers = this.viewersPerVideo.get(video.id)
if (!watchers) {
watchers = []
this.viewersPerVideo.set(video.id, watchers)
}
const expiration = viewerExpires
? viewerExpires.getTime()
: this.buildViewerExpireTime()
watchers.push(expiration)
await this.notifyClients(video.id, watchers.length)
return true
}
// ---------------------------------------------------------------------------
private async updateLocalViewerStats (options: {
video: MVideo
@ -170,45 +114,9 @@ export class VideoViewers {
await Redis.Instance.setLocalVideoViewer(ip, video.id, stats)
}
private async cleanViewerCounters () {
if (this.processingViewerCounters) return
this.processingViewerCounters = true
if (!isTestInstance()) logger.info('Cleaning video viewers.', lTags())
try {
for (const videoId of this.viewersPerVideo.keys()) {
const notBefore = new Date().getTime()
const viewers = this.viewersPerVideo.get(videoId)
// Only keep not expired viewers
const newViewers = viewers.filter(w => w > notBefore)
if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
else this.viewersPerVideo.set(videoId, newViewers)
await this.notifyClients(videoId, newViewers.length)
}
} catch (err) {
logger.error('Error in video clean viewers scheduler.', { err, ...lTags() })
}
this.processingViewerCounters = false
}
private async notifyClients (videoId: string | number, viewersLength: number) {
const video = await VideoModel.loadImmutableAttributes(videoId)
if (!video) return
PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags())
}
async processViewerStats () {
if (this.processingViewerStats) return
this.processingViewerStats = true
if (this.processingViewersStats) return
this.processingViewersStats = true
if (!isTestInstance()) logger.info('Processing viewer statistics.', lTags())
@ -245,7 +153,7 @@ export class VideoViewers {
logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() })
}
this.processingViewerStats = false
this.processingViewersStats = false
}
private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) {

View File

@ -1,5 +1,8 @@
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { sendView } from '@server/lib/activitypub/send/send-view'
import { getServerActor } from '@server/models/application/application'
import { MVideo } from '@server/types/models'
import { buildUUID } from '@shared/extra-utils'
import { Redis } from '../../redis'
const lTags = loggerTagsFactory('views')
@ -24,6 +27,8 @@ export class VideoViews {
await this.addView(video)
await sendView({ byActor: await getServerActor(), video, type: 'view', viewerIdentifier: buildUUID() })
return true
}
@ -39,6 +44,8 @@ export class VideoViews {
return true
}
// ---------------------------------------------------------------------------
private async addView (video: MVideo) {
const promises: Promise<any>[] = []

View File

@ -1,7 +1,7 @@
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { MVideo } from '@server/types/models'
import { VideoViewEvent } from '@shared/models'
import { VideoViewers, VideoViews } from './shared'
import { VideoViewerCounters, VideoViewerStats, VideoViews } from './shared'
/**
* If processing a local view:
@ -27,14 +27,16 @@ export class VideoViewsManager {
private static instance: VideoViewsManager
private videoViewers: VideoViewers
private videoViewerStats: VideoViewerStats
private videoViewerCounters: VideoViewerCounters
private videoViews: VideoViews
private constructor () {
}
init () {
this.videoViewers = new VideoViewers()
this.videoViewerStats = new VideoViewerStats()
this.videoViewerCounters = new VideoViewerCounters()
this.videoViews = new VideoViews()
}
@ -48,10 +50,12 @@ export class VideoViewsManager {
logger.debug('Processing local view for %s and ip %s.', video.url, ip, lTags())
const successViewer = await this.videoViewers.addLocalViewer({ video, ip, viewEvent, currentTime })
await this.videoViewerStats.addLocalViewer({ video, ip, viewEvent, currentTime })
const successViewer = await this.videoViewerCounters.addLocalViewer({ video, ip })
// Do it after added local viewer to fetch updated information
const watchTime = await this.videoViewers.getWatchTime(video.id, ip)
const watchTime = await this.videoViewerStats.getWatchTime(video.id, ip)
const successView = await this.videoViews.addLocalView({ video, watchTime, ip })
@ -60,26 +64,27 @@ export class VideoViewsManager {
async processRemoteView (options: {
video: MVideo
viewerId: string | null
viewerExpires?: Date
}) {
const { video, viewerExpires } = options
const { video, viewerId, viewerExpires } = options
logger.debug('Processing remote view for %s.', video.url, { viewerExpires, ...lTags() })
logger.debug('Processing remote view for %s.', video.url, { viewerExpires, viewerId, ...lTags() })
if (viewerExpires) await this.videoViewers.addRemoteViewer({ video, viewerExpires })
if (viewerExpires) await this.videoViewerCounters.addRemoteViewer({ video, viewerId, viewerExpires })
else await this.videoViews.addRemoteView({ video })
}
getViewers (video: MVideo) {
return this.videoViewers.getViewers(video)
return this.videoViewerCounters.getViewers(video)
}
buildViewerExpireTime () {
return this.videoViewers.buildViewerExpireTime()
return this.videoViewerCounters.buildViewerExpireTime()
}
processViewers () {
return this.videoViewers.processViewerStats()
processViewerStats () {
return this.videoViewerStats.processViewerStats()
}
static get Instance () {

View File

@ -19,7 +19,7 @@ describe('Test videos views', function () {
let userAccessToken: string
before(async function () {
this.timeout(30000)
this.timeout(120000)
servers = await createMultipleServers(2)
await setAccessTokensToServers(servers)

View File

@ -57,10 +57,11 @@ describe('Test video views/viewers counters', function () {
})
it('Should not view again this video with the same IP', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 1)
await checkCounter('views', videoUUID, 2)
})
it('Should view the video from server 2 and send the event', async function () {
@ -68,7 +69,7 @@ describe('Test video views/viewers counters', function () {
await waitJobs(servers)
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 2)
await checkCounter('views', videoUUID, 3)
})
})
@ -78,7 +79,7 @@ describe('Test video views/viewers counters', function () {
let command: FfmpegCommand
before(async function () {
this.timeout(60000);
this.timeout(120000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})

View File

@ -21,7 +21,7 @@ describe('Test views overall stats', function () {
let vodVideoId: string
before(async function () {
this.timeout(60000);
this.timeout(120000);
({ vodVideoId } = await prepareViewsVideos({ servers, live: false, vod: true }))
})
@ -74,7 +74,7 @@ describe('Test views overall stats', function () {
let command: FfmpegCommand
before(async function () {
this.timeout(60000);
this.timeout(120000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})
@ -189,7 +189,7 @@ describe('Test views overall stats', function () {
let videoUUID: string
before(async function () {
this.timeout(60000);
this.timeout(120000);
({ vodVideoId: videoUUID } = await prepareViewsVideos({ servers, live: true, vod: true }))
})

View File

@ -20,7 +20,7 @@ describe('Test views retention stats', function () {
let vodVideoId: string
before(async function () {
this.timeout(60000);
this.timeout(120000);
({ vodVideoId } = await prepareViewsVideos({ servers, live: false, vod: true }))
})

View File

@ -24,7 +24,7 @@ describe('Test views timeserie stats', function () {
let vodVideoId: string
before(async function () {
this.timeout(60000);
this.timeout(120000);
({ vodVideoId } = await prepareViewsVideos({ servers, live: false, vod: true }))
})
@ -63,7 +63,7 @@ describe('Test views timeserie stats', function () {
}
before(async function () {
this.timeout(60000);
this.timeout(120000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})