diff --git a/packages/tests/src/api/activitypub/security.ts b/packages/tests/src/api/activitypub/security.ts index c26be99da..85f255039 100644 --- a/packages/tests/src/api/activitypub/security.ts +++ b/packages/tests/src/api/activitypub/security.ts @@ -9,13 +9,21 @@ import { buildGlobalHTTPHeaders, signAndContextify } from '@peertube/peertube-server/core/helpers/activity-pub-utils.js' -import { buildDigest } from '@peertube/peertube-server/core/helpers/peertube-crypto.js' +import { buildDigest, signJsonLDObject } from '@peertube/peertube-server/core/helpers/peertube-crypto.js' import { ACTIVITY_PUB, HTTP_SIGNATURE } from '@peertube/peertube-server/core/initializers/constants.js' import { makePOSTAPRequest } from '@tests/shared/requests.js' import { SQLCommand } from '@tests/shared/sql-command.js' import { expect } from 'chai' import { readJsonSync } from 'fs-extra/esm' +function signJsonLDObjectWithoutAssertion (options: Parameters[0]) { + return signJsonLDObject({ + ...options, + + disableWorkerThreadAssertion: true + }) +} + function fakeFilter () { return (data: any) => Promise.resolve(data) } @@ -132,7 +140,7 @@ describe('Test ActivityPub security', function () { it('Should fail with an invalid date', async function () { const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter()) - const headers = buildGlobalHTTPHeaders(body) + const headers = buildGlobalHTTPHeaders(body, buildDigest) headers['date'] = 'Wed, 21 Oct 2015 07:28:00 GMT' try { @@ -148,7 +156,7 @@ describe('Test ActivityPub security', function () { await setKeysOfServer(sqlCommands[1], servers[1].url, invalidKeys.publicKey, invalidKeys.privateKey) const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter()) - const headers = buildGlobalHTTPHeaders(body) + const headers = buildGlobalHTTPHeaders(body, buildDigest) try { await makePOSTAPRequest(url, body, baseHttpSignature(), headers) @@ -163,7 +171,7 @@ describe('Test ActivityPub security', function () { await setKeysOfServer(sqlCommands[1], servers[1].url, keys.publicKey, keys.privateKey) const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter()) - const headers = buildGlobalHTTPHeaders(body) + const headers = buildGlobalHTTPHeaders(body, buildDigest) const signatureOptions = baseHttpSignature() const badHeadersMatrix = [ @@ -186,7 +194,7 @@ describe('Test ActivityPub security', function () { it('Should succeed with a valid HTTP signature draft 11 (without date but with (created))', async function () { const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter()) - const headers = buildGlobalHTTPHeaders(body) + const headers = buildGlobalHTTPHeaders(body, buildDigest) const signatureOptions = baseHttpSignature() signatureOptions.headers = [ '(request-target)', '(created)', 'host', 'digest' ] @@ -197,7 +205,7 @@ describe('Test ActivityPub security', function () { it('Should succeed with a valid HTTP signature', async function () { const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter()) - const headers = buildGlobalHTTPHeaders(body) + const headers = buildGlobalHTTPHeaders(body, buildDigest) const { statusCode } = await makePOSTAPRequest(url, body, baseHttpSignature(), headers) expect(statusCode).to.equal(HttpStatusCode.NO_CONTENT_204) @@ -216,7 +224,7 @@ describe('Test ActivityPub security', function () { await servers[1].run() const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter()) - const headers = buildGlobalHTTPHeaders(body) + const headers = buildGlobalHTTPHeaders(body, buildDigest) try { await makePOSTAPRequest(url, body, baseHttpSignature(), headers) @@ -247,9 +255,15 @@ describe('Test ActivityPub security', function () { body.actor = servers[2].url + '/accounts/peertube' const signer: any = { privateKey: invalidKeys.privateKey, url: servers[2].url + '/accounts/peertube' } - const signedBody = await signAndContextify(signer, body, 'Announce', fakeFilter()) + const signedBody = await signAndContextify({ + byActor: signer, + data: body, + contextType: 'Announce', + contextFilter: fakeFilter(), + signerFunction: signJsonLDObjectWithoutAssertion + }) - const headers = buildGlobalHTTPHeaders(signedBody) + const headers = buildGlobalHTTPHeaders(signedBody, buildDigest) try { await makePOSTAPRequest(url, signedBody, baseHttpSignature(), headers) @@ -267,11 +281,17 @@ describe('Test ActivityPub security', function () { body.actor = servers[2].url + '/accounts/peertube' const signer: any = { privateKey: keys.privateKey, url: servers[2].url + '/accounts/peertube' } - const signedBody = await signAndContextify(signer, body, 'Announce', fakeFilter()) + const signedBody: any = await signAndContextify({ + byActor: signer, + data: body, + contextType: 'Announce', + contextFilter: fakeFilter(), + signerFunction: signJsonLDObjectWithoutAssertion + }) signedBody.actor = servers[2].url + '/account/peertube' - const headers = buildGlobalHTTPHeaders(signedBody) + const headers = buildGlobalHTTPHeaders(signedBody, buildDigest) try { await makePOSTAPRequest(url, signedBody, baseHttpSignature(), headers) @@ -286,9 +306,15 @@ describe('Test ActivityPub security', function () { body.actor = servers[2].url + '/accounts/peertube' const signer: any = { privateKey: keys.privateKey, url: servers[2].url + '/accounts/peertube' } - const signedBody = await signAndContextify(signer, body, 'Announce', fakeFilter()) + const signedBody = await signAndContextify({ + byActor: signer, + data: body, + contextType: 'Announce', + contextFilter: fakeFilter(), + signerFunction: signJsonLDObjectWithoutAssertion + }) - const headers = buildGlobalHTTPHeaders(signedBody) + const headers = buildGlobalHTTPHeaders(signedBody, buildDigest) const { statusCode } = await makePOSTAPRequest(url, signedBody, baseHttpSignature(), headers) expect(statusCode).to.equal(HttpStatusCode.NO_CONTENT_204) @@ -308,9 +334,15 @@ describe('Test ActivityPub security', function () { body.actor = servers[2].url + '/accounts/peertube' const signer: any = { privateKey: keys.privateKey, url: servers[2].url + '/accounts/peertube' } - const signedBody = await signAndContextify(signer, body, 'Announce', fakeFilter()) + const signedBody = await signAndContextify({ + byActor: signer, + data: body, + contextType: 'Announce', + contextFilter: fakeFilter(), + signerFunction: signJsonLDObjectWithoutAssertion + }) - const headers = buildGlobalHTTPHeaders(signedBody) + const headers = buildGlobalHTTPHeaders(signedBody, buildDigest) try { await makePOSTAPRequest(url, signedBody, baseHttpSignature(), headers) diff --git a/packages/tests/src/server-helpers/activitypub.ts b/packages/tests/src/server-helpers/activitypub.ts index a498412f3..1ee1d8196 100644 --- a/packages/tests/src/server-helpers/activitypub.ts +++ b/packages/tests/src/server-helpers/activitypub.ts @@ -5,13 +5,22 @@ import { signAndContextify } from '@peertube/peertube-server/core/helpers/activi import { isHTTPSignatureVerified, isJsonLDSignatureVerified, - parseHTTPSignature + parseHTTPSignature, + signJsonLDObject } from '@peertube/peertube-server/core/helpers/peertube-crypto.js' import { buildRequestStub } from '@tests/shared/tests.js' import { expect } from 'chai' import { readJsonSync } from 'fs-extra/esm' import cloneDeep from 'lodash-es/cloneDeep.js' +function signJsonLDObjectWithoutAssertion (options: Parameters[0]) { + return signJsonLDObject({ + ...options, + + disableWorkerThreadAssertion: true + }) +} + function fakeFilter () { return (data: any) => Promise.resolve(data) } @@ -55,7 +64,13 @@ describe('Test activity pub helpers', function () { const body = readJsonSync(buildAbsoluteFixturePath('./ap-json/peertube/announce-without-context.json')) const actorSignature = { url: 'http://localhost:9002/accounts/peertube', privateKey: keys.privateKey } - const signedBody = await signAndContextify(actorSignature as any, body, 'Announce', fakeFilter()) + const signedBody = await signAndContextify({ + byActor: actorSignature as any, + data: body, + contextType: 'Announce', + contextFilter: fakeFilter(), + signerFunction: signJsonLDObjectWithoutAssertion + }) const fromActor = { publicKey: keys.publicKey, url: 'http://localhost:9002/accounts/peertube' } const result = await isJsonLDSignatureVerified(fromActor as any, signedBody) @@ -68,7 +83,13 @@ describe('Test activity pub helpers', function () { const body = readJsonSync(buildAbsoluteFixturePath('./ap-json/peertube/announce-without-context.json')) const actorSignature = { url: 'http://localhost:9002/accounts/peertube', privateKey: keys.privateKey } - const signedBody = await signAndContextify(actorSignature as any, body, 'Announce', fakeFilter()) + const signedBody = await signAndContextify({ + byActor: actorSignature as any, + data: body, + contextType: 'Announce', + contextFilter: fakeFilter(), + signerFunction: signJsonLDObjectWithoutAssertion + }) const fromActor = { publicKey: keys.publicKey, url: 'http://localhost:9002/accounts/peertube' } const result = await isJsonLDSignatureVerified(fromActor as any, signedBody) diff --git a/server/core/helpers/activity-pub-utils.ts b/server/core/helpers/activity-pub-utils.ts index cda40fdaa..5bee8ca8e 100644 --- a/server/core/helpers/activity-pub-utils.ts +++ b/server/core/helpers/activity-pub-utils.ts @@ -2,11 +2,14 @@ import { ContextType } from '@peertube/peertube-models' import { ACTIVITY_PUB } from '@server/initializers/constants.js' import { buildDigest, signJsonLDObject } from './peertube-crypto.js' -type ContextFilter = (arg: T) => Promise +export type ContextFilter = (arg: T) => Promise -export function buildGlobalHTTPHeaders (body: any) { +export function buildGlobalHTTPHeaders ( + body: any, + digestBuilder: typeof buildDigest +) { return { - 'digest': buildDigest(body), + 'digest': digestBuilder(body), 'content-type': 'application/activity+json', 'accept': ACTIVITY_PUB.ACCEPT_HEADER } @@ -16,17 +19,20 @@ export async function activityPubContextify (data: T, type: ContextType, con return { ...await getContextData(type, contextFilter), ...data } } -export async function signAndContextify ( - byActor: { url: string, privateKey: string }, - data: T, - contextType: ContextType | null, +export async function signAndContextify (options: { + byActor: { url: string, privateKey: string } + data: T + contextType: ContextType | null contextFilter: ContextFilter -) { + signerFunction: typeof signJsonLDObject +}) { + const { byActor, data, contextType, contextFilter, signerFunction } = options + const activity = contextType ? await activityPubContextify(data, contextType, contextFilter) : data - return signJsonLDObject(byActor, activity) + return signerFunction({ byActor, data: activity }) } // --------------------------------------------------------------------------- diff --git a/server/core/helpers/peertube-crypto.ts b/server/core/helpers/peertube-crypto.ts index 45f2da6bf..fca635a63 100644 --- a/server/core/helpers/peertube-crypto.ts +++ b/server/core/helpers/peertube-crypto.ts @@ -7,6 +7,7 @@ import { BCRYPT_SALT_SIZE, ENCRYPTION, HTTP_SIGNATURE, PRIVATE_RSA_KEY_SIZE } fr import { MActor } from '../types/models/index.js' import { generateRSAKeyPairPromise, randomBytesPromise, scryptPromise } from './core-utils.js' import { logger } from './logger.js' +import { assertIsInWorkerThread } from './threads.js' function createPrivateAndPublicKeys () { logger.info('Generating a RSA key...') @@ -94,7 +95,15 @@ async function isJsonLDRSA2017Verified (fromActor: MActor, signedDocument: any) return verify.verify(fromActor.publicKey, signedDocument.signature.signatureValue, 'base64') } -async function signJsonLDObject (byActor: { url: string, privateKey: string }, data: T) { +async function signJsonLDObject (options: { + byActor: { url: string, privateKey: string } + data: T + disableWorkerThreadAssertion?: boolean +}) { + const { byActor, data, disableWorkerThreadAssertion = false } = options + + if (!disableWorkerThreadAssertion) assertIsInWorkerThread() + const signature = { type: 'RsaSignature2017', creator: byActor.url, diff --git a/server/core/helpers/threads.ts b/server/core/helpers/threads.ts new file mode 100644 index 000000000..15d974171 --- /dev/null +++ b/server/core/helpers/threads.ts @@ -0,0 +1,8 @@ +import { isMainThread } from 'node:worker_threads' +import { logger } from './logger.js' + +export function assertIsInWorkerThread () { + if (!isMainThread) return + + logger.error('Caller is not in worker thread', { stack: new Error().stack }) +} diff --git a/server/core/initializers/constants.ts b/server/core/initializers/constants.ts index ccb65692d..7e3a86401 100644 --- a/server/core/initializers/constants.ts +++ b/server/core/initializers/constants.ts @@ -976,6 +976,14 @@ const WORKER_THREADS = { GET_IMAGE_SIZE: { CONCURRENCY: 1, MAX_THREADS: 5 + }, + SIGN_JSON_LD_OBJECT: { + CONCURRENCY: 1, + MAX_THREADS: 2 + }, + BUILD_DIGEST: { + CONCURRENCY: 1, + MAX_THREADS: 2 } } diff --git a/server/core/lib/activitypub/send/http.ts b/server/core/lib/activitypub/send/http.ts index 8d15b970d..09e1c8eef 100644 --- a/server/core/lib/activitypub/send/http.ts +++ b/server/core/lib/activitypub/send/http.ts @@ -1,10 +1,11 @@ import { ContextType } from '@peertube/peertube-models' -import { signAndContextify } from '@server/helpers/activity-pub-utils.js' -import { HTTP_SIGNATURE } from '@server/initializers/constants.js' +import { ACTIVITY_PUB, HTTP_SIGNATURE } from '@server/initializers/constants.js' import { ActorModel } from '@server/models/actor/actor.js' import { getServerActor } from '@server/models/application/application.js' import { MActor } from '@server/types/models/index.js' import { getContextFilter } from '../context.js' +import { buildDigestFromWorker, signJsonLDObjectFromWorker } from '@server/lib/worker/parent-process.js' +import { signAndContextify } from '@server/helpers/activity-pub-utils.js' type Payload = { body: T, contextType: ContextType, signatureActorId?: number } @@ -17,12 +18,26 @@ export async function computeBody ( const actorSignature = await ActorModel.load(payload.signatureActorId) if (!actorSignature) throw new Error('Unknown signature actor id.') - body = await signAndContextify(actorSignature, payload.body, payload.contextType, getContextFilter()) + body = await signAndContextify({ + byActor: { url: actorSignature.url, privateKey: actorSignature.privateKey }, + data: payload.body, + contextType: payload.contextType, + contextFilter: getContextFilter(), + signerFunction: signJsonLDObjectFromWorker + }) } return body } +export async function buildGlobalHTTPHeaders (body: any) { + return { + 'digest': await buildDigestFromWorker(body), + 'content-type': 'application/activity+json', + 'accept': ACTIVITY_PUB.ACCEPT_HEADER + } +} + export async function buildSignedRequestOptions (options: { signatureActorId?: number hasPayload: boolean diff --git a/server/core/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/core/lib/job-queue/handlers/activitypub-http-broadcast.ts index f7307c97d..06f62722d 100644 --- a/server/core/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/core/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -1,7 +1,6 @@ import { Job } from 'bullmq' import { ActivitypubHttpBroadcastPayload } from '@peertube/peertube-models' -import { buildGlobalHTTPHeaders } from '@server/helpers/activity-pub-utils.js' -import { buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/index.js' +import { buildGlobalHTTPHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/http.js' import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache.js' import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process.js' import { logger } from '../../../helpers/logger.js' @@ -45,6 +44,6 @@ async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) { method: 'POST' as 'POST', json: body, httpSignature: httpSignatureOptions, - headers: buildGlobalHTTPHeaders(body) + headers: await buildGlobalHTTPHeaders(body) } } diff --git a/server/core/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/core/lib/job-queue/handlers/activitypub-http-unicast.ts index 9b2f542f0..f804aa06a 100644 --- a/server/core/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/core/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -1,7 +1,6 @@ import { Job } from 'bullmq' import { ActivitypubHttpUnicastPayload } from '@peertube/peertube-models' -import { buildGlobalHTTPHeaders } from '@server/helpers/activity-pub-utils.js' -import { buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/index.js' +import { buildGlobalHTTPHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/http.js' import { logger } from '../../../helpers/logger.js' import { doRequest } from '../../../helpers/requests.js' import { ActorFollowHealthCache } from '../../actor-follow-health-cache.js' @@ -19,7 +18,7 @@ async function processActivityPubHttpUnicast (job: Job) { method: 'POST' as 'POST', json: body, httpSignature: httpSignatureOptions, - headers: buildGlobalHTTPHeaders(body) + headers: await buildGlobalHTTPHeaders(body) } try { diff --git a/server/core/lib/worker/parent-process.ts b/server/core/lib/worker/parent-process.ts index d59726905..3e97fb7e9 100644 --- a/server/core/lib/worker/parent-process.ts +++ b/server/core/lib/worker/parent-process.ts @@ -5,10 +5,12 @@ import type httpBroadcast from './workers/http-broadcast.js' import type downloadImage from './workers/image-downloader.js' import type processImage from './workers/image-processor.js' import type getImageSize from './workers/get-image-size.js' +import type signJsonLDObject from './workers/sign-json-ld-object.js' +import type buildDigest from './workers/build-digest.js' let downloadImageWorker: Piscina -function downloadImageFromWorker (options: Parameters[0]): Promise> { +export function downloadImageFromWorker (options: Parameters[0]): Promise> { if (!downloadImageWorker) { downloadImageWorker = new Piscina({ filename: new URL(join('workers', 'image-downloader.js'), import.meta.url).href, @@ -24,7 +26,7 @@ function downloadImageFromWorker (options: Parameters[0]): let processImageWorker: Piscina -function processImageFromWorker (options: Parameters[0]): Promise> { +export function processImageFromWorker (options: Parameters[0]): Promise> { if (!processImageWorker) { processImageWorker = new Piscina({ filename: new URL(join('workers', 'image-processor.js'), import.meta.url).href, @@ -40,7 +42,7 @@ function processImageFromWorker (options: Parameters[0]): P let getImageSizeWorker: Piscina -function getImageSizeFromWorker (options: Parameters[0]): Promise> { +export function getImageSizeFromWorker (options: Parameters[0]): Promise> { if (!getImageSizeWorker) { getImageSizeWorker = new Piscina({ filename: new URL(join('workers', 'get-image-size.js'), import.meta.url).href, @@ -56,7 +58,7 @@ function getImageSizeFromWorker (options: Parameters[0]): P let parallelHTTPBroadcastWorker: Piscina -function parallelHTTPBroadcastFromWorker (options: Parameters[0]): Promise> { +export function parallelHTTPBroadcastFromWorker (options: Parameters[0]): Promise> { if (!parallelHTTPBroadcastWorker) { parallelHTTPBroadcastWorker = new Piscina({ filename: new URL(join('workers', 'http-broadcast.js'), import.meta.url).href, @@ -73,7 +75,9 @@ function parallelHTTPBroadcastFromWorker (options: Parameters[0]): Promise> { +export function sequentialHTTPBroadcastFromWorker ( + options: Parameters[0] +): Promise> { if (!sequentialHTTPBroadcastWorker) { sequentialHTTPBroadcastWorker = new Piscina({ filename: new URL(join('workers', 'http-broadcast.js'), import.meta.url).href, @@ -86,10 +90,40 @@ function sequentialHTTPBroadcastFromWorker (options: Parameters ( + options: Parameters>[0] +): ReturnType> { + if (!signJsonLDObjectWorker) { + signJsonLDObjectWorker = new Piscina({ + filename: new URL(join('workers', 'sign-json-ld-object.js'), import.meta.url).href, + // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs + concurrentTasksPerWorker: WORKER_THREADS.SIGN_JSON_LD_OBJECT.CONCURRENCY, + maxThreads: WORKER_THREADS.SIGN_JSON_LD_OBJECT.MAX_THREADS + }) + } + + return signJsonLDObjectWorker.run(options) +} + +// --------------------------------------------------------------------------- + +let buildDigestWorker: Piscina + +export function buildDigestFromWorker ( + options: Parameters[0] +): Promise> { + if (!buildDigestWorker) { + buildDigestWorker = new Piscina({ + filename: new URL(join('workers', 'build-digest.js'), import.meta.url).href, + // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs + concurrentTasksPerWorker: WORKER_THREADS.BUILD_DIGEST.CONCURRENCY, + maxThreads: WORKER_THREADS.BUILD_DIGEST.MAX_THREADS + }) + } + + return buildDigestWorker.run(options) } diff --git a/server/core/lib/worker/workers/build-digest.ts b/server/core/lib/worker/workers/build-digest.ts new file mode 100644 index 000000000..7a7c812d6 --- /dev/null +++ b/server/core/lib/worker/workers/build-digest.ts @@ -0,0 +1,3 @@ +import { buildDigest } from '@server/helpers/peertube-crypto.js' + +export default buildDigest diff --git a/server/core/lib/worker/workers/sign-json-ld-object.ts b/server/core/lib/worker/workers/sign-json-ld-object.ts new file mode 100644 index 000000000..9baf7c7b0 --- /dev/null +++ b/server/core/lib/worker/workers/sign-json-ld-object.ts @@ -0,0 +1,3 @@ +import { signJsonLDObject } from '@server/helpers/peertube-crypto.js' + +export default signJsonLDObject