Optimize broadcast job creation
This commit is contained in:
		
							parent
							
								
									b9e49a45f5
								
							
						
					
					
						commit
						3396e65345
					
				
					 6 changed files with 49 additions and 62 deletions
				
			
		| 
						 | 
				
			
			@ -1,6 +1,13 @@
 | 
			
		|||
import Bluebird from 'bluebird'
 | 
			
		||||
import { wait } from '@shared/core-utils'
 | 
			
		||||
import { createSingleServer, doubleFollow, PeerTubeServer, setAccessTokensToServers, waitJobs } from '@shared/server-commands'
 | 
			
		||||
import {
 | 
			
		||||
  createSingleServer,
 | 
			
		||||
  doubleFollow,
 | 
			
		||||
  killallServers,
 | 
			
		||||
  PeerTubeServer,
 | 
			
		||||
  setAccessTokensToServers,
 | 
			
		||||
  waitJobs
 | 
			
		||||
} from '@shared/server-commands'
 | 
			
		||||
 | 
			
		||||
let servers: PeerTubeServer[]
 | 
			
		||||
const viewers: { xForwardedFor: string }[] = []
 | 
			
		||||
| 
						 | 
				
			
			@ -9,6 +16,7 @@ let videoId: string
 | 
			
		|||
run()
 | 
			
		||||
  .then(() => process.exit(0))
 | 
			
		||||
  .catch(err => console.error(err))
 | 
			
		||||
  .finally(() => killallServers(servers))
 | 
			
		||||
 | 
			
		||||
