Queue Wrapper
Overview
The Queue Wrapper (queue-manager/common/queue-wrapper.js) is a factory function that creates standardized Bull queues with Redlock distributed locking, event handling, and automatic cleanup.
Purpose: Provide consistent queue creation with built-in:
- Distributed locking (Redlock)
- Error handling and retry logic
- Event callbacks (failed, completed, stalled)
- Queue registration and tracking
- Level-based cleanup (global vs account)
Function Signature
module.exports = (
  name, // Queue name
  level, // 'global' or 'account'
  {
    processCb, // Job processor function
    failedCb, // Failure callback (optional)
    completedCb, // Completion callback (optional)
    settings, // Bull queue settings (optional)
    concurrency, // Worker concurrency (default: 1)
  },
  runCompletedCbAfterNoActives, // Run completedCb after no active jobs (default: true)
  redlock, // Redlock configuration (optional)
) => {
  /* ... creates, registers, and returns the Bull queue */
};
Parameters
Required Parameters
name (String)
- Unique queue identifier
- Used for Redis key and logging
- Example: 'contacts-import', 'affiliates-commissions'
level (String: 'global' | 'account')
- 'global': Queue persists across accounts
- 'account': Queue specific to an account, auto-cleaned after completion
processCb (Function)
- Main job processor function
- Signature: async (job, done) => { ... }
- Receives Bull job object and done callback
Optional Parameters
failedCb (Function)
- Called when job fails
- Signature: async (job, error) => { ... }
- Use for custom error handling
completedCb (Function)
- Called when job completes
- Signature: async (job, result) => { ... }
- Use for cleanup or notifications
settings (Object)
- Bull queue advanced settings
- Example: { attempts: 3, backoff: { type: 'exponential', delay: 2000 } }
concurrency (Number)
- Worker concurrency (parallel jobs)
- Default: 1
- Higher values for I/O-bound jobs
runCompletedCbAfterNoActives (Boolean)
- For account-level queues
- When true: Runs completedCb only after all jobs finish
- When false: Runs completedCb immediately
- Default: true
redlock (Object)
- Distributed locking configuration
- Properties:
  - active (Boolean): Enable Redlock
  - id (String): Field in job.data to use as lock key
  - type (String): Lock prefix ('account', 'document', 'global')
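The parameter rules above can be summarised as a small validation step. This is a minimal sketch, not the wrapper's actual code: `normalizeWrapperArgs` is a hypothetical helper, and the defaults it applies (`concurrency` of 1, `runCompletedCbAfterNoActives` of true, Redlock inactive unless configured) are taken from the descriptions in this section.

```javascript
// Hypothetical helper (not part of queue-wrapper.js): applies the documented
// defaults and rejects obviously invalid arguments before a queue is created.
function normalizeWrapperArgs(
  name,
  level,
  options = {},
  runCompletedCbAfterNoActives = true,
  redlock = {},
) {
  if (typeof name !== 'string' || name.length === 0) {
    throw new TypeError('name must be a non-empty string');
  }
  if (level !== 'global' && level !== 'account') {
    throw new TypeError("level must be 'global' or 'account'");
  }
  if (typeof options.processCb !== 'function') {
    throw new TypeError('processCb must be a function');
  }
  return {
    name,
    level,
    processCb: options.processCb,
    failedCb: options.failedCb || null,
    completedCb: options.completedCb || null,
    settings: options.settings || {},
    concurrency: options.concurrency ?? 1, // Documented default: 1
    runCompletedCbAfterNoActives, // Documented default: true
    redlock: { active: false, ...redlock }, // Assumed: off unless configured
  };
}

const normalized = normalizeWrapperArgs('contacts-import', 'account', {
  processCb: async (job, done) => done(null, {}),
});
console.log(normalized.concurrency, normalized.runCompletedCbAfterNoActives);
```

Failing fast on a bad `level` or a missing `processCb` keeps misconfiguration out of Redis, where it would otherwise only surface once a job is picked up.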
Redlock Integration
Purpose
Prevents duplicate processing when multiple Queue Manager instances run simultaneously.
Configuration
const queue = queueWrapper('my-queue', 'global', { processCb: myProcessor }, true, {
  active: true,
  id: 'accountId', // job.data.accountId used as lock key
  type: 'account', // Lock key: locks:account:{accountId}
});
Lock Key Patterns
// Account-level lock
locks:account:12345
// Document-level lock
locks:document:abc123
// Global resource lock
locks:global:resource-name
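To make the key pattern concrete, the sketch below derives a key from the `redlock` option and a job. `buildLockKey` is a hypothetical helper written for illustration; the wrapper's real key-building code may differ, for example in how it handles a missing field.

```javascript
// Hypothetical helper: derives the lock key from the wrapper's `redlock`
// option and a job, following the locks:{type}:{value} pattern.
function buildLockKey(redlock, job) {
  if (!redlock || !redlock.active) {
    return null; // Locking disabled: no key needed
  }
  const value = job.data ? job.data[redlock.id] : undefined;
  if (value === undefined || value === null) {
    // Assumed behaviour: refuse to lock on an undefined key
    throw new Error(`job.data.${redlock.id} is required to build a lock key`);
  }
  return `locks:${redlock.type}:${value}`;
}

const lockKey = buildLockKey(
  { active: true, id: 'accountId', type: 'account' },
  { data: { accountId: 12345 } },
);
console.log(lockKey); // locks:account:12345
```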
Lock Behavior
- Lock Duration: 1000ms (1 second)
- Retry: 10 attempts with 200ms delay
- Drift Factor: 0.01
- Failure: Job fails if lock cannot be acquired
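The values above imply a rough time budget for lock acquisition. The sketch below works through that arithmetic. The option names `driftFactor`, `retryCount`, and `retryDelay` follow the node-redlock constructor options; that the wrapper passes them in exactly this shape is an assumption, and retry jitter is ignored.

```javascript
// Assumed mapping of the documented values onto node-redlock option names.
const LOCK_TTL_MS = 1000; // Lock duration
const redlockSettings = {
  driftFactor: 0.01, // Clock drift allowance, as a fraction of the TTL
  retryCount: 10, // Retries before giving up
  retryDelay: 200, // Milliseconds between retries
};

// Upper bound on time spent waiting between retries (jitter ignored).
function maxRetryWaitMs({ retryCount, retryDelay }) {
  return retryCount * retryDelay;
}

// Proportional drift allowance in ms. Redlock implementations may add a
// small constant on top of this value.
function driftAllowanceMs(ttlMs, { driftFactor }) {
  return Math.round(ttlMs * driftFactor);
}

console.log(maxRetryWaitMs(redlockSettings)); // 2000
console.log(driftAllowanceMs(LOCK_TTL_MS, redlockSettings)); // 10
```

Under these assumptions a job waits roughly two seconds for a contended lock before it fails.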
Event Handling
Built-In Events
Error Event
Q.on('error', err => {
  logger.error({
    initiator: 'QM/wrapper/' + name,
    message: err?.message || 'Queue error',
    raw_error: { name: err?.name, message: err?.message, stack: err?.stack },
  });
});
Stalled Event
Q.on('stalled', job => {
  logger.error({
    initiator: 'QM/wrapper/' + name,
    message: 'Job stalled',
    additional_data: { job_data: job },
  });
  job.retry(); // Automatic retry
});
Failed Event
Q.on('failed', async (job, err) => {
  logger.error({
    initiator: 'QM/wrapper/' + name,
    raw_error: { name: err?.name, message: err?.message, stack: err?.stack },
  });
  failedCb && (await failedCb(job, err)); // Custom callback
});
Completed Event
Q.on('completed', async (job, res) => {
  logger.log({
    initiator: `QM/wrapper/${name}`,
    message: `Queue Job Completed, ID: ` + job.id,
  });
  if (level === 'global' || !runCompletedCbAfterNoActives) {
    completedCb && (await completedCb(job, res));
  }
  if (level === 'account') {
    const actives = await Q.getActiveCount();
    if (actives <= 0) {
      if (runCompletedCbAfterNoActives) {
        completedCb && (await completedCb(job, res));
      }
      Q.obliterate({ force: true }); // Cleanup account-level queue
    }
  }
});
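The branching in the completed handler is easier to check as a pure function. The sketch below restates it without Bull or Redis. `completedActions` is a hypothetical name used only for illustration, and `activeCount` stands in for the result of `Q.getActiveCount()`.

```javascript
// Pure restatement of the 'completed' handler's branching (illustrative only).
function completedActions({ level, runCompletedCbAfterNoActives, activeCount }) {
  // Account-level queues are cleaned up once no active jobs remain.
  const queueDrained = level === 'account' && activeCount <= 0;
  // Global queues, or queues with the flag off, call back on every job.
  const runImmediately = level === 'global' || !runCompletedCbAfterNoActives;
  return {
    runCompletedCb: runImmediately || (queueDrained && runCompletedCbAfterNoActives),
    obliterate: queueDrained,
  };
}

console.log(completedActions({ level: 'global', runCompletedCbAfterNoActives: true, activeCount: 4 }));
// { runCompletedCb: true, obliterate: false }
console.log(completedActions({ level: 'account', runCompletedCbAfterNoActives: true, activeCount: 2 }));
// { runCompletedCb: false, obliterate: false }
console.log(completedActions({ level: 'account', runCompletedCbAfterNoActives: true, activeCount: 0 }));
// { runCompletedCb: true, obliterate: true }
```

In every combination `completedCb` runs at most once per completed job, and only account-level queues are ever obliterated.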
Global Queue Registry
Purpose
All queues are registered in global.runningQueuesMap for:
- Queue status monitoring
- Graceful shutdown coordination
- Manual queue management
Usage
// Get queue instance
const queue = global.runningQueuesMap.get('contacts-import');
// Check queue metrics
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const completed = await queue.getCompletedCount();
const failed = await queue.getFailedCount();
// Pause queue
await queue.pause();
// Resume queue
await queue.resume();
// Close queue
await queue.close();
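For monitoring, the four counters can be collected into one snapshot. The sketch below is an illustration under stated assumptions: `getQueueStats` is a hypothetical helper, and a stand-in queue object replaces a real Bull queue so the example runs without Redis. In the Queue Manager the registry argument would be `global.runningQueuesMap`.

```javascript
// Hypothetical helper: gathers the four counters into one snapshot.
async function getQueueStats(registry, name) {
  const queue = registry.get(name);
  if (!queue) {
    return null; // Queue not registered (or already cleaned up)
  }
  // The counts are independent, so they can be fetched concurrently.
  const [waiting, active, completed, failed] = await Promise.all([
    queue.getWaitingCount(),
    queue.getActiveCount(),
    queue.getCompletedCount(),
    queue.getFailedCount(),
  ]);
  return { name, waiting, active, completed, failed };
}

// Stand-in queue exposing the same count methods as a Bull queue.
const fakeQueue = {
  getWaitingCount: async () => 3,
  getActiveCount: async () => 1,
  getCompletedCount: async () => 40,
  getFailedCount: async () => 2,
};
const statsRegistry = new Map([['contacts-import', fakeQueue]]);

getQueueStats(statsRegistry, 'contacts-import').then(stats => console.log(stats));
```

Returning `null` for an unknown name matters for account-level queues, which may already have been obliterated by the time they are inspected.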
Graceful Shutdown
Queue Manager uses runningQueuesMap for graceful shutdown:
process.on('SIGTERM', async () => {
  if (global?.runningQueuesMap?.size) {
    for (const [name, queue] of global.runningQueuesMap) {
      await queue.close();
      global.runningQueuesMap.delete(name);
      logger.log({ message: `Queue ${name} closed successfully.` });
    }
  }
  process.exit(0);
});
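In the handler above, a single `close()` rejection would abort the loop and leave the remaining queues open. The sketch below shows one possible variant that records failures and keeps going. `closeAllQueues` is a hypothetical helper, not the Queue Manager's code, and stand-in queue objects are used so the example runs without Redis.

```javascript
// Hypothetical variant of the shutdown loop: a failing close() is recorded
// instead of aborting, so the remaining queues still get closed.
async function closeAllQueues(queues, log = console) {
  const failures = [];
  // Iterate over a copy so deleting entries cannot affect the iteration.
  for (const [name, queue] of [...queues]) {
    try {
      await queue.close();
      queues.delete(name);
      log.log(`Queue ${name} closed successfully.`);
    } catch (err) {
      failures.push({ name, message: err.message });
    }
  }
  return failures;
}

// Stand-in queues: one closes cleanly, one rejects.
const shutdownRegistry = new Map([
  ['contacts-import', { close: async () => {} }],
  ['affiliates-commissions', { close: async () => { throw new Error('redis down'); } }],
]);

closeAllQueues(shutdownRegistry).then(failures => {
  console.log(failures.length, shutdownRegistry.size);
});
```

A SIGTERM handler could then choose its exit code based on whether the returned list is empty.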
Usage Examples
Simple Global Queue
const queueWrapper = require('../common/queue-wrapper');
const myQueue = queueWrapper('my-simple-queue', 'global', {
  processCb: async (job, done) => {
    const { data } = job;
    console.log('Processing:', data);
    // Do work here
    done(null, { success: true });
  },
});
// Add job
await myQueue.add({ item: 'value' });
Queue with Redlock
const myQueue = queueWrapper(
  'account-queue',
  'account',
  {
    processCb: async (job, done) => {
      // Process account-specific job
      const { accountId, data } = job.data;
      // ... processing logic
      done(null, { processed: true });
    },
    concurrency: 3, // Process 3 jobs at once
  },
  true,
  {
    active: true,
    id: 'accountId', // Lock by account
    type: 'account',
  },
);
Queue with Callbacks
const myQueue = queueWrapper('callback-queue', 'global', {
  processCb: async (job, done) => {
    // Main processing
    done(null, { result: 'success' });
  },
  failedCb: async (job, error) => {
    // Custom failure handling
    console.error('Job failed:', job.id, error);
    // Send alert, update DB, etc.
  },
  completedCb: async (job, result) => {
    // Post-processing
    console.log('Job completed:', job.id, result);
    // Send notification, cleanup, etc.
  },
  settings: {
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 2000,
    },
  },
});
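To get a feel for what `attempts: 5` with exponential backoff means in practice, the sketch below lists the delay before each retry. It assumes the delays follow Bull's built-in exponential strategy, `delay * (2^attemptsMade - 1)`. That formula, and whether the wrapper forwards `settings` to Bull unchanged, should be verified against the installed Bull version. `exponentialBackoffDelays` is a hypothetical helper.

```javascript
// Assumption: mirrors Bull's built-in 'exponential' backoff strategy,
// delay * (2^attemptsMade - 1). Verify against your Bull version.
function exponentialBackoffDelays({ attempts, backoff }) {
  const delays = [];
  // A job with N attempts can be retried at most N - 1 times.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade += 1) {
    delays.push(Math.round((2 ** attemptsMade - 1) * backoff.delay));
  }
  return delays;
}

const retryDelays = exponentialBackoffDelays({
  attempts: 5,
  backoff: { type: 'exponential', delay: 2000 },
});
console.log(retryDelays); // [ 2000, 6000, 14000, 30000 ]
```

Under this assumption a job that keeps failing spends about 52 seconds in backoff before `failedCb` sees its final failure.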
High-Concurrency Queue
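const highConcurrencyQueue = queueWrapper('bulk-processing', 'global', {
  processCb: async (job, done) => {
    try {
      // Fast I/O-bound work (externalAPI stands in for your own client)
      const result = await externalAPI.fetch(job.data);
      done(null, result);
    } catch (err) {
      done(err); // Report the failure so the job is marked failed, not left hanging
    }
  },
  concurrency: 10, // Process up to 10 jobs simultaneously
});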
Related Documentation
- Architecture
- Configuration
- Queue Manager Overview
Component Type: Core Infrastructure
Status: Stable
Last Updated: 2025-10-10