Introduce worker threads to process remote images
This commit is contained in:
parent
2873f00bd8
commit
c53853ca1b
9 changed files with 123 additions and 49 deletions
|
@ -137,6 +137,7 @@
|
|||
"password-generator": "^2.0.2",
|
||||
"pem": "^1.12.3",
|
||||
"pg": "^8.2.1",
|
||||
"piscina": "^3.2.0",
|
||||
"prompt": "^1.0.0",
|
||||
"proxy-addr": "^2.0.7",
|
||||
"pug": "^3.0.0",
|
||||
|
|
|
@ -6,7 +6,7 @@ import { HttpStatusCode } from '../../shared/models/http/http-error-codes'
|
|||
import { logger } from '../helpers/logger'
|
||||
import { ACTOR_IMAGES_SIZE, LAZY_STATIC_PATHS, STATIC_MAX_AGE } from '../initializers/constants'
|
||||
import { VideosCaptionCache, VideosPreviewCache } from '../lib/files-cache'
|
||||
import { actorImagePathUnsafeCache, pushActorImageProcessInQueue } from '../lib/local-actor'
|
||||
import { actorImagePathUnsafeCache, downloadActorImageFromWorker } from '../lib/local-actor'
|
||||
import { asyncMiddleware } from '../middlewares'
|
||||
import { ActorImageModel } from '../models/actor/actor-image'
|
||||
|
||||
|
@ -65,7 +65,7 @@ async function getActorImage (req: express.Request, res: express.Response, next:
|
|||
logger.info('Lazy serve remote actor image %s.', image.fileUrl)
|
||||
|
||||
try {
|
||||
await pushActorImageProcessInQueue({
|
||||
await downloadActorImageFromWorker({
|
||||
filename: image.filename,
|
||||
fileUrl: image.fileUrl,
|
||||
size: getActorImageSize(image),
|
||||
|
|
|
@ -1,11 +1,8 @@
|
|||
import { createWriteStream, remove } from 'fs-extra'
|
||||
import got, { CancelableRequest, NormalizedOptions, Options as GotOptions, RequestError, Response } from 'got'
|
||||
import { HttpProxyAgent, HttpsProxyAgent } from 'hpagent'
|
||||
import { join } from 'path'
|
||||
import { CONFIG } from '../initializers/config'
|
||||
import { ACTIVITY_PUB, BINARY_CONTENT_TYPES, PEERTUBE_VERSION, REQUEST_TIMEOUTS, WEBSERVER } from '../initializers/constants'
|
||||
import { pipelinePromise } from './core-utils'
|
||||
import { processImage } from './image-utils'
|
||||
import { logger, loggerTagsFactory } from './logger'
|
||||
import { getProxy, isProxyEnabled } from './proxy'
|
||||
|
||||
|
@ -147,21 +144,6 @@ async function doRequestAndSaveToFile (
|
|||
}
|
||||
}
|
||||
|
||||
async function downloadImage (url: string, destDir: string, destName: string, size: { width: number, height: number }) {
|
||||
const tmpPath = join(CONFIG.STORAGE.TMP_DIR, 'pending-' + destName)
|
||||
await doRequestAndSaveToFile(url, tmpPath)
|
||||
|
||||
const destPath = join(destDir, destName)
|
||||
|
||||
try {
|
||||
await processImage(tmpPath, destPath, size)
|
||||
} catch (err) {
|
||||
await remove(tmpPath)
|
||||
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
function getAgent () {
|
||||
if (!isProxyEnabled()) return {}
|
||||
|
||||
|
@ -211,7 +193,6 @@ export {
|
|||
doJSONRequest,
|
||||
doRequestAndSaveToFile,
|
||||
isBinaryResponse,
|
||||
downloadImage,
|
||||
getAgent,
|
||||
findLatestRedirection,
|
||||
peertubeGot
|
||||
|
|
|
@ -744,8 +744,11 @@ const MEMOIZE_LENGTH = {
|
|||
VIDEO_DURATION: 200
|
||||
}
|
||||
|
||||
const QUEUE_CONCURRENCY = {
|
||||
ACTOR_PROCESS_IMAGE: 3
|
||||
const WORKER_THREADS = {
|
||||
DOWNLOAD_IMAGE: {
|
||||
CONCURRENCY: 3,
|
||||
MAX_THREADS: 1
|
||||
}
|
||||
}
|
||||
|
||||
const REDUNDANCY = {
|
||||
|
@ -955,7 +958,7 @@ export {
|
|||
VIDEO_PRIVACIES,
|
||||
VIDEO_LICENCES,
|
||||
VIDEO_STATES,
|
||||
QUEUE_CONCURRENCY,
|
||||
WORKER_THREADS,
|
||||
VIDEO_RATE_TYPES,
|
||||
JOB_PRIORITY,
|
||||
VIDEO_TRANSCODING_FPS,
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import { queue } from 'async'
|
||||
import { remove } from 'fs-extra'
|
||||
import LRUCache from 'lru-cache'
|
||||
import { join } from 'path'
|
||||
|
@ -8,13 +7,13 @@ import { buildUUID } from '@shared/extra-utils'
|
|||
import { ActivityPubActorType, ActorImageType } from '@shared/models'
|
||||
import { retryTransactionWrapper } from '../helpers/database-utils'
|
||||
import { processImage } from '../helpers/image-utils'
|
||||
import { downloadImage } from '../helpers/requests'
|
||||
import { CONFIG } from '../initializers/config'
|
||||
import { ACTOR_IMAGES_SIZE, LRU_CACHE, QUEUE_CONCURRENCY, WEBSERVER } from '../initializers/constants'
|
||||
import { ACTOR_IMAGES_SIZE, LRU_CACHE, WEBSERVER } from '../initializers/constants'
|
||||
import { sequelizeTypescript } from '../initializers/database'
|
||||
import { MAccountDefault, MActor, MChannelDefault } from '../types/models'
|
||||
import { deleteActorImages, updateActorImages } from './activitypub/actors'
|
||||
import { sendUpdateActor } from './activitypub/send'
|
||||
import { downloadImageFromWorker } from './worker/parent-process'
|
||||
|
||||
function buildActorInstance (type: ActivityPubActorType, url: string, preferredUsername: string) {
|
||||
return new ActorModel({
|
||||
|
@ -87,27 +86,22 @@ async function deleteLocalActorImageFile (accountOrChannel: MAccountDefault | MC
|
|||
})
|
||||
}
|
||||
|
||||
type DownloadImageQueueTask = {
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function downloadActorImageFromWorker (options: {
|
||||
fileUrl: string
|
||||
filename: string
|
||||
type: ActorImageType
|
||||
size: typeof ACTOR_IMAGES_SIZE[ActorImageType][0]
|
||||
}
|
||||
}) {
|
||||
const downloaderOptions = {
|
||||
url: options.fileUrl,
|
||||
destDir: CONFIG.STORAGE.ACTOR_IMAGES,
|
||||
destName: options.filename,
|
||||
size: options.size
|
||||
}
|
||||
|
||||
const downloadImageQueue = queue<DownloadImageQueueTask, Error>((task, cb) => {
|
||||
downloadImage(task.fileUrl, CONFIG.STORAGE.ACTOR_IMAGES, task.filename, task.size)
|
||||
.then(() => cb())
|
||||
.catch(err => cb(err))
|
||||
}, QUEUE_CONCURRENCY.ACTOR_PROCESS_IMAGE)
|
||||
|
||||
function pushActorImageProcessInQueue (task: DownloadImageQueueTask) {
|
||||
return new Promise<void>((res, rej) => {
|
||||
downloadImageQueue.push(task, err => {
|
||||
if (err) return rej(err)
|
||||
|
||||
return res()
|
||||
})
|
||||
})
|
||||
return downloadImageFromWorker(downloaderOptions)
|
||||
}
|
||||
|
||||
// Unsafe so could returns paths that does not exist anymore
|
||||
|
@ -116,7 +110,8 @@ const actorImagePathUnsafeCache = new LRUCache<string, string>({ max: LRU_CACHE.
|
|||
export {
|
||||
actorImagePathUnsafeCache,
|
||||
updateLocalActorImageFiles,
|
||||
downloadActorImageFromWorker,
|
||||
deleteLocalActorImageFile,
|
||||
pushActorImageProcessInQueue,
|
||||
downloadImageFromWorker,
|
||||
buildActorInstance
|
||||
}
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
import { join } from 'path'
|
||||
import { ThumbnailType } from '@shared/models'
|
||||
import { generateImageFilename, generateImageFromVideoFile, processImage } from '../helpers/image-utils'
|
||||
import { downloadImage } from '../helpers/requests'
|
||||
import { CONFIG } from '../initializers/config'
|
||||
import { ASSETS_PATH, PREVIEWS_SIZE, THUMBNAILS_SIZE } from '../initializers/constants'
|
||||
import { ThumbnailModel } from '../models/video/thumbnail'
|
||||
import { MVideoFile, MVideoThumbnail, MVideoUUID } from '../types/models'
|
||||
import { MThumbnail } from '../types/models/video/thumbnail'
|
||||
import { MVideoPlaylistThumbnail } from '../types/models/video/video-playlist'
|
||||
import { downloadImageFromWorker } from './local-actor'
|
||||
import { VideoPathManager } from './video-path-manager'
|
||||
|
||||
type ImageSize = { height?: number, width?: number }
|
||||
|
@ -49,7 +49,10 @@ function updatePlaylistMiniatureFromUrl (options: {
|
|||
? null
|
||||
: downloadUrl
|
||||
|
||||
const thumbnailCreator = () => downloadImage(downloadUrl, basePath, filename, { width, height })
|
||||
const thumbnailCreator = () => {
|
||||
return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } })
|
||||
}
|
||||
|
||||
return updateThumbnailFromFunction({ thumbnailCreator, filename, height, width, type, existingThumbnail, fileUrl })
|
||||
}
|
||||
|
||||
|
@ -75,7 +78,9 @@ function updateVideoMiniatureFromUrl (options: {
|
|||
: existingThumbnail.filename
|
||||
|
||||
const thumbnailCreator = () => {
|
||||
if (thumbnailUrlChanged) return downloadImage(downloadUrl, basePath, filename, { width, height })
|
||||
if (thumbnailUrlChanged) {
|
||||
return downloadImageFromWorker({ url: downloadUrl, destDir: basePath, destName: filename, size: { width, height } })
|
||||
}
|
||||
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
|
18
server/lib/worker/parent-process.ts
Normal file
18
server/lib/worker/parent-process.ts
Normal file
|
@ -0,0 +1,18 @@
|
|||
import { join } from 'path'
|
||||
import Piscina from 'piscina'
|
||||
import { WORKER_THREADS } from '@server/initializers/constants'
|
||||
import { downloadImage } from './workers/image-downloader'
|
||||
|
||||
const downloadImagerWorker = new Piscina({
|
||||
filename: join(__dirname, 'workers', 'image-downloader.js'),
|
||||
concurrentTasksPerWorker: WORKER_THREADS.DOWNLOAD_IMAGE.CONCURRENCY,
|
||||
maxThreads: WORKER_THREADS.DOWNLOAD_IMAGE.MAX_THREADS
|
||||
})
|
||||
|
||||
function downloadImageFromWorker (options: Parameters<typeof downloadImage>[0]): Promise<ReturnType<typeof downloadImage>> {
|
||||
return downloadImagerWorker.run(options)
|
||||
}
|
||||
|
||||
export {
|
||||
downloadImageFromWorker
|
||||
}
|
33
server/lib/worker/workers/image-downloader.ts
Normal file
33
server/lib/worker/workers/image-downloader.ts
Normal file
|
@ -0,0 +1,33 @@
|
|||
import { remove } from 'fs-extra'
|
||||
import { join } from 'path'
|
||||
import { processImage } from '@server/helpers/image-utils'
|
||||
import { doRequestAndSaveToFile } from '@server/helpers/requests'
|
||||
import { CONFIG } from '@server/initializers/config'
|
||||
|
||||
async function downloadImage (options: {
|
||||
url: string
|
||||
destDir: string
|
||||
destName: string
|
||||
size: { width: number, height: number }
|
||||
}) {
|
||||
const { url, destDir, destName, size } = options
|
||||
|
||||
const tmpPath = join(CONFIG.STORAGE.TMP_DIR, 'pending-' + destName)
|
||||
await doRequestAndSaveToFile(url, tmpPath)
|
||||
|
||||
const destPath = join(destDir, destName)
|
||||
|
||||
try {
|
||||
await processImage(tmpPath, destPath, size)
|
||||
} catch (err) {
|
||||
await remove(tmpPath)
|
||||
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = downloadImage
|
||||
|
||||
export {
|
||||
downloadImage
|
||||
}
|
42
yarn.lock
42
yarn.lock
|
@ -44,6 +44,11 @@
|
|||
ajv-draft-04 "^1.0.0"
|
||||
call-me-maybe "^1.0.1"
|
||||
|
||||
"@assemblyscript/loader@^0.10.1":
|
||||
version "0.10.1"
|
||||
resolved "https://registry.yarnpkg.com/@assemblyscript/loader/-/loader-0.10.1.tgz#70e45678f06c72fa2e350e8553ec4a4d72b92e06"
|
||||
integrity sha512-H71nDOOL8Y7kWRLqf6Sums+01Q5msqBW2KhDUTemh1tvY04eSkSXrK0uj/4mmY0Xr16/3zyZmsrxN7CKuRbNRg==
|
||||
|
||||
"@assemblyscript/loader@^0.19.21":
|
||||
version "0.19.23"
|
||||
resolved "https://registry.yarnpkg.com/@assemblyscript/loader/-/loader-0.19.23.tgz#7fccae28d0a2692869f1d1219d36093bc24d5e72"
|
||||
|
@ -4357,6 +4362,11 @@ event-target-shim@^5.0.0:
|
|||
resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-5.0.1.tgz#5d4d3ebdf9583d63a5333ce2deb7480ab2b05789"
|
||||
integrity sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==
|
||||
|
||||
eventemitter-asyncresource@^1.0.0:
|
||||
version "1.0.0"
|
||||
resolved "https://registry.yarnpkg.com/eventemitter-asyncresource/-/eventemitter-asyncresource-1.0.0.tgz#734ff2e44bf448e627f7748f905d6bdd57bdb65b"
|
||||
integrity sha512-39F7TBIV0G7gTelxwbEqnwhp90eqCPON1k0NwNfwhgKn4Co4ybUbj2pECcXT0B3ztRKZ7Pw1JujUUgmQJHcVAQ==
|
||||
|
||||
events@3.3.0, events@^3.3.0:
|
||||
version "3.3.0"
|
||||
resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400"
|
||||
|
@ -4985,6 +4995,15 @@ has@^1.0.3:
|
|||
dependencies:
|
||||
function-bind "^1.1.1"
|
||||
|
||||
hdr-histogram-js@^2.0.1:
|
||||
version "2.0.3"
|
||||
resolved "https://registry.yarnpkg.com/hdr-histogram-js/-/hdr-histogram-js-2.0.3.tgz#0b860534655722b6e3f3e7dca7b78867cf43dcb5"
|
||||
integrity sha512-Hkn78wwzWHNCp2uarhzQ2SGFLU3JY8SBDDd3TAABK4fc30wm+MuPOrg5QVFVfkKOQd6Bfz3ukJEI+q9sXEkK1g==
|
||||
dependencies:
|
||||
"@assemblyscript/loader" "^0.10.1"
|
||||
base64-js "^1.2.0"
|
||||
pako "^1.0.3"
|
||||
|
||||
hdr-histogram-js@^3.0.0:
|
||||
version "3.0.0"
|
||||
resolved "https://registry.yarnpkg.com/hdr-histogram-js/-/hdr-histogram-js-3.0.0.tgz#8e2d9a68e3313147804c47d85a9c22a93f85e24b"
|
||||
|
@ -6477,7 +6496,15 @@ next-tick@1, next-tick@^1.1.0:
|
|||
resolved "https://registry.yarnpkg.com/next-tick/-/next-tick-1.1.0.tgz#1836ee30ad56d67ef281b22bd199f709449b35eb"
|
||||
integrity sha512-CXdUiJembsNjuToQvxayPZF9Vqht7hewsvy2sOWafLvi2awflj9mOC6bHIg50orX8IJvWKY9wYQ/zB2kogPslQ==
|
||||
|
||||
node-addon-api@^3.1.0:
|
||||
nice-napi@^1.0.2:
|
||||
version "1.0.2"
|
||||
resolved "https://registry.yarnpkg.com/nice-napi/-/nice-napi-1.0.2.tgz#dc0ab5a1eac20ce548802fc5686eaa6bc654927b"
|
||||
integrity sha512-px/KnJAJZf5RuBGcfD+Sp2pAKq0ytz8j+1NehvgIGFkvtvFrDM3T8E4x/JJODXK9WZow8RRGrbA9QQ3hs+pDhA==
|
||||
dependencies:
|
||||
node-addon-api "^3.0.0"
|
||||
node-gyp-build "^4.2.2"
|
||||
|
||||
node-addon-api@^3.0.0, node-addon-api@^3.1.0:
|
||||
version "3.2.1"
|
||||
resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-3.2.1.tgz#81325e0a2117789c0128dab65e7e38f07ceba161"
|
||||
integrity sha512-mmcei9JghVNDYydghQmeDX8KoAm0FAiYyIcUt/N4nhyAipB17pllZQDOJD2fotxABnt4Mdz+dKTO7eftLg4d0A==
|
||||
|
@ -6513,7 +6540,7 @@ node-gyp-build-optional-packages@5.0.2:
|
|||
resolved "https://registry.yarnpkg.com/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.0.2.tgz#3de7d30bd1f9057b5dfbaeab4a4442b7fe9c5901"
|
||||
integrity sha512-PiN4NWmlQPqvbEFcH/omQsswWQbe5Z9YK/zdB23irp5j2XibaA2IrGvpSWmVVG4qMZdmPdwPctSy4a86rOMn6g==
|
||||
|
||||
node-gyp-build@^4.2.0, node-gyp-build@^4.3.0:
|
||||
node-gyp-build@^4.2.0, node-gyp-build@^4.2.2, node-gyp-build@^4.3.0:
|
||||
version "4.4.0"
|
||||
resolved "https://registry.yarnpkg.com/node-gyp-build/-/node-gyp-build-4.4.0.tgz#42e99687ce87ddeaf3a10b99dc06abc11021f3f4"
|
||||
integrity sha512-amJnQCcgtRVw9SvoebO3BKGESClrfXGCUTX9hSn1OuGQTQBOZmVd0Z0OlecpuRksKvbsUqALE8jls/ErClAPuQ==
|
||||
|
@ -7033,6 +7060,17 @@ pify@^4.0.1:
|
|||
resolved "https://registry.yarnpkg.com/pify/-/pify-4.0.1.tgz#4b2cd25c50d598735c50292224fd8c6df41e3231"
|
||||
integrity sha512-uB80kBFb/tfd68bVleG9T5GGsGPjJrLAUpR5PZIrhBnIaRTQRjqdJSsIKkOP6OAIFbj7GOrcudc5pNjZ+geV2g==
|
||||
|
||||
piscina@^3.2.0:
|
||||
version "3.2.0"
|
||||
resolved "https://registry.yarnpkg.com/piscina/-/piscina-3.2.0.tgz#f5a1dde0c05567775690cccefe59d9223924d154"
|
||||
integrity sha512-yn/jMdHRw+q2ZJhFhyqsmANcbF6V2QwmD84c6xRau+QpQOmtrBCoRGdvTfeuFDYXB5W2m6MfLkjkvQa9lUSmIA==
|
||||
dependencies:
|
||||
eventemitter-asyncresource "^1.0.0"
|
||||
hdr-histogram-js "^2.0.1"
|
||||
hdr-histogram-percentiles-obj "^3.0.0"
|
||||
optionalDependencies:
|
||||
nice-napi "^1.0.2"
|
||||
|
||||
pixelmatch@^4.0.2:
|
||||
version "4.0.2"
|
||||
resolved "https://registry.yarnpkg.com/pixelmatch/-/pixelmatch-4.0.2.tgz#8f47dcec5011b477b67db03c243bc1f3085e8854"
|
||||
|
|
Loading…
Reference in a new issue