Distributed timed job queue, backed by redis.
- Support redis cluster.
- Support one or more
timed-queueinstance in a redis instance. Eachtimed-queueinstance are segregated byprefixoption - Support one or more job queues in a timed-queue instance.
- Support one or more timed-queue clients for a timed-queue instance.
const TimedQueue = require('timed-queue')
const timedQueue = new TimedQueue({prefix: 'TQ1', interval: 1000 * 60})
// connect to redis cluster.
timedQueue.connect([7000, 7001, 7002])
.on('error', function (error) {
console.error(error)
})
// create 'event' job queue in timed-queue instance
const eventQueue = timedQueue.queue('event')
// add 'job' listener
eventQueue.on('job', function (jobObj) {
// ... just do some thing
// ACK the job
eventQueue.ackjob(jobObj.job)()
})
// add job to queue
eventQueue.addjob(eventObj.id, new Date(eventObj.startDate).getTime() - 10 * 60 * 1000)(function (err, res) {
console.log(err, res)
})npm install timed-queueJob Class:
function Job (queue, job, timing, active, retryCount) {
this.queue = queue
this.job = job
this.timing = timing
this.active = active
this.retryCount = retryCount
}this.queue: {String} Queue namethis.job: {String} The job's namethis.timing: {Number} The time in millisecond when the job should be activedthis.active: {Number} The actual time in millisecond that the job be activedthis.retryCount: {Number} A job that has been actived but has not been ACK inretrytime will be actived again.retryCountis times that the job re-actived.
const TimedQueue = require('timed-queue')Return a timedQueue client. It is an EventEmitter instance.
options.prefix: {String} Redis key's prefix, or namespace. Default to"TIMEDQ"options.count: {Number} The maximum job count for queue'sgetjobsmethod. Default to64options.interval: {Number} Interval time for scanning. Default to1000 * 60msoptions.retry: {Number} Retry time for a job. A job that has been actived but has not been ACK inretrytime will be actived again. Default tointerval / 2msoptions.expire: {Number} Expiration time for a job. A job that has been actived and has not been ACK inexpiretime will be removed from the queue. Default tointerval * 5msoptions.accuracy: {Number} Scanning accuracy. Default tointerval / 5options.autoScan: {Boolean} The flag to enable or disable automatic scan. Default totrue. It can be set tofalseif automatic scan is not desired.
const timedQueue = new TimedQueue()- timedQueue.on('connect', function () {})
- timedQueue.on('error', function (error) {})
- timedQueue.on('close', function () {})
- timedQueue.on('scanStart', function (queuesLength) {})
- timedQueue.on('scanEnd', function (queuesLength, timeConsuming) {})
Connect to redis. Arguments are the same as thunk-redis's createClient, or give a thunk-redis instance.
timedQueue.connect()Start scanning. It automatically starts after connect method is called unless autoScan is set to false.
Stop scanning.
Close the timedQueue. It closes redis client of the timedQueue accordingly.
It is used to regulate the automatic scanning frequency.
Remove the queue. It deletes all data in the queue from redis.
Return a Queue instance if one exists. Otherwise it creates a Queue instance and return it. Queue instance is a EventEmitter instance.
queue: {String} The queue's nameoptions.count: {Number} The maximum job count for queue'sgetjobsmethod. Default to timedQueue'scountoptions.retry: {Number} Retry time for a job. A job that has been actived and has not been ACK inretrytime will be actived again. Default to timedQueue'sretryoptions.expire: {Number} Expiration time for job. A job that has been actived and has not been ACK inexpiretime will be removed from the queue. Default to timedQueue'sexpireoptions.accuracy: {Number} Scanning accuracy, Default to timedQueue'saccuracy
const eventQueue = timedQueue.queue('event', {retry: 1000, expire: 5000})- queue.on('job', function (job) {})
If no job listener on queue, queue scanning will not run.
options.count: {Number} The maximum job count for queue'sgetjobsmethod. Default to timedQueue'scountoptions.retry: {Number} Retry time for a job. A job that has been actived and has not been ACK inretrytime will be actived again. Default to timedQueue'sretryoptions.expire: {Number} Expire time for a job. A job that has been actived and has not been ACK inexpiretime will be removed from queue. Default to timedQueue'sexpireoptions.accuracy: {Number} Scanning accuracy. Default to timedQueue'saccuracy
Add one or more jobs to the queue. It can be used to update the job's timing.
job: {String} The job's nametiming: {Number} The time in millisecond when the job should be actived. It should greater thanDate.now()
eventQueue.addjob('52b3b5f49c2238313600015d', 1441552050409)(function (err, res) {
console.log(err, res)
// null, 1
})Show the job info.
job: {String} job
eventQueue.show('52b3b5f49c2238313600015d')(function (err, res) {
console.log(err, res)
// {
// queue: 'event',
// job: '52b3b5f49c2238313600015d',
// timing: 1441552050409
// active: 0,
// retryCount: 0
// }
})Delete one or more jobs.
job: {String} job
eventQueue.deljob('52b3b5f49c2238313600015d')(function (err, res) {
console.log(err, res) // null, 1
})It is called by Queue.prototype.scan. It should not be called explicitly unless you know what you are doing.
ACK one or more jobs.
job: {String} job
eventQueue.ackjob('52b3b5f49c2238313600015d')(function (err, res) {
console.log(err, res) // null, 1
})It is called by TimedQueue.prototype.scan. It should not be called explicitly unless you know what you are doing.
Return the queue' length.
eventQueue.len()(function (err, res) {
console.log(err, res) // null, 3
})Return actived jobs in the queue.
eventQueue.showActive()(function (err, res) {
console.log(err, res) // null, [jobs...]
})