Fetch outbox to grab old activities tests
This commit is contained in:
11 changed files with 81 additions and 30 deletions
@ -2,7 +2,6 @@ import { createReadStream } from 'fs'
import { join } from 'path'
import { createInterface } from 'readline'
import * as winston from 'winston'
import { readFileBufferPromise } from '../server/helpers/core-utils'
import { CONFIG } from '../server/initializers/constants'
@ -16,7 +15,8 @@ const logger = new winston.Logger({
humanReadableUnhandledException: true,
json: false,
colorize: true,
prettyPrint: true
prettyPrint: true,
stderrLevels: []
exitOnError: true
@ -56,7 +56,7 @@ async function accountController (req: express.Request, res: express.Response, n
async function accountFollowersController (req: express.Request, res: express.Response, next: express.NextFunction) {
const account: AccountInstance = res.locals.account
const page = req.params.page || 1
const page = req.query.page || 1
const { start, count } = pageToStartAndCount(page, ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE)
const result = await db.AccountFollow.listAcceptedFollowerUrlsForApi([ account.id ], start, count)
@ -68,7 +68,7 @@ async function accountFollowersController (req: express.Request, res: express.Re
async function accountFollowingController (req: express.Request, res: express.Response, next: express.NextFunction) {
const account: AccountInstance = res.locals.account
const page = req.params.page || 1
const page = req.query.page || 1
const { start, count } = pageToStartAndCount(page, ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE)
const result = await db.AccountFollow.listAcceptedFollowingUrlsForApi([ account.id ], start, count)
@ -28,7 +28,7 @@ export {
async function outboxController (req: express.Request, res: express.Response, next: express.NextFunction) {
const account: AccountInstance = res.locals.account
const page = req.params.page || 1
const page = req.query.page || 1
const { start, count } = pageToStartAndCount(page, ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE)
const data = await db.Video.listAllAndSharedByAccountForOutbox(account.id, start, count)
@ -1,11 +1,15 @@
import * as express from 'express'
import { UserRight } from '../../../../shared/models/users/user-right.enum'
import { getFormattedObjects } from '../../../helpers'
import { retryTransactionWrapper } from '../../../helpers/database-utils'
import { logger } from '../../../helpers/logger'
import { getServerAccount } from '../../../helpers/utils'
import { getAccountFromWebfinger } from '../../../helpers/webfinger'
import { SERVER_ACCOUNT_NAME } from '../../../initializers/constants'
import { database as db } from '../../../initializers/database'
import { saveAccountAndServerIfNotExist } from '../../../lib/activitypub/account'
import { sendUndoFollow } from '../../../lib/activitypub/send/send-undo'
import { sendFollow } from '../../../lib/index'
import { asyncMiddleware, paginationValidator, removeFollowingValidator, setFollowersSort, setPagination } from '../../../middlewares'
import { authenticate } from '../../../middlewares/oauth'
import { setBodyHostsPort } from '../../../middlewares/servers'
@ -13,13 +17,8 @@ import { setFollowingSort } from '../../../middlewares/sort'
import { ensureUserHasRight } from '../../../middlewares/user-right'
import { followValidator } from '../../../middlewares/validators/follows'
import { followersSortValidator, followingSortValidator } from '../../../middlewares/validators/sort'
import { AccountFollowInstance } from '../../../models/index'
import { sendFollow } from '../../../lib/index'
import { sendUndoFollow } from '../../../lib/activitypub/send/send-undo'
import { AccountInstance } from '../../../models/account/account-interface'
import { retryTransactionWrapper } from '../../../helpers/database-utils'
import { saveAccountAndServerIfNotExist } from '../../../lib/activitypub/account'
import { addFetchOutboxJob } from '../../../lib/activitypub/fetch'
import { AccountFollowInstance } from '../../../models/index'
const serverFollowsRouter = express.Router()
@ -137,8 +136,6 @@ async function follow (fromAccount: AccountInstance, targetAccount: AccountInsta
if (accountFollow.state === 'pending') {
await sendFollow(accountFollow, t)
await addFetchOutboxJob(targetAccount, t)
} catch (err) {
// Reset target account
@ -24,12 +24,15 @@ function activityPubContextify <T> (data: T) {
function activityPubCollectionPagination (url: string, page: number, result: ResultList<any>) {
function activityPubCollectionPagination (url: string, page: any, result: ResultList<any>) {
let next: string
let prev: string
// Assert page is a number
page = parseInt(page, 10)
// There are more results
if (result.total > ((page + 1) * ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE)) {
if (result.total > page * ACTIVITY_PUB.COLLECTION_ITEMS_PER_PAGE) {
next = url + '?page=' + (page + 1)
@ -53,6 +56,8 @@ function activityPubCollectionPagination (url: string, page: number, result: Res
totalItems: result.total,
first: orderedCollectionPagination
} else {
orderedCollectionPagination['totalItems'] = result.total
return orderedCollectionPagination
@ -328,6 +328,7 @@ if (isTestInstance() === true) {
// ---------------------------------------------------------------------------
@ -1,6 +1,7 @@
import { ActivityAccept } from '../../../../shared/models/activitypub/activity'
import { database as db } from '../../../initializers'
import { AccountInstance } from '../../../models/account/account-interface'
import { addFetchOutboxJob } from '../fetch'
async function processAcceptActivity (activity: ActivityAccept, inboxAccount?: AccountInstance) {
if (inboxAccount === undefined) throw new Error('Need to accept on explicit inbox.')
@ -24,4 +25,5 @@ async function processAccept (account: AccountInstance, targetAccount: AccountIn
follow.set('state', 'accepted')
await follow.save()
await addFetchOutboxJob(targetAccount, undefined)
@ -48,7 +48,7 @@ function addRemoteVideo (account: AccountInstance,
activity: ActivityAdd,
videoChannel: VideoChannelInstance,
videoToCreateData: VideoTorrentObject) {
logger.debug('Adding remote video %s.', videoToCreateData.url)
logger.debug('Adding remote video %s.', videoToCreateData.id)
return db.sequelize.transaction(async t => {
const sequelizeOptions = {
@ -25,7 +25,7 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) {
if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
const activities = firstBody.first.orderedItems
logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri)
logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
await processActivities(activities)
@ -37,12 +37,12 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) {
options.uri = nextLink
const { body } = await doRequest(options)
nextLink = body.nextLink
nextLink = body.next
if (Array.isArray(body.orderedItems)) {
const activities = body.orderedItems
logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri)
logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
await processActivities(activities)
@ -46,6 +46,7 @@ import { TagInstance } from './tag-interface'
import { VideoFileInstance, VideoFileModel } from './video-file-interface'
import { VideoAttributes, VideoInstance, VideoMethods } from './video-interface'
import { sendDeleteVideo } from '../../lib/index'
import * as Bluebird from 'bluebird'
const Buffer = safeBuffer.Buffer
@ -786,14 +787,21 @@ list = function () {
listAllAndSharedByAccountForOutbox = function (accountId: number, start: number, count: number) {
const queryVideo = 'SELECT "Video"."id" FROM "Videos" AS "Video" ' +
'INNER JOIN "VideoChannels" AS "VideoChannel" ON "VideoChannel"."id" = "Video"."channelId" ' +
'WHERE "VideoChannel"."accountId" = ' + accountId
const queryVideoShare = 'SELECT "Video"."id" FROM "VideoShares" AS "VideoShare" ' +
'INNER JOIN "Videos" AS "Video" ON "Video"."id" = "VideoShare"."videoId" ' +
'INNER JOIN "VideoChannels" AS "VideoChannel" ON "VideoChannel"."id" = "Video"."channelId" ' +
'WHERE "VideoShare"."accountId" = ' + accountId
const rawQuery = `(${queryVideo}) UNION (${queryVideoShare}) LIMIT ${count} OFFSET ${start}`
function getRawQuery (select: string) {
const queryVideo = 'SELECT ' + select + ' FROM "Videos" AS "Video" ' +
'INNER JOIN "VideoChannels" AS "VideoChannel" ON "VideoChannel"."id" = "Video"."channelId" ' +
'WHERE "VideoChannel"."accountId" = ' + accountId
const queryVideoShare = 'SELECT ' + select + ' FROM "VideoShares" AS "VideoShare" ' +
'INNER JOIN "Videos" AS "Video" ON "Video"."id" = "VideoShare"."videoId" ' +
'WHERE "VideoShare"."accountId" = ' + accountId
let rawQuery = `(${queryVideo}) UNION (${queryVideoShare})`
return rawQuery
const rawQuery = getRawQuery('"Video"."id"')
const rawCountQuery = getRawQuery('COUNT("Video"."id") as "total"')
const query = {
distinct: true,
@ -825,10 +833,20 @@ listAllAndSharedByAccountForOutbox = function (accountId: number, start: number,
return Video.findAndCountAll(query).then(({ rows, count }) => {
return Bluebird.all([
Video['sequelize'].query(rawCountQuery, { type: Sequelize.QueryTypes.SELECT })
]).then(([ rows, totals ]) => {
// totals: totalVideos + totalVideoShares
let totalVideos = 0
let totalVideoShares = 0
if (totals[0]) totalVideos = parseInt(totals[0].total, 10)
if (totals[1]) totalVideoShares = parseInt(totals[1].total, 10)
const total = totalVideos + totalVideoShares
return {
data: rows,
total: count
total: total
@ -22,7 +22,7 @@ describe('Test follows', function () {
let server3Id: number
before(async function () {
servers = await flushAndRunMultipleServers(3)
@ -163,6 +163,34 @@ describe('Test follows', function () {
it('Should propagate previous uploaded videos on a new following', async function () {
await uploadVideo(servers[2].url, servers[2].accessToken, { name: 'server3-2' })
await uploadVideo(servers[2].url, servers[2].accessToken, { name: 'server3-3' })
await uploadVideo(servers[2].url, servers[2].accessToken, { name: 'server3-4' })
await uploadVideo(servers[2].url, servers[2].accessToken, { name: 'server3-5' })
await uploadVideo(servers[2].url, servers[2].accessToken, { name: 'server3-6' })
await wait(5000)
// Server 1 follows server 3
await follow(servers[0].url, [ servers[2].url ], servers[0].accessToken)
await wait(7000)
let res = await getVideosList(servers[0].url)
const video2 = res.body.data.find(v => v.name === 'server3-2')
const video4 = res.body.data.find(v => v.name === 'server3-4')
const video6 = res.body.data.find(v => v.name === 'server3-6')
after(async function () {
Add table
Reference in a new issue