1
0
Fork 0

Add a pool of requests instead of making a request at each action (add

video/remove video) for performance in big networks
This commit is contained in:
Chocobozzz 2015-12-04 16:13:32 +01:00
parent af82cae07d
commit 0b69752270
12 changed files with 403 additions and 210 deletions

View file

@ -18,10 +18,8 @@
}
remote.remoteVideosAdd = function (req, res, next) {
req.checkBody('data.name', 'Should have a name').isLength(1, 50)
req.checkBody('data.description', 'Should have a description').isLength(1, 250)
req.checkBody('data.magnetUri', 'Should have a magnetUri').notEmpty()
req.checkBody('data.podUrl', 'Should have a podUrl').isURL()
req.checkBody('data').isArray()
req.checkBody('data').eachIsRemoteVideosAddValid()
logger.debug('Checking remoteVideosAdd parameters', { parameters: req.body })
@ -29,7 +27,8 @@
}
remote.remoteVideosRemove = function (req, res, next) {
req.checkBody('data.magnetUri', 'Should have a magnetUri').notEmpty()
req.checkBody('data').isArray()
req.checkBody('data').eachIsRemoteVideosRemoveValid()
logger.debug('Checking remoteVideosRemove parameters', { parameters: req.body })

View file

@ -46,6 +46,7 @@
"jquery": "^2.1.4",
"js-yaml": "^3.3.1",
"load-grunt-tasks": "^3.3.0",
"lodash-node": "3.10.1",
"mkdirp": "^0.5.1",
"mongoose": "^4.0.5",
"morgan": "^1.5.3",
@ -57,6 +58,7 @@
"segfault-handler": "^0.2.4",
"time-grunt": "^1.2.1",
"ursa": "^0.9.1",
"validator": "^4.3.0",
"webtorrent": "*",
"winston": "^1.0.1",
"ws": "^0.8.0"

View file

@ -3,21 +3,23 @@
var express = require('express')
var router = express.Router()
var pluck = require('lodash-node/compat/collection/pluck')
var middleware = require('../../../middlewares')
var miscMiddleware = middleware.misc
var reqValidator = middleware.reqValidators.remote
var videos = require('../../../src/videos')
function addRemoteVideos (req, res, next) {
videos.addRemote(req.body.data, function (err, video) {
videos.addRemotes(req.body.data, function (err, videos) {
if (err) return next(err)
res.json(video)
res.json(videos)
})
}
function removeRemoteVideo (req, res, next) {
videos.removeRemote(req.body.signature.url, req.body.data.magnetUri, function (err) {
videos.removeRemotes(req.body.signature.url, pluck(req.body.data, 'magnetUri'), function (err) {
if (err) return next(err)
res.status(204)

View file

@ -35,7 +35,9 @@
// ----------- PeerTube modules -----------
var config = require('config')
var customValidators = require('./src/customValidators')
var logger = require('./src/logger')
var poolRequests = require('./src/poolRequests')
var routes = require('./routes')
var videos = require('./src/videos')
var webtorrent = require('./src/webTorrentNode')
@ -56,7 +58,9 @@
app.use(multer({ dest: uploads }))
app.use(bodyParser.urlencoded({ extended: false }))
// Validate some params for the API
app.use(expressValidator())
app.use(expressValidator({
customValidators: customValidators
}))
// ----------- Views, routes and static files -----------
@ -154,6 +158,9 @@
// ----------- Make the server listening -----------
server.listen(port, function () {
// Activate the pool requests
poolRequests.activate()
videos.seedAll(function () {
logger.info('Seeded all the videos')
logger.info('Server listening on port %d', port)

29
src/customValidators.js Normal file
View file

@ -0,0 +1,29 @@
;(function () {
'use strict'
var validator = require('validator')
var customValidators = {}
customValidators.eachIsRemoteVideosAddValid = function (values) {
return values.every(function (val) {
return validator.isLength(val.name, 1, 50) &&
validator.isLength(val.description, 1, 50) &&
validator.isLength(val.magnetUri, 10) &&
validator.isURL(val.podUrl)
})
}
customValidators.eachIsRemoteVideosRemoveValid = function (values) {
return values.every(function (val) {
return validator.isLength(val.magnetUri, 10)
})
}
customValidators.isArray = function (value) {
return Array.isArray(value)
}
// ----------- Export -----------
module.exports = customValidators
})()

View file

@ -30,6 +30,15 @@
var PodsDB = mongoose.model('pods', podsSchema)
// ----------- PoolRequests -----------
var poolRequestsSchema = mongoose.Schema({
type: String,
id: String, // Special id to find duplicates (video created we want to remove...)
request: mongoose.Schema.Types.Mixed
})
var PoolRequestsDB = mongoose.model('poolRequests', poolRequestsSchema)
// ----------- Connection -----------
mongoose.connect('mongodb://' + host + ':' + port + '/' + dbname)
@ -45,6 +54,7 @@
// ----------- Export -----------
module.exports = {
VideosDB: VideosDB,
PodsDB: PodsDB
PodsDB: PodsDB,
PoolRequestsDB: PoolRequestsDB
}
})()

View file

@ -8,6 +8,7 @@
var logger = require('./logger')
var PodsDB = require('./database').PodsDB
var poolRequests = require('./poolRequests')
var utils = require('./utils')
var pods = {}
@ -16,13 +17,6 @@
var host = config.get('webserver.host')
var port = config.get('webserver.port')
// ----------- Constants -----------
var PODS_SCORE = {
MALUS: -10,
BONUS: 10
}
// ----------- Private functions -----------
function getForeignPodsList (url, callback) {
@ -34,25 +28,6 @@
})
}
function updatePodsScore (good_pods, bad_pods) {
logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: PODS_SCORE.BONUS } }, { multi: true }).exec()
PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: PODS_SCORE.MALUS } }, { multi: true }, function (err) {
if (err) throw err
removeBadPods()
})
}
function removeBadPods () {
PodsDB.remove({ score: 0 }, function (err, result) {
if (err) throw err
var number_removed = result.result.n
if (number_removed !== 0) logger.info('Removed %d pod.', number_removed)
})
}
// ----------- Public functions -----------
pods.list = function (callback) {
@ -93,58 +68,16 @@
})
}
// { path, data }
pods.makeSecureRequest = function (data, callback) {
if (callback === undefined) callback = function () {}
PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) {
if (err) {
logger.error('Cannot get the list of the pods.', { error: err })
return callback(err)
pods.addVideoToFriends = function (video) {
// To avoid duplicates
var id = video.name + video.magnetUri
poolRequests.addToPoolRequests(id, 'add', video)
}
logger.debug('Make multiple requests.')
var params = {
encrypt: true,
sign: true,
method: data.method,
path: data.path,
data: data.data
}
var bad_pods = []
var good_pods = []
utils.makeMultipleRetryRequest(
params,
pods,
function callbackEachPodFinished (err, response, body, pod, callback_each_pod_finished) {
if (err || response.statusCode !== 200) {
bad_pods.push(pod._id)
logger.error('Error sending secure request to %s/%s pod.', pod.url, data.path, { error: err })
} else {
good_pods.push(pod._id)
}
return callback_each_pod_finished()
},
function callbackAllPodsFinished (err) {
if (err) {
logger.error('There was some errors when sending the video meta data.', { error: err })
return callback(err)
}
logger.debug('Finished')
updatePodsScore(good_pods, bad_pods)
callback(null)
}
)
})
pods.removeVideoToFriends = function (video) {
// To avoid duplicates
var id = video.name + video.magnetUri
poolRequests.addToPoolRequests(id, 'remove', video)
}
pods.makeFriends = function (callback) {
@ -214,7 +147,7 @@
pods_list,
function eachRequest (err, response, body, pod, callback_each_request) {
function eachRequest (err, response, body, url, pod, callback_each_request) {
// We add the pod if it responded correctly with its public certificate
if (!err && response.statusCode === 200) {
pods.add({ url: pod.url, publicKey: body.cert, score: global.FRIEND_BASE_SCORE }, function (err) {

158
src/poolRequests.js Normal file
View file

@ -0,0 +1,158 @@
;(function () {
'use strict'
var async = require('async')
var logger = require('./logger')
var database = require('./database')
var PoolRequestsDB = database.PoolRequestsDB
var PodsDB = database.PodsDB
var utils = require('./utils')
var poolRequests = {}
// ----------- Constants -----------
// Time to wait between requests to the friends
var INTERVAL = utils.isTestInstance() ? 10000 : 60000
var PODS_SCORE = {
MALUS: -10,
BONUS: 10
}
// ----------- Private -----------
var timer = null
function makePoolRequests () {
logger.info('Making pool requests to friends.')
PoolRequestsDB.find({}, { type: 1, request: 1 }, function (err, pool_requests) {
if (err) throw err
var requests = {
add: [],
remove: []
}
async.each(pool_requests, function (pool_request, callback_each) {
if (pool_request.type === 'add') {
requests.add.push(pool_request.request)
} else if (pool_request.type === 'remove') {
requests.remove.push(pool_request.request)
} else {
throw new Error('Unkown pool request type.')
}
callback_each()
}, function () {
makePoolRequest('add', requests.add)
makePoolRequest('remove', requests.remove)
logger.info('Pool requests to friends sent.')
})
})
}
function updatePodsScore (good_pods, bad_pods) {
logger.info('Updating %d good pods and %d bad pods scores.', good_pods.length, bad_pods.length)
PodsDB.update({ _id: { $in: good_pods } }, { $inc: { score: PODS_SCORE.BONUS } }, { multi: true }).exec()
PodsDB.update({ _id: { $in: bad_pods } }, { $inc: { score: PODS_SCORE.MALUS } }, { multi: true }, function (err) {
if (err) throw err
removeBadPods()
})
}
function removeBadPods () {
PodsDB.remove({ score: 0 }, function (err, result) {
if (err) throw err
var number_removed = result.result.n
if (number_removed !== 0) logger.info('Removed %d pod.', number_removed)
})
}
function makePoolRequest (type, requests) {
logger.debug('Make pool requests scheduled.')
PodsDB.find({}, { _id: 1, url: 1, publicKey: 1 }).exec(function (err, pods) {
if (err) throw err
var params = {
encrypt: true,
sign: true,
method: 'POST',
path: null,
data: requests
}
if (type === 'add') {
params.path = '/api/' + global.API_VERSION + '/remotevideos/add'
} else if (type === 'remove') {
params.path = '/api/' + global.API_VERSION + '/remotevideos/remove'
} else {
throw new Error('Unkown pool request type.')
}
var bad_pods = []
var good_pods = []
utils.makeMultipleRetryRequest(params, pods, callbackEachPodFinished, callbackAllPodsFinished)
function callbackEachPodFinished (err, response, body, url, pod, callback_each_pod_finished) {
if (err || response.statusCode !== 200) {
bad_pods.push(pod._id)
logger.error('Error sending secure request to %s pod.', url, { error: err })
} else {
good_pods.push(pod._id)
}
return callback_each_pod_finished()
}
function callbackAllPodsFinished (err) {
if (err) {
logger.error('There was some errors when sending the video meta data.', { error: err })
}
updatePodsScore(good_pods, bad_pods)
PoolRequestsDB.remove().exec()
}
})
}
// ----------- Public -----------
poolRequests.activate = function () {
logger.info('Pool requests activated.')
timer = setInterval(makePoolRequests, INTERVAL)
}
poolRequests.addToPoolRequests = function (id, type, request) {
logger.debug('Add request to the pool requests.', { id: id, type: type, request: request })
PoolRequestsDB.findOne({ id: id }, function (err, entity) {
if (err) logger.error(err)
if (entity) {
if (entity.type === type) {
logger.error(new Error('Cannot insert two same requests.'))
return
}
// Remove the request of the other type
PoolRequestsDB.remove({ id: id }, function (err) {
if (err) logger.error(err)
})
} else {
PoolRequestsDB.create({ id: id, type: type, request: request }, function (err) {
if (err) logger.error(err)
})
}
})
}
poolRequests.deactivate = function () {
logger.info('Pool requests deactivated.')
clearInterval(timer)
}
module.exports = poolRequests
})()

View file

@ -36,7 +36,7 @@
replay(
request.post(params, function (err, response, body) {
callbackEach(err, response, body, to_pod)
callbackEach(err, response, body, params.url, to_pod)
}),
{
retries: retries,
@ -71,8 +71,8 @@
// Make a request for each pod
async.each(pods, function (pod, callback_each_async) {
function callbackEachRetryRequest (err, response, body, pod) {
callbackEach(err, response, body, pod, function () {
function callbackEachRetryRequest (err, response, body, url, pod) {
callbackEach(err, response, body, url, pod, function () {
callback_each_async()
})
}

View file

@ -3,6 +3,7 @@
var async = require('async')
var config = require('config')
var dz = require('dezalgo')
var fs = require('fs')
var webtorrent = require('./webTorrentNode')
@ -67,19 +68,10 @@
return callback(err)
}
// Now we'll send the video's meta data
// Now we'll add the video's meta data to our friends
params.namePath = null
logger.info('Sending %s video to friends.', video_file.path)
var data = {
path: '/api/' + global.API_VERSION + '/remotevideos/add',
method: 'POST',
data: params
}
// Do not wait the secure requests
pods.makeSecureRequest(data)
pods.addVideoToFriends(params)
callback(null)
})
})
@ -124,16 +116,12 @@
return callback(err)
}
var data = {
path: '/api/' + global.API_VERSION + '/remotevideos/remove',
method: 'POST',
data: {
var params = {
name: video.name,
magnetUri: video.magnetUri
}
}
// Yes this is a POST request because we add some informations in the body (signature, encrypt etc)
pods.makeSecureRequest(data)
pods.removeVideoToFriends(params)
callback(null)
})
})
@ -142,49 +130,65 @@
}
// Use the magnet Uri because the _id field is not the same on different servers
videos.removeRemote = function (fromUrl, magnetUri, callback) {
VideosDB.findOne({ magnetUri: magnetUri }, function (err, video) {
if (err || !video) {
logger.error('Cannot find the torrent URI of this remote video.')
videos.removeRemotes = function (fromUrl, magnetUris, callback) {
VideosDB.find({ magnetUri: { $in: magnetUris } }, function (err, videos) {
if (err || !videos) {
logger.error('Cannot find the torrent URI of these remote videos.')
return callback(err)
}
// TODO: move to reqValidators middleware ?
var to_remove = []
async.each(videos, function (video, callback_async) {
callback_async = dz(callback_async)
if (video.podUrl !== fromUrl) {
logger.error('The pod has not the rights on this video.')
return callback(err)
logger.error('The pod %s has not the rights on the video of %s.', fromUrl, video.podUrl)
} else {
to_remove.push(video._id)
}
VideosDB.findByIdAndRemove(video._id, function (err) {
callback_async()
}, function () {
VideosDB.remove({ _id: { $in: to_remove } }, function (err) {
if (err) {
logger.error('Cannot remove the remote video.')
logger.error('Cannot remove the remote videos.')
return callback(err)
}
callback(null)
})
})
})
}
// { name, magnetUri, podUrl }
videos.addRemote = function (data, callback) {
logger.debug('Add remote video from pod: %s', data.podUrl)
videos.addRemotes = function (videos, callback) {
var to_add = []
async.each(videos, function (video, callback_each) {
callback_each = dz(callback_each)
logger.debug('Add remote video from pod: %s', video.podUrl)
var params = {
name: data.name,
name: video.name,
namePath: null,
description: data.description,
magnetUri: data.magnetUri,
podUrl: data.podUrl
description: video.description,
magnetUri: video.magnetUri,
podUrl: video.podUrl
}
VideosDB.create(params, function (err, video) {
to_add.push(params)
callback_each()
}, function () {
VideosDB.create(to_add, function (err, videos) {
if (err) {
logger.error('Cannot insert this remote video.', { error: err })
return callback(err)
}
return callback(null, video)
return callback(null, videos)
})
})
}

View file

@ -103,9 +103,9 @@
})
it('Should make friends with the pods 1, 2, 3', function (done) {
this.timeout(100000)
this.timeout(150000)
// Pods 1, 2, 3 and 4 become friends
// Pods 1, 2, 3 and 4 become friends (yes this is beautiful)
makeFriend(2, function () {
makeFriend(1, function () {
makeFriend(4, function () {
@ -114,9 +114,13 @@
// Expulse pod 4 from pod 1 and 2
uploadVideo(1, function () {
setTimeout(function () {
uploadVideo(1, function () {
setTimeout(function () {
uploadVideo(2, function () {
setTimeout(function () {
uploadVideo(2, function () {
setTimeout(function () {
// Rerun server 4
utils.runServer(4, function (app, url) {
apps[3] = app
@ -142,9 +146,13 @@
})
})
})
}, 11000)
})
}, 11000)
})
}, 11000)
})
}, 11000)
})
})
})

View file

@ -14,7 +14,7 @@
var path = '/api/v1/videos'
var apps = []
var urls = []
var video_id = -1
var to_remove = []
function getVideosList (url, end) {
request(url)
@ -36,6 +36,14 @@
.end(end)
}
function removeVideo (url, id, end) {
request(url)
.delete(path + '/' + id)
.set('Accept', 'application/json')
.expect(204)
.end(end)
}
before(function (done) {
this.timeout(30000)
var path_friends = '/api/v1/pods/makefriends'
@ -89,7 +97,7 @@
describe('Should upload the video and propagate on each pod', function () {
it('Should upload the video on pod 1 and propagate on each pod', function (done) {
this.timeout(5000)
this.timeout(15000)
uploadVideo(urls[0], 'my super name for pod 1', 'my super description for pod 1', 'video_short1.webm', function (err) {
if (err) throw err
@ -125,12 +133,12 @@
done()
})
}, 1000)
}, 11000)
})
})
it('Should upload the video on pod 2 and propagate on each pod', function (done) {
this.timeout(5000)
this.timeout(15000)
uploadVideo(urls[1], 'my super name for pod 2', 'my super description for pod 2', 'video_short2.webm', function (err) {
if (err) throw err
@ -166,15 +174,17 @@
done()
})
}, 1000)
}, 11000)
})
})
it('Should upload the video on pod 3 and propagate on each pod', function (done) {
this.timeout(5000)
it('Should upload two videos on pod 3 and propagate on each pod', function (done) {
this.timeout(15000)
uploadVideo(urls[2], 'my super name for pod 3', 'my super description for pod 3', 'video_short3.webm', function (err) {
if (err) throw err
uploadVideo(urls[2], 'my super name for pod 3-2', 'my super description for pod 3-2', 'video_short.webm', function (err) {
if (err) throw err
setTimeout(function () {
var base_magnet = null
@ -185,13 +195,19 @@
var videos = res.body
expect(videos).to.be.an('array')
expect(videos.length).to.equal(3)
expect(videos.length).to.equal(4)
var video = videos[2]
expect(video.name).to.equal('my super name for pod 3')
expect(video.description).to.equal('my super description for pod 3')
expect(video.podUrl).to.equal('http://localhost:9003')
expect(video.magnetUri).to.exist
video = videos[3]
expect(video.name).to.equal('my super name for pod 3-2')
expect(video.description).to.equal('my super description for pod 3-2')
expect(video.podUrl).to.equal('http://localhost:9003')
expect(video.magnetUri).to.exist
// All pods should have the same magnet Uri
if (base_magnet === null) {
base_magnet = video.magnetUri
@ -206,7 +222,8 @@
done()
})
}, 1000)
}, 11000)
})
})
})
})
@ -220,6 +237,9 @@
if (err) throw err
var video = res.body[0]
to_remove.push(res.body[2]._id)
to_remove.push(res.body[3]._id)
webtorrent.add(video.magnetUri, function (torrent) {
expect(torrent.files).to.exist
expect(torrent.files.length).to.equal(1)
@ -257,7 +277,6 @@
if (err) throw err
var video = res.body[2]
video_id = res.body[1]._id
webtorrent.add(video.magnetUri, function (torrent) {
expect(torrent.files).to.exist
@ -269,18 +288,38 @@
})
})
it('Should remove the file 2 by asking pod 2', function (done) {
request(urls[1])
.delete(path + '/' + video_id)
.set('Accept', 'application/json')
.expect(204)
.end(function (err, res) {
it('Should add the file 3-2 by asking pod 1', function (done) {
// Yes, this could be long
this.timeout(200000)
getVideosList(urls[0], function (err, res) {
if (err) throw err
var video = res.body[3]
webtorrent.add(video.magnetUri, function (torrent) {
expect(torrent.files).to.exist
expect(torrent.files.length).to.equal(1)
expect(torrent.files[0].path).to.exist.and.to.not.equal('')
done()
})
})
})
it('Should remove the file 3 and 3-2 by asking pod 3', function (done) {
this.timeout(15000)
removeVideo(urls[2], to_remove[0], function (err) {
if (err) throw err
removeVideo(urls[2], to_remove[1], function (err) {
if (err) throw err
// Wait the propagation to the other pods
setTimeout(function () {
done()
}, 1000)
}, 11000)
})
})
})
@ -293,8 +332,10 @@
expect(videos).to.be.an('array')
expect(videos.length).to.equal(2)
expect(videos[0]._id).not.to.equal(videos[1]._id)
expect(videos[0]._id).not.to.equal(video_id)
expect(videos[1]._id).not.to.equal(video_id)
expect(videos[0]._id).not.to.equal(to_remove[0])
expect(videos[1]._id).not.to.equal(to_remove[0])
expect(videos[0]._id).not.to.equal(to_remove[1])
expect(videos[1]._id).not.to.equal(to_remove[1])
callback()
})