🏗️ Queue Manager Architecture
📖 Overview
The Queue Manager is a background job processing system built on Bull queues backed by Redis. It loads modules selectively based on environment flags, uses Redlock for distributed locking, and uses node-cron to run scheduled tasks.
🔄 Core Processing Patterns
Pattern 1: Cron → Service → Queue (Most Common)
This is the standard pattern used by most Queue Manager jobs:
sequenceDiagram
participant CRON as Cron Scheduler
participant SERVICE as Service Logic
participant QUEUE_FILE as Queue Definition
participant BULL as Bull/Redis
participant DB as MongoDB
CRON->>CRON: Schedule triggers (e.g., */5 * * * * *)
CRON->>SERVICE: Execute service function
SERVICE->>DB: Query flagged documents
DB-->>SERVICE: Return matching records
SERVICE->>SERVICE: Process business logic
SERVICE->>QUEUE_FILE: Add jobs to queue
QUEUE_FILE->>BULL: Enqueue jobs
BULL->>QUEUE_FILE: Process job with worker
QUEUE_FILE->>DB: Update results
QUEUE_FILE-->>BULL: Job complete
Flow Steps:
- Cron Initialization: crons/[module]/[job].js schedules execution
- Service Processing: services/[module]/[job].service.js queries DB and applies logic
- Queue Addition: Service adds jobs to Bull queue via queues/[module]/[job].js
- Job Execution: Bull workers process jobs asynchronously
- Result Storage: Results written back to MongoDB
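The service step in this flow can be sketched as a function that receives its data source and queue as arguments. This is a minimal sketch, not the actual service code: `runImportService`, `db.findFlagged`, and the job payload shape are hypothetical names chosen for illustration, and `queue.add(data)` mirrors the shape of Bull's `add` call. In-memory fakes stand in for MongoDB and Redis so the example runs on its own.

```javascript
// Hypothetical service function: query flagged documents, apply a check,
// and enqueue one job per valid document.
async function runImportService({ db, queue }) {
  const docs = await db.findFlagged(); // Stand-in for the MongoDB query
  let enqueuedCount = 0;
  for (const doc of docs) {
    if (!doc.accountId) continue; // Illustrative business-logic check
    await queue.add({ documentId: doc._id, accountId: doc.accountId });
    enqueuedCount += 1;
  }
  return enqueuedCount;
}

// In-memory fakes so the sketch runs without Redis or MongoDB.
const fakeDb = {
  findFlagged: async () => [
    { _id: 'a1', accountId: 'acc-1' },
    { _id: 'a2' }, // Skipped: no accountId
  ],
};
const enqueued = [];
const fakeQueue = {
  add: async data => {
    enqueued.push(data);
    return { data };
  },
};

const done = runImportService({ db: fakeDb, queue: fakeQueue });
done.then(count => console.log(`Enqueued ${count} job(s)`));
```

Passing `db` and `queue` in as arguments keeps the service logic testable without a live Bull queue.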
Pattern 2: Trigger → DB Flag → Cron → Service → Queue
Used when external triggers initiate processing:
sequenceDiagram
participant API as Internal API/Webhook
participant DB as MongoDB
participant TRIGGER as Trigger Endpoint
participant CRON as Cron Scheduler
participant SERVICE as Service Logic
participant QUEUE as Bull Queue
API->>TRIGGER: HTTP request
TRIGGER->>DB: Update flag<br/>(needsProcessing: true)
DB-->>TRIGGER: Flag updated
TRIGGER-->>API: 202 Accepted
loop Every N minutes
CRON->>SERVICE: Execute check
SERVICE->>DB: Query flagged documents
DB-->>SERVICE: Return flagged records
SERVICE->>SERVICE: Validate & process
SERVICE->>QUEUE: Add to Bull queue
QUEUE->>DB: Process & update results
end
Flow Steps:
- Trigger Receives Request: External endpoint receives webhook/API call
- DB Flag Update: Document flagged for processing (e.g., needsReportUpdate: true)
- Cron Periodic Check: Scheduled cron queries for flagged documents
- Service Processing: Business logic validates and processes records
- Queue Execution: Jobs added to Bull queue for async processing
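The flag-then-poll handoff can be illustrated with an in-memory stand-in for the collection. This is a sketch under assumptions: `handleTrigger` and `cronTick` are hypothetical names, and clearing the flag right after enqueuing is one possible policy; the text above does not say when the real code resets the flag.

```javascript
// In-memory stand-in for a MongoDB collection (illustrative only).
const documents = new Map([
  ['doc-1', { needsProcessing: false }],
  ['doc-2', { needsProcessing: false }],
]);

// Trigger: flag the document and acknowledge immediately with 202.
function handleTrigger(documentId) {
  const doc = documents.get(documentId);
  if (!doc) return { status: 404 };
  doc.needsProcessing = true;
  return { status: 202 };
}

// Cron tick: enqueue every flagged document, then clear its flag so the
// next tick does not enqueue it again (assumed policy).
function cronTick(enqueue) {
  let count = 0;
  for (const [id, doc] of documents) {
    if (!doc.needsProcessing) continue;
    enqueue({ documentId: id });
    doc.needsProcessing = false;
    count += 1;
  }
  return count;
}

const queued = [];
const response = handleTrigger('doc-2');
const firstTick = cronTick(job => queued.push(job));
const secondTick = cronTick(job => queued.push(job));
console.log(response.status, firstTick, secondTick); // 202 1 0
```

The trigger never does the work itself; it only records that work is needed, which is why it can answer with 202 right away.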
Pattern 3: Internal API → Queue Collection → Queue Manager
Used for queue collections like InstaReportsQueue, Queue, YextQueue:
sequenceDiagram
participant API as Internal API
participant QUEUE_COLL as Queue Collection
participant CRON as Cron Scheduler
participant SERVICE as Service Logic
participant QUEUE_FILE as Bull Queue
participant DB as MongoDB
API->>QUEUE_COLL: Create job document<br/>(status: 'pending')
QUEUE_COLL-->>API: Job created
loop Every N seconds
CRON->>SERVICE: Execute check
SERVICE->>QUEUE_COLL: Query pending jobs
QUEUE_COLL-->>SERVICE: Return pending records
SERVICE->>SERVICE: Validate jobs
SERVICE->>QUEUE_FILE: Add to Bull queue
QUEUE_FILE->>DB: Process data
QUEUE_FILE->>QUEUE_COLL: Update status<br/>(status: 'completed')
end
Flow Steps:
- Internal API Creates Job: Controller adds document to queue collection
- Job Document Stored: MongoDB stores job with status: 'pending'
- Cron Queries Collection: Scheduled check finds pending jobs
- Service Processes: Business logic validates and adds to Bull queue
- Status Update: Job status updated to completed or failed
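The status lifecycle in these steps can be sketched with a plain array standing in for the queue collection. The document shape (`payload`, `error`) and the `processPending` helper are assumptions for illustration; only the `pending`, `completed`, and `failed` status values come from the text above.

```javascript
// Hypothetical job documents; a real queue collection would live in MongoDB.
const queueCollection = [
  { _id: 'j1', status: 'pending', payload: { ok: true } },
  { _id: 'j2', status: 'pending', payload: { ok: false } },
  { _id: 'j3', status: 'completed', payload: { ok: true } },
];

// Processes every pending job and records the outcome on the document.
async function processPending(collection, processJob) {
  const pending = collection.filter(job => job.status === 'pending');
  for (const job of pending) {
    try {
      await processJob(job.payload);
      job.status = 'completed';
    } catch (err) {
      job.status = 'failed';
      job.error = err.message; // Keep the reason alongside the status
    }
  }
  return pending.length;
}

const finished = processPending(queueCollection, async payload => {
  if (!payload.ok) throw new Error('invalid payload');
});
finished.then(() => console.log(queueCollection.map(job => job.status)));
```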
🗄️ Queue Registry System
Global Queue Map
All active queues are registered in global.runningQueuesMap for:
- Queue Status Monitoring: Track active, waiting, delayed jobs
- Manual Job Management: Add/remove jobs programmatically
- Graceful Shutdown: Coordinate queue cleanup on SIGTERM
// Queue registration in queue-wrapper.js
global.runningQueuesMap.set(name, Q);
// Access queues globally
const queue = global.runningQueuesMap.get('contacts-import');
const activeCount = await queue.getActiveCount();
Queue Wrapper
File: queue-manager/common/queue-wrapper.js
Creates standardized Bull queues with:
- Redlock Integration: Distributed locking for preventing duplicate processing
- Error Handling: Automatic retry on stalled jobs
- Event Callbacks: Custom handlers for failed, completed, stalled events
- Concurrency Control: Configurable worker concurrency
- Level-Based Cleanup: Global vs account-level queue management
Key Features:
module.exports = (
  name,
  level, // 'global' or 'account'
  {
    processCb, // Main job processor
    failedCb, // Failure handler
    completedCb, // Completion handler
    settings, // Bull settings
    concurrency, // Worker concurrency
  },
  runCompletedCbAfterNoActives,
  redlock, // Distributed locking config
) => {
  // Queue construction omitted
};
🔧 Environment-Controlled Loading
Module Initialization
Queue Manager uses QM_* environment flags to control which modules load:
File: queue-manager/index.js
// Example module loading
if (process.env.QM_CONTACTS === 'true') {
const importContacts = require('./crons/contacts/import');
await importContacts.start();
logger.log({
initiator: 'QM/startup/QM_CONTACTS',
message: 'QM_CONTACTS started successfully',
});
}
Benefits
- Selective Execution: Run only required modules in dev/test environments
- Resource Optimization: Reduce memory footprint by loading only needed crons
- Testing Isolation: Test individual modules without starting entire system
- Deployment Flexibility: Different environments enable different feature sets
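When many modules follow the same flag check, the selection logic can be expressed as a lookup table. This is a hedged sketch rather than how `index.js` is actually organized: the `MODULES` table and `enabledModules` helper are hypothetical, and `QM_EXAMPLE` is an invented flag included only to show a disabled entry.

```javascript
// Hypothetical flag-to-module table; only QM_CONTACTS and its path appear
// in the example above.
const MODULES = [
  { flag: 'QM_CONTACTS', path: './crons/contacts/import' },
  { flag: 'QM_EXAMPLE', path: './crons/example/job' }, // Illustrative entry
];

// Returns the module paths whose flag is exactly the string 'true'.
function enabledModules(env) {
  return MODULES
    .filter(({ flag }) => env[flag] === 'true')
    .map(({ path }) => path);
}

const selected = enabledModules({ QM_CONTACTS: 'true', QM_EXAMPLE: 'false' });
console.log(selected); // [ './crons/contacts/import' ]
```

Because environment variables are always strings, the comparison is against `'true'` rather than a boolean; an unset flag leaves the module disabled.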
Single Initialization Protection
let cronsInitialized = false;
mongoConnection.on('open', async () => {
if (cronsInitialized) {
logger.log({
message: 'MongoDB reconnected, but crons already initialized.',
});
return;
}
await initServices();
cronsInitialized = true;
});
Prevents duplicate cron initialization on MongoDB reconnection.
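One detail of the guard above is that the flag is set only after `initServices()` resolves, so a second `open` event arriving while initialization is still running would pass the check. A possible variant, sketched below, caches the in-flight promise instead of a boolean. The `initOnce` helper is hypothetical, and an `EventEmitter` and a timer-based `initServices` stand in for the real MongoDB connection and initializer.

```javascript
const { EventEmitter } = require('events');

// Wraps an async initializer so it runs at most once, even if it is called
// again before the first call has finished.
function initOnce(init) {
  let pending = null;
  return () => {
    if (!pending) pending = init();
    return pending;
  };
}

let initCalls = 0;
const initServices = async () => { // Stand-in for the real initServices()
  initCalls += 1;
  await new Promise(resolve => setTimeout(resolve, 10));
};

const mongoConnection = new EventEmitter(); // Stand-in for the connection
const guardedInit = initOnce(initServices);
mongoConnection.on('open', guardedInit);

mongoConnection.emit('open');
mongoConnection.emit('open'); // Simulated reconnect during startup
```

Note that this variant also caches a rejected promise, so a failed initialization would not be retried on reconnect; whether that is desirable depends on the deployment.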
🔐 Distributed Locking (Redlock)
Purpose
Prevents duplicate processing when multiple Queue Manager instances run:
- Race Condition Prevention: Only one worker processes a specific job
- Account-Level Locking: Lock by account ID to prevent concurrent updates
- Document-Level Locking: Lock by document ID for granular control
Implementation
// In queue-wrapper.js
if (redlock_init) {
await redlock_init.acquire(`locks:${redlock.type}:${job.data[redlock.id].toString()}`, 1000);
}
Lock Key Patterns:
- locks:account:12345 - Account-level lock
- locks:document:abc123 - Document-level lock
- locks:global:resource-name - Global resource lock
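The key format can be captured in a small helper. `buildLockKey` is a hypothetical name, and the explicit check for a missing ID is an addition for illustration; the wrapper snippet above calls `.toString()` directly on the job field.

```javascript
// Builds a key in the `locks:<type>:<id>` shape listed above.
function buildLockKey(type, id) {
  if (id === undefined || id === null) {
    throw new Error(`Missing lock id for type "${type}"`);
  }
  return `locks:${type}:${id.toString()}`;
}

console.log(buildLockKey('account', 12345)); // locks:account:12345
console.log(buildLockKey('document', 'abc123')); // locks:document:abc123
```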
🚨 Error Handling Strategy
Queue-Level Error Handling
Stalled Jobs:
Q.on('stalled', job => {
logger.error({ message: 'Job stalled', additional_data: { job_data: job } });
job.retry(); // Automatic retry
});
Failed Jobs:
Q.on('failed', async (job, err) => {
logger.error({ raw_error: err });
failedCb && (await failedCb(job, err)); // Custom failure handler
});
Service-Level Error Handling
Services use try-catch blocks with detailed logging:
try {
// Process logic
} catch (err) {
logger.error({
initiator: 'QM/module/job',
error: err,
additional_data: { context },
});
}
Process-Level Error Handling
Unhandled Rejections:
process.on('unhandledRejection', (reason, promise) => {
logger.error({ error: 'Unhandled Rejection', promise, reason });
});
Uncaught Exceptions:
process.on('uncaughtException', err => {
logger.error({
initiator: 'QM',
message: `Uncaught Exception: ${err?.message}`,
});
});
🛑 Graceful Shutdown
SIGTERM Handler
Ensures a clean shutdown when the process receives a termination signal:
process.on('SIGTERM', async () => {
console.log('SIGTERM signal received. Initiating graceful shutdown...');
server.close(async () => {
// Close all queues
if (global?.runningQueuesMap?.size) {
while (global.runningQueuesMap.size > 0) {
for (const [name, queue] of global.runningQueuesMap) {
await queue.close();
global.runningQueuesMap.delete(name);
logger.log({ message: `Queue ${name} closed successfully.` });
}
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
logger.log({ message: 'All queues processed. Exiting...' });
process.exit(0);
});
});
Shutdown Steps:
- Stop accepting new HTTP requests
- Close all Bull queues gracefully
- Wait for active jobs to complete
- Close MongoDB connection
- Exit process
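The queue-closing step can also be written so that one queue failing to close does not stop the others. The sketch below is an alternative under assumptions, not the handler shown above: `closeAllQueues` is a hypothetical helper, `broken-queue` is an invented name, and the fake queues only implement `close()`, which Bull queues expose as a promise-returning method.

```javascript
// Closes every registered queue in parallel and returns the names of the
// queues that failed to close.
async function closeAllQueues(queues) {
  const names = [...queues.keys()];
  const results = await Promise.allSettled(
    names.map(name => queues.get(name).close())
  );
  const failed = [];
  results.forEach((result, i) => {
    if (result.status === 'fulfilled') queues.delete(names[i]);
    else failed.push(names[i]);
  });
  return failed;
}

// Fake queues so the sketch runs without Redis.
const queues = new Map([
  ['contacts-import', { close: async () => {} }],
  ['broken-queue', { close: async () => { throw new Error('redis down'); } }],
]);

const shutdown = closeAllQueues(queues);
shutdown.then(failed => console.log('Failed to close:', failed));
```

Returning the failed names lets the caller decide whether to retry, log, or exit with a non-zero code.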
📊 Monitoring & Observability
Logging Strategy
Initialization Logging:
logger.log({
initiator: 'QM/startup/QM_CONTACTS',
message: 'QM_CONTACTS started successfully',
});
Job Execution Logging:
logger.log({
initiator: 'QM/contacts/import',
message: 'Execution Started for importContacts()',
});
Error Logging:
logger.error({
initiator: 'QM/module/job',
error: err,
raw_error: {
name: err?.name,
message: err?.message,
stack: err?.stack,
},
});
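The explicit `raw_error` fields matter because `name`, `message`, and `stack` are not enumerable own properties of an `Error`, so serializing the error object directly to JSON drops them. A small helper, here called `toRawError` (a hypothetical name), makes the pattern reusable:

```javascript
// Copies the fields used in `raw_error` above into a plain object so they
// survive JSON serialization.
function toRawError(err) {
  return {
    name: err?.name,
    message: err?.message,
    stack: err?.stack,
  };
}

const err = new TypeError('bad input');
console.log(JSON.stringify(err)); // {}
const raw = toRawError(err);
console.log(raw.name, raw.message); // TypeError bad input
```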
Queue Metrics
Access queue statistics for monitoring:
const queue = global.runningQueuesMap.get('queue-name');
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const completed = await queue.getCompletedCount();
const failed = await queue.getFailedCount();
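To report on every registered queue at once, the same count methods can be applied across the whole map. `collectQueueStats` is a hypothetical helper, and the fake queue below returns fixed numbers so the sketch runs without Redis; with real Bull queues the four `get*Count()` calls would hit Redis.

```javascript
// Gathers waiting/active/completed/failed counts for each queue in the map.
async function collectQueueStats(queues) {
  const stats = {};
  for (const [name, queue] of queues) {
    const [waiting, active, completed, failed] = await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
    ]);
    stats[name] = { waiting, active, completed, failed };
  }
  return stats;
}

// Fake queue with fixed counts (illustrative values).
const fakeQueue = {
  getWaitingCount: async () => 3,
  getActiveCount: async () => 1,
  getCompletedCount: async () => 40,
  getFailedCount: async () => 2,
};

const report = collectQueueStats(new Map([['contacts-import', fakeQueue]]));
report.then(stats => console.log(stats));
```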
🔗 Express API Integration
When QM_HOOKS=true, Queue Manager exposes HTTP endpoints:
File: queue-manager/src/routes/
Provides manual triggers and status endpoints for:
- Manual job triggering
- Queue status inspection
- Job management (pause, resume, retry)
📁 Directory Structure
queue-manager/
├── index.js # Main entry - module loading
├── common/
│ ├── queue-wrapper.js # Bull queue factory with Redlock
│ ├── generate_queue.js # Redis queue creation
│ └── changeStream.js # MongoDB change streams
├── crons/ # Cron schedules
│ ├── [module]/
│ │ └── [job].js # Schedule definition
├── services/ # Business logic
│ ├── [module]/
│ │ └── [job].service.js # Core processing logic
├── queues/ # Bull queue definitions
│ ├── [module]/
│ │ └── [job].js # Queue processor
├── triggers/ # API triggers
│ └── [module]/
│ └── [action].js # HTTP endpoints
├── streams/ # Change stream handlers
│ └── [collection].stream.js # MongoDB watchers
└── src/ # Express API (QM_HOOKS=true)
├── routes/
├── controllers/
└── services/
🔗 Related Documentation
- Configuration Guide
- Queue Wrapper
- Module Documentation
Last Updated: 2025-10-10
Status: Active Production System