
🏗️ Queue Manager Architecture

📖 Overview

The Queue Manager is a background job processing system built on Bull queues backed by Redis. It loads modules according to environment flags, uses Redlock for distributed locking, and schedules recurring tasks with node-cron.

🔄 Core Processing Patterns

Pattern 1: Cron → Service → Queue (Most Common)

This is the standard pattern used by most Queue Manager jobs:

sequenceDiagram
participant CRON as Cron Scheduler
participant SERVICE as Service Logic
participant QUEUE_FILE as Queue Definition
participant BULL as Bull/Redis
participant DB as MongoDB

CRON->>CRON: Schedule triggers (e.g., */5 * * * * *)
CRON->>SERVICE: Execute service function
SERVICE->>DB: Query flagged documents
DB-->>SERVICE: Return matching records
SERVICE->>SERVICE: Process business logic
SERVICE->>QUEUE_FILE: Add jobs to queue
QUEUE_FILE->>BULL: Enqueue jobs
BULL->>QUEUE_FILE: Process job with worker
QUEUE_FILE->>DB: Update results
QUEUE_FILE-->>BULL: Job complete

Flow Steps:

  1. Cron Initialization: crons/[module]/[job].js schedules execution
  2. Service Processing: services/[module]/[job].service.js queries DB and applies logic
  3. Queue Addition: Service adds jobs to Bull queue via queues/[module]/[job].js
  4. Job Execution: Bull workers process jobs asynchronously
  5. Result Storage: Results written back to MongoDB
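The service step in this flow can be sketched as follows. This is a minimal illustration, not the Queue Manager's actual code: `runServiceTick`, `findFlagged`, and `toJobData` are hypothetical names, and `queue` is assumed to be any object with Bull's `add(data, opts)` method. Reusing the document id as `jobId` is also an assumption; Bull does not add a job whose id already exists, which helps when a cron tick fires before the previous batch has drained.

```javascript
// Hypothetical sketch of the "Service -> Queue" step of Pattern 1.
// findFlagged, toJobData, and the jobId convention are assumptions for illustration.
async function runServiceTick({ findFlagged, queue, toJobData }) {
  // Step 2: query the documents that need processing.
  const docs = await findFlagged();

  // Step 3: enqueue one job per document.
  for (const doc of docs) {
    // Bull's queue.add(data, opts) resolves once the job is stored in Redis.
    await queue.add(toJobData(doc), { jobId: String(doc._id) });
  }

  // Return the number of enqueued documents so the caller can log it.
  return docs.length;
}
```

With node-cron, a function like this would typically be wired up as `cron.schedule('*/5 * * * * *', () => runServiceTick(deps))`, where `deps` holds the real query function and Bull queue.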

Pattern 2: Trigger → DB Flag → Cron → Service → Queue

Used when external triggers initiate processing:

sequenceDiagram
participant API as Internal API/Webhook
participant DB as MongoDB
participant TRIGGER as Trigger Endpoint
participant CRON as Cron Scheduler
participant SERVICE as Service Logic
participant QUEUE as Bull Queue

API->>TRIGGER: HTTP request
TRIGGER->>DB: Update flag<br/>(needsProcessing: true)
DB-->>TRIGGER: Flag updated
TRIGGER-->>API: 202 Accepted

loop Every N minutes
CRON->>SERVICE: Execute check
SERVICE->>DB: Query flagged documents
DB-->>SERVICE: Return flagged records
SERVICE->>SERVICE: Validate & process
SERVICE->>QUEUE: Add to Bull queue
QUEUE->>DB: Process & update results
end

Flow Steps:

  1. Trigger Receives Request: External endpoint receives webhook/API call
  2. DB Flag Update: Document flagged for processing (e.g., needsReportUpdate: true)
  3. Cron Periodic Check: Scheduled cron queries for flagged documents
  4. Service Processing: Business logic validates and processes records
  5. Queue Execution: Jobs added to Bull queue for async processing
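The first two steps (receive the request, set the flag) can be sketched as a small framework-agnostic handler. The function name `flagForProcessing` is hypothetical, `collection` is assumed to expose `updateOne(filter, update)` in the shape of the MongoDB Node.js driver, and the 404 branch is an addition for illustration that the flow above does not describe.

```javascript
// Hypothetical sketch of a trigger that only flags a document and returns.
// The cron/service pair picks the flag up later; no work happens in the request.
async function flagForProcessing(collection, documentId) {
  const result = await collection.updateOne(
    { _id: documentId },
    { $set: { needsProcessing: true } }
  );

  // 202 Accepted: the work is queued for later, not done yet.
  // 404 is an assumed response for an unknown document id.
  return result.matchedCount === 1
    ? { status: 202, body: { accepted: true } }
    : { status: 404, body: { accepted: false } };
}
```

Keeping the trigger this thin means a slow or failing job cannot hold an HTTP request open; the trade-off is latency of up to one cron interval before processing starts.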

Pattern 3: Internal API → Queue Collection → Queue Manager

Used for queue collections like InstaReportsQueue, Queue, YextQueue:

sequenceDiagram
participant API as Internal API
participant QUEUE_COLL as Queue Collection
participant CRON as Cron Scheduler
participant SERVICE as Service Logic
participant QUEUE_FILE as Bull Queue
participant DB as MongoDB

API->>QUEUE_COLL: Create job document<br/>(status: 'pending')
QUEUE_COLL-->>API: Job created

loop Every N seconds
CRON->>SERVICE: Execute check
SERVICE->>QUEUE_COLL: Query pending jobs
QUEUE_COLL-->>SERVICE: Return pending records
SERVICE->>SERVICE: Validate jobs
SERVICE->>QUEUE_FILE: Add to Bull queue
QUEUE_FILE->>DB: Process data
QUEUE_FILE->>QUEUE_COLL: Update status<br/>(status: 'completed')
end

Flow Steps:

  1. Internal API Creates Job: Controller adds document to queue collection
  2. Job Document Stored: MongoDB stores job with status: 'pending'
  3. Cron Queries Collection: Scheduled check finds pending jobs
  4. Service Processes: Business logic validates and adds to Bull queue
  5. Status Update: Job status updated to completed or failed
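The final status update can be sketched as a Bull processor that writes the outcome back to the queue collection. The names `makeProcessor`, `handle`, `setStatus`, and the `docId` field are hypothetical; only the `pending`, `completed`, and `failed` status values come from the steps above.

```javascript
// Hypothetical sketch: build a processor suitable for Bull's queue.process().
// handle(data) does the real work; setStatus(id, status) updates the queue collection.
function makeProcessor({ handle, setStatus }) {
  return async function processJob(job) {
    const { docId } = job.data; // assumed field linking the Bull job to its document

    try {
      await handle(job.data);
    } catch (err) {
      await setStatus(docId, 'failed');
      throw err; // rethrow so Bull also marks the job as failed
    }

    await setStatus(docId, 'completed');
  };
}
```

Rethrowing after recording `failed` keeps the collection and Bull's own job state in agreement, so a failed handler still fires the queue's `failed` event.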

🗄️ Queue Registry System

Global Queue Map

All active queues are registered in global.runningQueuesMap for:

  • Queue Status Monitoring: Track active, waiting, delayed jobs
  • Manual Job Management: Add/remove jobs programmatically
  • Graceful Shutdown: Coordinate queue cleanup on SIGTERM

// Queue registration in queue-wrapper.js
global.runningQueuesMap.set(name, Q);

// Access queues globally
const queue = global.runningQueuesMap.get('contacts-import');
const activeCount = await queue.getActiveCount();

Queue Wrapper

File: queue-manager/common/queue-wrapper.js

Creates standardized Bull queues with:

  • Redlock Integration: Distributed locking to prevent duplicate processing
  • Error Handling: Automatic retry on stalled jobs
  • Event Callbacks: Custom handlers for failed, completed, stalled events
  • Concurrency Control: Configurable worker concurrency
  • Level-Based Cleanup: Global vs account-level queue management

Key Features:

module.exports = (
  name,
  level, // 'global' or 'account'
  {
    processCb, // Main job processor
    failedCb, // Failure handler
    completedCb, // Completion handler
    settings, // Bull settings
    concurrency, // Worker concurrency
  },
  runCompletedCbAfterNoActives,
  redlock // Distributed locking config
) => {
  // ... factory body omitted
};

🔧 Environment-Controlled Loading

Module Initialization

Queue Manager uses QM_* environment flags to control which modules load:

File: queue-manager/index.js

// Example module loading (inside an async function, since it uses await)
if (process.env.QM_CONTACTS === 'true') {
  const importContacts = require('./crons/contacts/import');
  await importContacts.start();
  logger.log({
    initiator: 'QM/startup/QM_CONTACTS',
    message: 'QM_CONTACTS started successfully',
  });
}

Benefits

  • Selective Execution: Run only required modules in dev/test environments
  • Resource Optimization: Reduce memory footprint by loading only needed crons
  • Testing Isolation: Test individual modules without starting entire system
  • Deployment Flexibility: Different environments enable different feature sets
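To see at a glance which modules a given environment would start, the flags can be read in one place. The helper below is a hypothetical sketch, not part of queue-manager/index.js; it assumes only what the example above shows, namely that a module is enabled when its `QM_*` variable equals the string `'true'`. `QM_REPORTS` in the usage line is a made-up flag name.

```javascript
// Hypothetical helper: list the QM_* flags that are switched on.
// Environment values are always strings, so compare against 'true' exactly.
function enabledModules(env) {
  return Object.keys(env)
    .filter((key) => key.startsWith('QM_') && env[key] === 'true')
    .sort();
}

// Example with an explicit object instead of process.env:
// enabledModules({ QM_CONTACTS: 'true', QM_HOOKS: 'false', QM_REPORTS: 'true' })
// returns ['QM_CONTACTS', 'QM_REPORTS']
```

Logging this list once at startup gives a quick record of what a particular deployment is actually running.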

Single Initialization Protection

let cronsInitialized = false;
mongoConnection.on('open', async () => {
  if (cronsInitialized) {
    logger.log({
      message: 'MongoDB reconnected, but crons already initialized.',
    });
    return;
  }
  await initServices();
  cronsInitialized = true;
});

Prevents duplicate cron initialization on MongoDB reconnection.
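One detail worth noting: the flag is set only after `initServices()` resolves, so a second `open` event arriving while initialization is still running would pass the guard. A possible way to close that window, sketched here with a hypothetical `initOnce` helper rather than the project's actual code, is to share a single in-flight promise and allow a retry only if initialization fails.

```javascript
// Hypothetical sketch: run an async initializer at most once, even when
// callers arrive while the first run is still in progress.
function initOnce(init) {
  let pending = null;

  return function run() {
    if (!pending) {
      pending = Promise.resolve()
        .then(init)
        .catch((err) => {
          pending = null; // a failed init may be retried on the next call
          throw err;
        });
    }
    return pending; // concurrent callers share the same promise
  };
}
```

Usage would look like `const startCrons = initOnce(initServices);` followed by `mongoConnection.on('open', startCrons);`, with error handling added around the call as needed.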

🔐 Distributed Locking (Redlock)

Purpose

Prevents duplicate processing when multiple Queue Manager instances run:

  • Race Condition Prevention: Only one worker processes a specific job
  • Account-Level Locking: Lock by account ID to prevent concurrent updates
  • Document-Level Locking: Lock by document ID for granular control

Implementation

// In queue-wrapper.js
if (redlock_init) {
  await redlock_init.acquire(`locks:${redlock.type}:${job.data[redlock.id].toString()}`, 1000);
}

Lock Key Patterns:

  • locks:account:12345 - Account-level lock
  • locks:document:abc123 - Document-level lock
  • locks:global:resource-name - Global resource lock
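The key construction can be pulled into a small function, which also makes the failure mode explicit when a job is missing the field it should lock on. `buildLockKey` and the `account_id` field in the usage line are hypothetical; the `locks:<type>:<id>` format and the `redlock.type` / `redlock.id` config fields are taken from the snippet above.

```javascript
// Hypothetical helper: derive the Redlock resource key for a job.
// redlock.type is the lock scope; redlock.id names the job-data field to lock on.
function buildLockKey(redlock, jobData) {
  const value = jobData[redlock.id];

  if (value === undefined || value === null) {
    // Fail with a readable message instead of a TypeError from .toString()
    throw new Error(`Job data has no "${redlock.id}" field to lock on`);
  }

  return `locks:${redlock.type}:${value.toString()}`;
}

// buildLockKey({ type: 'account', id: 'account_id' }, { account_id: 12345 })
// returns 'locks:account:12345'
```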

🚨 Error Handling Strategy

Queue-Level Error Handling

Stalled Jobs:

Q.on('stalled', job => {
  logger.error({ message: 'Job stalled', additional_data: { job_data: job } });
  job.retry(); // Automatic retry
});

Failed Jobs:

Q.on('failed', async (job, err) => {
  logger.error({ raw_error: err });
  failedCb && (await failedCb(job, err)); // Custom failure handler
});

Service-Level Error Handling

Services use try-catch blocks with detailed logging:

try {
  // Process logic
} catch (err) {
  logger.error({
    initiator: 'QM/module/job',
    error: err,
    additional_data: { context },
  });
}

Process-Level Error Handling

Unhandled Rejections:

process.on('unhandledRejection', (reason, promise) => {
  logger.error({ error: 'Unhandled Rejection', promise, reason });
});

Uncaught Exceptions:

process.on('uncaughtException', err => {
  logger.error({
    initiator: 'QM',
    message: `Uncaught Exception: ${err?.message}`,
  });
});

🛑 Graceful Shutdown

SIGTERM Handler

Ensures clean shutdown when process receives termination signal:

process.on('SIGTERM', async () => {
  console.log('SIGTERM signal received. Initiating graceful shutdown...');

  server.close(async () => {
    // Close all queues
    if (global?.runningQueuesMap?.size) {
      while (global.runningQueuesMap.size > 0) {
        for (const [name, queue] of global.runningQueuesMap) {
          await queue.close();
          global.runningQueuesMap.delete(name);
          logger.log({ message: `Queue ${name} closed successfully.` });
        }
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    }

    logger.log({ message: 'All queues processed. Exiting...' });
    process.exit(0);
  });
});

Shutdown Steps:

  1. Stop accepting new HTTP requests
  2. Close all Bull queues gracefully
  3. Wait for active jobs to complete
  4. Close MongoDB connection
  5. Exit process
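The queue-closing step can be isolated into a function that is easy to test with fake queues. This is a sketch under assumptions: `closeAllQueues` is a hypothetical name, and collecting failures instead of aborting on the first error is a design choice made here, not something the handler above does. It relies only on Bull's `queue.close()`, which returns a promise.

```javascript
// Hypothetical sketch: close every registered queue, continuing past failures.
// `queues` is a Map of name -> Bull queue, like global.runningQueuesMap.
async function closeAllQueues(queues, log = () => {}) {
  const failures = [];

  // Iterate over a snapshot so deleting entries cannot affect the loop.
  for (const [name, queue] of [...queues]) {
    try {
      await queue.close();
      queues.delete(name);
      log(`Queue ${name} closed successfully.`);
    } catch (err) {
      // Keep going so one broken queue does not block the others.
      failures.push({ name, err });
    }
  }

  return failures; // queues that could not be closed stay in the map
}
```

A shutdown handler could call this once and choose its exit code based on whether the returned list is empty.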

📊 Monitoring & Observability

Logging Strategy

Initialization Logging:

logger.log({
  initiator: 'QM/startup/QM_CONTACTS',
  message: 'QM_CONTACTS started successfully',
});

Job Execution Logging:

logger.log({
  initiator: 'QM/contacts/import',
  message: 'Execution Started for importContacts()',
});

Error Logging:

logger.error({
  initiator: 'QM/module/job',
  error: err,
  raw_error: {
    name: err?.name,
    message: err?.message,
    stack: err?.stack,
  },
});
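The explicit `raw_error` object matters because the `name`, `message`, and `stack` of an `Error` are not enumerable, so `JSON.stringify(new Error('x'))` produces `'{}'`. A small helper can build that object consistently; `serializeError` and the `'NonError'` label are hypothetical, not part of the project's logger.

```javascript
// Hypothetical helper: turn any thrown value into a plain, JSON-safe object.
function serializeError(err) {
  if (err instanceof Error) {
    return { name: err.name, message: err.message, stack: err.stack };
  }
  // Non-Error values (strings, numbers, objects) can be thrown as well.
  return { name: 'NonError', message: String(err), stack: undefined };
}
```

It would be used as `logger.error({ initiator: 'QM/module/job', raw_error: serializeError(err) })`.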

Queue Metrics

Access queue statistics for monitoring:

const queue = global.runningQueuesMap.get('queue-name');
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const completed = await queue.getCompletedCount();
const failed = await queue.getFailedCount();
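To monitor every registered queue rather than a single one, the same four calls can be run across the whole registry. The function name `collectQueueStats` and the shape of the returned object are assumptions for illustration; the count methods are the ones shown above.

```javascript
// Hypothetical sketch: gather job counts for every queue in a registry Map.
async function collectQueueStats(queues) {
  const stats = {};

  for (const [name, queue] of queues) {
    // The four counts are independent, so fetch them in parallel.
    const [waiting, active, completed, failed] = await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
    ]);
    stats[name] = { waiting, active, completed, failed };
  }

  return stats;
}
```

Calling `collectQueueStats(global.runningQueuesMap)` from a status endpoint or a periodic logger would yield one entry per queue name.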

🔗 Express API Integration

When QM_HOOKS=true, Queue Manager exposes HTTP endpoints:

Directory: queue-manager/src/routes/

The routes provide endpoints for:

  • Manual job triggering
  • Queue status inspection
  • Job management (pause, resume, retry)

📁 Directory Structure

queue-manager/
├── index.js                        # Main entry - module loading
├── common/
│   ├── queue-wrapper.js            # Bull queue factory with Redlock
│   ├── generate_queue.js           # Redis queue creation
│   └── changeStream.js             # MongoDB change streams
├── crons/                          # Cron schedules
│   ├── [module]/
│   │   └── [job].js                # Schedule definition
├── services/                       # Business logic
│   ├── [module]/
│   │   └── [job].service.js        # Core processing logic
├── queues/                         # Bull queue definitions
│   ├── [module]/
│   │   └── [job].js                # Queue processor
├── triggers/                       # API triggers
│   └── [module]/
│       └── [action].js             # HTTP endpoints
├── streams/                        # Change stream handlers
│   └── [collection].stream.js      # MongoDB watchers
└── src/                            # Express API (QM_HOOKS=true)
    ├── routes/
    ├── controllers/
    └── services/

Last Updated: 2025-10-10
Status: Active Production System
