1
0
Fork 0

Server: finish old jobs at startup

This commit is contained in:
Chocobozzz 2017-05-05 17:24:16 +02:00
parent e5b8853905
commit 4e284e97b9
3 changed files with 44 additions and 20 deletions

View file

@ -29,6 +29,11 @@ const missed = checker.checkMissedConfig()
if (missed.length !== 0) {
throw new Error('Miss some configurations keys : ' + missed)
}
checker.checkFFmpeg(function (err) {
if (err) {
throw err
}
})
const errorMessage = checker.checkConfig()
if (errorMessage !== null) {

View file

@ -15,31 +15,40 @@ const jobScheduler = {
}
function activate () {
const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE
logger.info('Jobs scheduler activated.')
const jobsQueue = queue(processJob)
forever(
function (next) {
if (jobsQueue.length() !== 0) {
// Finish processing the queue first
return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
}
// Finish processing jobs from a previous start
const state = constants.JOB_STATES.PROCESSING
db.Job.listWithLimit(limit, state, function (err, jobs) {
enqueueJobs(err, jobsQueue, jobs)
db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) {
if (err) {
logger.error('Cannot list pending jobs.', { error: err })
} else {
jobs.forEach(function (job) {
jobsQueue.push(job)
})
forever(
function (next) {
if (jobsQueue.length() !== 0) {
// Finish processing the queue first
return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
}
// Optimization: we could use "drain" from queue object
return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
})
}
)
const state = constants.JOB_STATES.PENDING
db.Job.listWithLimit(limit, state, function (err, jobs) {
if (err) {
logger.error('Cannot list pending jobs.', { error: err })
} else {
jobs.forEach(function (job) {
jobsQueue.push(job)
})
}
// Optimization: we could use "drain" from queue object
return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
})
}
)
})
}
// ---------------------------------------------------------------------------
@ -48,6 +57,16 @@ module.exports = jobScheduler
// ---------------------------------------------------------------------------
function enqueueJobs (err, jobsQueue, jobs) {
if (err) {
logger.error('Cannot list pending jobs.', { error: err })
} else {
jobs.forEach(function (job) {
jobsQueue.push(job)
})
}
}
function createJob (transaction, handlerName, handlerInputData, callback) {
const createQuery = {
state: constants.JOB_STATES.PENDING,

View file

@ -39,14 +39,14 @@ module.exports = function (sequelize, DataTypes) {
// ---------------------------------------------------------------------------
function listWithLimit (limit, callback) {
function listWithLimit (limit, state, callback) {
const query = {
order: [
[ 'id', 'ASC' ]
],
limit: limit,
where: {
state: constants.JOB_STATES.PENDING
state
}
}