async function run () {
 | 
			
		||||
  await prepare()
 | 
			
		||||
| 
						 | 
				
			
			@ -69,9 +77,13 @@ async function prepare () {
 | 
			
		|||
async function runViewers () {
 | 
			
		||||
  console.log('Will run views of %d viewers.', viewers.length)
 | 
			
		||||
 | 
			
		||||
  const before = new Date().getTime()
 | 
			
		||||
 | 
			
		||||
  await Bluebird.map(viewers, viewer => {
 | 
			
		||||
    return servers[0].views.simulateView({ id: videoId, xForwardedFor: viewer.xForwardedFor })
 | 
			
		||||
  }, { concurrency: 100 })
 | 
			
		||||
 | 
			
		||||
  console.log('Finished to run views in %d seconds.', (new Date().getTime() - before) / 1000)
 | 
			
		||||
 | 
			
		||||
  await wait(5000)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@ import { ACTIVITY_PUB } from '@server/initializers/constants'
 | 
			
		|||
import { ActorModel } from '@server/models/actor/actor'
 | 
			
		||||
import { VideoModel } from '@server/models/video/video'
 | 
			
		||||
import { VideoShareModel } from '@server/models/video/video-share'
 | 
			
		||||
import { MActorFollowersUrl, MActorLight, MActorUrl, MCommentOwner, MCommentOwnerVideo, MVideoId } from '@server/types/models'
 | 
			
		||||
import { MActorFollowersUrl, MActorUrl, MCommentOwner, MCommentOwnerVideo, MVideoId } from '@server/types/models'
 | 
			
		||||
import { ActivityAudience } from '@shared/models'
 | 
			
		||||
 | 
			
		||||
function getOriginVideoAudience (accountActor: MActorUrl, actorsInvolvedInVideo: MActorFollowersUrl[] = []): ActivityAudience {
 | 
			
		||||
| 
						 | 
				
			
			@ -51,13 +51,13 @@ function getAudienceFromFollowersOf (actorsInvolvedInObject: MActorFollowersUrl[
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
async function getActorsInvolvedInVideo (video: MVideoId, t: Transaction) {
 | 
			
		||||
  const actors: MActorLight[] = await VideoShareModel.loadActorsByShare(video.id, t)
 | 
			
		||||
  const actors = await VideoShareModel.listActorIdsAndFollowerUrlsByShare(video.id, t)
 | 
			
		||||
 | 
			
		||||
  const videoAll = video as VideoModel
 | 
			
		||||
 | 
			
		||||
  const videoActor = videoAll.VideoChannel?.Account
 | 
			
		||||
    ? videoAll.VideoChannel.Account.Actor
 | 
			
		||||
    : await ActorModel.loadFromAccountByVideoId(video.id, t)
 | 
			
		||||
    : await ActorModel.loadAccountActorFollowerUrlByVideoId(video.id, t)
 | 
			
		||||
 | 
			
		||||
  actors.push(videoActor)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,13 +19,13 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud
 | 
			
		|||
}) {
 | 
			
		||||
  const { byActor, video, transaction, contextType } = options
 | 
			
		||||
 | 
			
		||||
  const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, transaction)
 | 
			
		||||
 | 
			
		||||
  // Send to origin
 | 
			
		||||
  if (video.isOwned() === false) {
 | 
			
		||||
    return sendVideoActivityToOrigin(activityBuilder, options)
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, transaction)
 | 
			
		||||
 | 
			
		||||
  // Send to followers
 | 
			
		||||
  const audience = getAudienceFromFollowersOf(actorsInvolvedInVideo)
 | 
			
		||||
  const activity = activityBuilder(audience)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,4 +1,3 @@
 | 
			
		|||
 | 
			
		||||
import { isTestInstance } from '@server/helpers/core-utils'
 | 
			
		||||
import { logger, loggerTagsFactory } from '@server/helpers/logger'
 | 
			
		||||
import { VIEW_LIFETIME } from '@server/initializers/constants'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,5 +1,5 @@
 | 
			
		|||
import { values } from 'lodash'
 | 
			
		||||
import { literal, Op, Transaction } from 'sequelize'
 | 
			
		||||
import { literal, Op, QueryTypes, Transaction } from 'sequelize'
 | 
			
		||||
import {
 | 
			
		||||
  AllowNull,
 | 
			
		||||
  BelongsTo,
 | 
			
		||||
| 
						 | 
				
			
			@ -43,15 +43,18 @@ import {
 | 
			
		|||
  MActorAccountChannelId,
 | 
			
		||||
  MActorAPAccount,
 | 
			
		||||
  MActorAPChannel,
 | 
			
		||||
  MActorFollowersUrl,
 | 
			
		||||
  MActorFormattable,
 | 
			
		||||
  MActorFull,
 | 
			
		||||
  MActorHost,
 | 
			
		||||
  MActorId,
 | 
			
		||||
  MActorServer,
 | 
			
		||||
  MActorSummaryFormattable,
 | 
			
		||||
  MActorUrl,
 | 
			
		||||
  MActorWithInboxes
 | 
			
		||||
} from '../../types/models'
 | 
			
		||||
import { AccountModel } from '../account/account'
 | 
			
		||||
import { getServerActor } from '../application/application'
 | 
			
		||||
import { ServerModel } from '../server/server'
 | 
			
		||||
import { isOutdated, throwIfNotValid } from '../utils'
 | 
			
		||||
import { VideoModel } from '../video/video'
 | 
			
		||||
| 
						 | 
				
			
			@ -304,7 +307,10 @@ export class ActorModel extends Model<Partial<AttributesOnly<ActorModel>>> {
 | 
			
		|||
  })
 | 
			
		||||
  VideoChannel: VideoChannelModel
 | 
			
		||||
 | 
			
		||||
  static load (id: number): Promise<MActor> {
 | 
			
		||||
  static async load (id: number): Promise<MActor> {
 | 
			
		||||
    const actorServer = await getServerActor()
 | 
			
		||||
    if (id === actorServer.id) return actorServer
 | 
			
		||||
 | 
			
		||||
    return ActorModel.unscoped().findByPk(id)
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -312,48 +318,21 @@ export class ActorModel extends Model<Partial<AttributesOnly<ActorModel>>> {
 | 
			
		|||
    return ActorModel.scope(ScopeNames.FULL).findByPk(id)
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  static loadFromAccountByVideoId (videoId: number, transaction: Transaction): Promise<MActor> {
 | 
			
		||||
    const query = {
 | 
			
		||||
      include: [
 | 
			
		||||
        {
 | 
			
		||||
          attributes: [ 'id' ],
 | 
			
		||||
          model: AccountModel.unscoped(),
 | 
			
		||||
          required: true,
 | 
			
		||||
          include: [
 | 
			
		||||
            {
 | 
			
		||||
              attributes: [ 'id' ],
 | 
			
		||||
              model: VideoChannelModel.unscoped(),
 | 
			
		||||
              required: true,
 | 
			
		||||
              include: [
 | 
			
		||||
                {
 | 
			
		||||
                  attributes: [ 'id' ],
 | 
			
		||||
                  model: VideoModel.unscoped(),
 | 
			
		||||
                  required: true,
 | 
			
		||||
                  where: {
 | 
			
		||||
                    id: videoId
 | 
			
		||||
                  }
 | 
			
		||||
                }
 | 
			
		||||
              ]
 | 
			
		||||
            }
 | 
			
		||||
          ]
 | 
			
		||||
        }
 | 
			
		||||
      ],
 | 
			
		||||
  static loadAccountActorFollowerUrlByVideoId (videoId: number, transaction: Transaction) {
 | 
			
		||||
    const query = `SELECT "actor"."id" AS "id", "actor"."followersUrl" AS "followersUrl" ` +
 | 
			
		||||
                  `FROM "actor" ` +
 | 
			
		||||
                  `INNER JOIN "account" ON "actor"."id" = "account"."actorId" ` +
 | 
			
		||||
                  `INNER JOIN "videoChannel" ON "videoChannel"."accountId" = "account"."id" ` +
 | 
			
		||||
                  `INNER JOIN "video" ON "video"."channelId" = "videoChannel"."id" AND "video"."id" = :videoId`
 | 
			
		||||
 | 
			
		||||
    const options = {
 | 
			
		||||
      type: QueryTypes.SELECT as QueryTypes.SELECT,
 | 
			
		||||
      replacements: { videoId },
 | 
			
		||||
      plain: true as true,
 | 
			
		||||
      transaction
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return ActorModel.unscoped().findOne(query)
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  static isActorUrlExist (url: string) {
 | 
			
		||||
    const query = {
 | 
			
		||||
      raw: true,
 | 
			
		||||
      where: {
 | 
			
		||||
        url
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return ActorModel.unscoped().findOne(query)
 | 
			
		||||
      .then(a => !!a)
 | 
			
		||||
    return ActorModel.sequelize.query<MActorId & MActorFollowersUrl>(query, options)
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  static listByFollowersUrls (followersUrls: string[], transaction?: Transaction): Promise<MActorFull[]> {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@ import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Mode
 | 
			
		|||
import { AttributesOnly } from '@shared/typescript-utils'
 | 
			
		||||
import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc'
 | 
			
		||||
import { CONSTRAINTS_FIELDS } from '../../initializers/constants'
 | 
			
		||||
import { MActorDefault } from '../../types/models'
 | 
			
		||||
import { MActorDefault, MActorFollowersUrl, MActorId } from '../../types/models'
 | 
			
		||||
import { MVideoShareActor, MVideoShareFull } from '../../types/models/video'
 | 
			
		||||
import { ActorModel } from '../actor/actor'
 | 
			
		||||
import { buildLocalActorIdsIn, throwIfNotValid } from '../utils'
 | 
			
		||||
| 
						 | 
				
			
			@ -107,22 +107,19 @@ export class VideoShareModel extends Model<Partial<AttributesOnly<VideoShareMode
 | 
			
		|||
    })
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  static loadActorsByShare (videoId: number, t: Transaction): Promise<MActorDefault[]> {
 | 
			
		||||
    const query = {
 | 
			
		||||
      where: {
 | 
			
		||||
        videoId
 | 
			
		||||
      },
 | 
			
		||||
      include: [
 | 
			
		||||
        {
 | 
			
		||||
          model: ActorModel,
 | 
			
		||||
          required: true
 | 
			
		||||
        }
 | 
			
		||||
      ],
 | 
			
		||||
  static listActorIdsAndFollowerUrlsByShare (videoId: number, t: Transaction) {
 | 
			
		||||
    const query = `SELECT "actor"."id" AS "id", "actor"."followersUrl" AS "followersUrl" ` +
 | 
			
		||||
                  `FROM "videoShare" ` +
 | 
			
		||||
                  `INNER JOIN "actor" ON "actor"."id" = "videoShare"."actorId" ` +
 | 
			
		||||
                  `WHERE "videoShare"."videoId" = :videoId`
 | 
			
		||||
 | 
			
		||||
    const options = {
 | 
			
		||||
      type: QueryTypes.SELECT as QueryTypes.SELECT,
 | 
			
		||||
      replacements: { videoId },
 | 
			
		||||
      transaction: t
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return VideoShareModel.scope(ScopeNames.FULL).findAll(query)
 | 
			
		||||
                          .then((res: MVideoShareFull[]) => res.map(r => r.Actor))
 | 
			
		||||
    return VideoShareModel.sequelize.query<MActorId & MActorFollowersUrl>(query, options)
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  static loadActorsWhoSharedVideosOf (actorOwnerId: number, t: Transaction): Promise<MActorDefault[]> {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue