Skip to main content

πŸ”§ Queue Wrapper

πŸ“– Overview​

The Queue Wrapper (queue-manager/common/queue-wrapper.js) is a factory function that creates standardized Bull queues with Redlock distributed locking, event handling, and automatic cleanup.

Purpose: Provide consistent queue creation with built-in:

  • Distributed locking (Redlock)
  • Error handling and retry logic
  • Event callbacks (failed, completed, stalled)
  • Queue registration and tracking
  • Level-based cleanup (global vs account)

πŸ”§ Function Signature​

module.exports =
(name, // Queue name
level, // 'global' or 'account'
{
processCb, // Job processor function
failedCb, // Failure callback (optional)
completedCb, // Completion callback (optional)
settings, // Bull queue settings (optional)
concurrency, // Worker concurrency (default: 1)
},
runCompletedCbAfterNoActives, // Run completedCb after no active jobs (default: true)
redlock); // Redlock configuration (optional)

πŸ“‹ Parameters​

Required Parameters​

name (String)

  • Unique queue identifier
  • Used for Redis key and logging
  • Example: 'contacts-import', 'affiliates-commissions'

level (String: 'global' | 'account')

  • 'global': Queue persists across accounts
  • 'account': Queue specific to an account, auto-cleaned after completion

processCb (Function)

  • Main job processor function
  • Signature: async (job, done) => { ... }
  • Receives Bull job object and done callback

Optional Parameters​

failedCb (Function)

  • Called when job fails
  • Signature: async (job, error) => { ... }
  • Use for custom error handling

completedCb (Function)

  • Called when job completes
  • Signature: async (job, result) => { ... }
  • Use for cleanup or notifications

settings (Object)

  • Bull queue advanced settings
  • Example: { attempts: 3, backoff: { type: 'exponential', delay: 2000 } }

concurrency (Number)

  • Worker concurrency (parallel jobs)
  • Default: 1
  • Higher values for I/O-bound jobs

runCompletedCbAfterNoActives (Boolean)

  • For account-level queues
  • When true: Runs completedCb only after all jobs finish
  • When false: Runs completedCb immediately
  • Default: true

redlock (Object)

  • Distributed locking configuration
  • Properties:
    • active (Boolean): Enable Redlock
    • id (String): Field in job.data to use as lock key
    • type (String): Lock prefix ('account', 'document', 'global')

πŸ” Redlock Integration​

Purpose​

Prevents duplicate processing when multiple Queue Manager instances run simultaneously.

Configuration​

const queue = queueWrapper('my-queue', 'global', { processCb: myProcessor }, true, {
active: true,
id: 'accountId', // job.data.accountId used as lock key
type: 'account', // Lock key: locks:account:{accountId}
});

Lock Key Patterns​

// Account-level lock
locks: account: 12345;

// Document-level lock
locks: document: abc123;

// Global resource lock
locks: global: resource - name;

Lock Behavior​

  • Lock Duration: 1000ms (1 second)
  • Retry: 10 attempts with 200ms delay
  • Drift Factor: 0.01
  • Failure: Job fails if lock cannot be acquired

πŸ“Š Event Handling​

Built-In Events​

Error Event

Q.on('error', err => {
logger.error({
initiator: 'QM/wrapper/' + name,
message: err?.message || 'Queue error',
raw_error: { name: err?.name, message: err?.message, stack: err?.stack },
});
});

Stalled Event

Q.on('stalled', job => {
logger.error({
initiator: 'QM/wrapper/' + name,
message: 'Job stalled',
additional_data: { job_data: job },
});
job.retry(); // Automatic retry
});

Failed Event

Q.on('failed', async (job, err) => {
logger.error({
initiator: 'QM/wrapper/' + name,
raw_error: { name: err?.name, message: err?.message, stack: err?.stack },
});
failedCb && (await failedCb(job, err)); // Custom callback
});

Completed Event

Q.on('completed', async (job, res) => {
logger.log({
initiator: `QM/wrapper/${name}`,
message: `Queue Job Completed, ID: ` + job.id,
});

if (level === 'global' || !runCompletedCbAfterNoActives) {
completedCb && (await completedCb(job, res));
}

if (level === 'account') {
const actives = await Q.getActiveCount();
if (actives <= 0) {
if (runCompletedCbAfterNoActives) {
completedCb && (await completedCb(job, res));
}
Q.obliterate({ force: true }); // Cleanup account-level queue
}
}
});

πŸ—ΊοΈ Global Queue Registry​

Purpose​

All queues registered in global.runningQueuesMap for:

  • Queue status monitoring
  • Graceful shutdown coordination
  • Manual queue management

Usage​

// Get queue instance
const queue = global.runningQueuesMap.get('contacts-import');

// Check queue metrics
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const completed = await queue.getCompletedCount();
const failed = await queue.getFailedCount();

// Pause queue
await queue.pause();

// Resume queue
await queue.resume();

// Close queue
await queue.close();

Graceful Shutdown​

Queue Manager uses runningQueuesMap for graceful shutdown:

process.on('SIGTERM', async () => {
if (global?.runningQueuesMap?.size) {
for (const [name, queue] of global.runningQueuesMap) {
await queue.close();
global.runningQueuesMap.delete(name);
logger.log({ message: `Queue ${name} closed successfully.` });
}
}
process.exit(0);
});

πŸ’‘ Usage Examples​

Simple Global Queue​

const queueWrapper = require('../common/queue-wrapper');

const myQueue = queueWrapper('my-simple-queue', 'global', {
processCb: async (job, done) => {
const { data } = job;
console.log('Processing:', data);
// Do work here
done(null, { success: true });
},
});

// Add job
await myQueue.add({ item: 'value' });

Queue with Redlock​

const myQueue = queueWrapper(
'account-queue',
'account',
{
processCb: async (job, done) => {
// Process account-specific job
const { accountId, data } = job.data;
// ... processing logic
done(null, { processed: true });
},
concurrency: 3, // Process 3 jobs at once
},
true,
{
active: true,
id: 'accountId', // Lock by account
type: 'account',
},
);

Queue with Callbacks​

const myQueue = queueWrapper('callback-queue', 'global', {
processCb: async (job, done) => {
// Main processing
done(null, { result: 'success' });
},
failedCb: async (job, error) => {
// Custom failure handling
console.error('Job failed:', job.id, error);
// Send alert, update DB, etc.
},
completedCb: async (job, result) => {
// Post-processing
console.log('Job completed:', job.id, result);
// Send notification, cleanup, etc.
},
settings: {
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000,
},
},
});

High-Concurrency Queue​

const highConcurrencyQueue = queueWrapper('bulk-processing', 'global', {
processCb: async (job, done) => {
// Fast I/O-bound work
const result = await externalAPI.fetch(job.data);
done(null, result);
},
concurrency: 10, // Process 10 jobs simultaneously
});

Component Type: Core Infrastructure
Status: Stable
Last Updated: 2025-10-10

πŸ’¬

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM