Queue Manager

The Queue Manager is a Node.js background processing service that executes scheduled jobs and processes asynchronous tasks. It uses environment variables to selectively load processing modules, ensuring only required functionality runs in each environment.

Service Path: queue-manager/
Default Port: 6002 (configurable via PORT environment variable)
Dependencies: MongoDB, Redis, Bull, ioredis, Redlock

Architecture Overview

The Queue Manager uses environment-controlled module loading where each processing module is enabled via specific environment variables (e.g., QM_AFFILIATES=true). This allows fine-grained control over which jobs run in different environments.

Core Components

  1. Main Entry Point (index.js): Conditional module loading based on environment flags
  2. Common Utilities (common/): Shared queue wrapper, data mapping, and helper functions
  3. Processing Modules (crons/): Individual job processors for specific business functions
  4. Express API (src/): Optional HTTP endpoints enabled via QM_HOOKS=true
  5. Global Queue Registry: All active queues stored in global.runningQueuesMap

Service Structure

graph TB
    subgraph "Entry Point"
        INDEX[index.js<br/>Environment-based module loading]
    end

    subgraph "Common Utilities"
        WRAPPER[queue-wrapper.js<br/>Bull queue creation]
        GENERATE[generate_queue.js<br/>Queue factory]
    end

    subgraph "Processing Modules"
        CRONS[crons/<br/>Individual job processors]
        QUEUES[queues/<br/>Queue definitions]
        SERVICES[services/<br/>Business logic]
    end

    subgraph "Optional Components"
        API[src/<br/>Express routes<br/>if QM_HOOKS=true]
        TRIGGERS[triggers/<br/>DB triggers]
    end

    INDEX --> WRAPPER
    INDEX --> CRONS
    WRAPPER --> GENERATE
    CRONS --> SERVICES
    SERVICES --> QUEUES
    QUEUES --> WRAPPER
    TRIGGERS --> CRONS

Sub-Modules

1. Entry Point Module

index.js - Main Service Controller

Purpose: Environment-controlled initialization of processing modules.

Key Features:

  • Single Initialization Guard: cronsInitialized flag prevents duplicate module loading
  • Conditional Loading: Each module loaded only if corresponding environment variable is true
  • MongoDB Connection: Establishes database connection with maxPoolSize: 10
  • Express Server: Always starts and listens on port 6002, or on PORT when that variable is set
  • SIGTERM Handler: Graceful shutdown with queue cleanup

Startup Pattern:

let cronsInitialized = false; // guard so modules load only once per process

const initServices = async () => {
  if (cronsInitialized) return;
  cronsInitialized = true;

  // Load modules based on environment flags
  if (process.env.QM_AFFILIATES == 'true') {
    await require('./crons/affiliates/commissions').start();
    await require('./crons/affiliates/leaderboard').start();
    await require('./crons/affiliates/expirePayouts').start();
  }
  // ... more modules
};
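
The startup pattern above can also be read as a lookup from flag names to module paths. The following is a minimal sketch of that reading, not the code in index.js: `MODULES_BY_FLAG`, `modulesToLoad`, and `startEnabledModules` are hypothetical names, and only the QM_AFFILIATES paths are taken from the excerpt.

```javascript
// Hypothetical table: flag name -> module paths loaded when the flag is 'true'.
// Only the QM_AFFILIATES entries come from the excerpt above.
const MODULES_BY_FLAG = {
  QM_AFFILIATES: [
    './crons/affiliates/commissions',
    './crons/affiliates/leaderboard',
    './crons/affiliates/expirePayouts',
  ],
};

// Returns the module paths whose flag is exactly the string 'true'.
function modulesToLoad(env, table = MODULES_BY_FLAG) {
  return Object.entries(table)
    .filter(([flag]) => env[flag] === 'true')
    .flatMap(([, paths]) => paths);
}

// Starts each selected module in order. `load` defaults to require and is
// injectable so the function can be exercised without the real crons/ files.
async function startEnabledModules(env, load = require) {
  const started = [];
  for (const path of modulesToLoad(env)) {
    await load(path).start();
    started.push(path);
  }
  return started;
}
```

A table like this keeps the flag-to-module mapping in one place, at the cost of hiding the explicit `if` blocks that make the real startup order easy to read.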

Graceful Shutdown:

process.on('SIGTERM', async () => {
  server.close(async () => {
    while (global.runningQueuesMap.size > 0) {
      for (const [name, queue] of global.runningQueuesMap) {
        await queue.close();
        global.runningQueuesMap.delete(name);
      }
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
    process.exit(0);
  });
});

2. Common Utilities Module

queue-wrapper.js - Bull Queue Factory

Purpose: Standardized Bull queue creation with Redlock support and lifecycle management.

Function Signature:

module.exports = (
  name,                          // Queue name
  level,                         // 'global' | 'account'
  {
    processCb,                   // Job processing callback
    failedCb,                    // Optional failure handler
    completedCb,                 // Optional completion handler
    settings,                    // Bull AdvancedSettings
    concurrency,                 // Concurrent job count
  },
  runCompletedCbAfterNoActives,  // Run callback after no active jobs
  redlock                        // Redlock configuration object
) => { /* body omitted */ };

Global Queue Registry: Every queue is stored in the global.runningQueuesMap Map for centralized management.

Redlock Configuration:

redlock: {
  active: true,       // Enable distributed locking
  id: 'account_id',   // Field from job.data for lock key
  type: 'account'     // Lock type identifier
}
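
The `id` setting names the field of `job.data` that identifies what to lock, and `type` labels the kind of lock. The sketch below shows one way a lock resource string could be derived from those two settings. The helper name `lockResourceFor` and the `lock:<type>:<value>` format are assumptions made for illustration; the format queue-wrapper.js actually uses is not shown in this document.

```javascript
// Hypothetical helper: builds a lock resource string from the redlock config
// and a job. The 'lock:<type>:<value>' format is an assumption for illustration.
function lockResourceFor(redlock, job) {
  if (!redlock || !redlock.active) return null; // locking disabled
  const value = job?.data?.[redlock.id];
  if (value === undefined || value === null) {
    throw new Error(`Job data has no '${redlock.id}' field to lock on`);
  }
  return `lock:${redlock.type}:${value}`;
}

const exampleConfig = { active: true, id: 'account_id', type: 'account' };
const exampleJob = { data: { account_id: 'acc_123' } };
console.log(lockResourceFor(exampleConfig, exampleJob)); // lock:account:acc_123
```

Keying the lock on an account identifier means two jobs for the same account are serialized across instances, while jobs for different accounts can still run in parallel.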

generate_queue.js - Redis Queue Factory

Purpose: Create Bull queue instances with Redis connection and default job options.

Configuration:

defaultJobOptions: {
  removeOnComplete: true,         // Auto-cleanup completed jobs
  removeOnFail: true              // Auto-cleanup failed jobs
},
redis: {
  retryStrategy: () => 2000,      // 2-second reconnect delay
  reconnectOnError: () => true    // Always attempt reconnection
}
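
To show how the two fragments above fit into a single options object, here is a minimal sketch. `buildQueueOptions` is a hypothetical helper, the `localhost` and `6379` fallbacks are assumptions borrowed from the sample values in this document, and the queue name in the trailing comment is made up.

```javascript
// Hypothetical helper that assembles the options object described above.
// REDIS_HOST / REDIS_PORT are the variables listed in this document; the
// fallback values are assumptions.
function buildQueueOptions(env = process.env) {
  return {
    redis: {
      host: env.REDIS_HOST || 'localhost',
      port: Number(env.REDIS_PORT) || 6379,
      retryStrategy: () => 2000,     // wait 2 seconds before each reconnect attempt
      reconnectOnError: () => true,  // reconnect whenever Redis reports an error
    },
    defaultJobOptions: {
      removeOnComplete: true,        // drop completed jobs from Redis
      removeOnFail: true,            // drop failed jobs from Redis
    },
  };
}

// With Bull installed, the object would be passed as the queue options:
//   const Queue = require('bull');
//   const queue = new Queue('example-queue', buildQueueOptions());
```

One consequence of `removeOnFail: true` is that failed jobs are not kept in Redis for later inspection, so a `failedCb` handler or the logger is the main place where failures remain visible.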

Other Common Utilities

  • changeStream.js: MongoDB change stream helpers
  • downgrade_logs.js: Subscription downgrade logging

3. Environment Configuration Module

Purpose: Environment variables control which processing modules are loaded at startup.

Core Environment Variables

# Database & Redis
MONGO_DB_URL=mongodb://localhost:27017/dashclicks
REDIS_HOST=localhost
REDIS_PORT=6379

# Service Configuration
PORT=6002 # HTTP server port (default: 6002)
NODE_ENV=production # Environment mode

# Optional Features
QM_HOOKS=true # Enable Express API endpoints in src/

Verified Module Flags

Based on the source code in index.js, these environment variables control module loading:

# Affiliate, Contact & Account Management
QM_AFFILIATES=true # Affiliate commissions, leaderboard, expire payouts
QM_CONTACTS=true # Contact import processing
QM_CONTACTS_EXPORT=true # Contact export processing
QM_CONTACTS_EXPORT_CLEANUP=true # Export file cleanup
QM_ACCOUNTS=true # Account import processing

# Subscription Management
QM_SUBSCRIPTION_DOWNGRADE=true # Subscription tier changes
QM_SUBSCRIPTION_CANCEL=true # Subscription cancellation
QM_SUBSCRIPTION_ACTIVATE=true # Subscription activation
QM_SUBSCRIPTION_PENDING_SUBACCOUNT_CHARGE=true # Subaccount billing
QM_SUBSCRIPTION_CHARGE_MAINACCOUNT=true # Main account billing

# Content & Site Management
QM_INSTAREPORT_BUILD=true # InstaReport generation
QM_INSTASITES_PURGE=true # InstaSite cleanup
QM_SITE_UPDATE_BUSINESS=true # Site business info updates

# CRM & Communication
QM_DEALS=true # Deal import processing
QM_SUPPORT_SNOOZE=true # Support conversation snoozing
QM_REPUTATION=true # Review platform sync

Module Loading Pattern: Each flag triggers loading of corresponding modules from the crons/ directory.

Single Initialization: The cronsInitialized flag ensures modules load only once per service instance.
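
Because the startup excerpt compares each flag against the string 'true', a value such as `TRUE` or `1` leaves the module disabled without any warning. A small startup check along the following lines could surface such values. It is a sketch only: `findIgnoredFlags` is a hypothetical helper and is not part of the service.

```javascript
// Hypothetical startup check: lists QM_* variables that are set to something
// other than 'true', 'false', or the empty string. A comparison against the
// string 'true' treats those values as disabled.
function findIgnoredFlags(env = process.env) {
  return Object.keys(env)
    .filter((key) => key.startsWith('QM_'))
    .filter((key) => env[key] !== 'true' && env[key] !== 'false' && env[key] !== '')
    .sort();
}

const sampleEnv = { QM_AFFILIATES: 'TRUE', QM_CONTACTS: 'true', QM_DEALS: '1', PORT: '6002' };
console.log(findIgnoredFlags(sampleEnv)); // [ 'QM_AFFILIATES', 'QM_DEALS' ]
```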

4. Processing Modules

The crons/ directory contains individual processors for specific business functions. Each module exports a start() function that initializes its processing logic.

Example Module Categories

Affiliate Management (QM_AFFILIATES=true):

  • crons/affiliates/commissions.js - Commission calculations
  • crons/affiliates/leaderboard.js - Leaderboard updates
  • crons/affiliates/expirePayouts.js - Payout expiration handling

Contact Management (QM_CONTACTS, QM_CONTACTS_EXPORT, QM_CONTACTS_EXPORT_CLEANUP):

  • crons/contacts/import.js - CSV contact import processing
  • crons/contacts/export.js - Contact export generation
  • crons/contacts/cleanup.js - Export file cleanup

Subscription Management:

  • crons/store/subscriptions/downgrade.js - Tier changes
  • crons/store/subscriptions/cancel.js - Cancellation processing
  • crons/store/subscriptions/activate.js - Activation processing
  • crons/store/invoices/pending_subaccount_charge.js - Subaccount billing
  • crons/store/charge/charge_mainaccount.js - Main account billing

Content Management:

  • crons/instareports/build.js - InstaReport generation
  • crons/instasites/purge.js - InstaSite cleanup
  • crons/sites/update_business.js - Site business updates

Module Pattern: Each module follows the same structure:

// Example: crons/contacts/import.js
exports.start = async () => {
  // Initialize Bull queue or cron job
  // Return queue instance for global registry
};

5. Express API Module

src/ - Optional HTTP Endpoints

Purpose: Provides REST API endpoints when QM_HOOKS=true environment variable is set.

The Express server always starts, listening on port 6002 unless PORT is set, but the additional API routes are mounted only when QM_HOOKS=true.

Base Endpoints (always available):

app.get('/status', (req, res) => {
  res.json({ status: 'ok' });
});

Extended Routes (when QM_HOOKS=true):

  • Routes defined in src/routes/ are mounted
  • Controllers in src/controllers/ handle request processing

Error Handling Middleware

Duplicate Key Error Transformation:

if (error.message.indexOf('E11000 duplicate key') !== -1) {
  error.message = 'Duplicate document.';
  error.additional_info = 'An item matching the provided details was found...';
}

Axios Error Normalization:

if (error.isAxiosError) {
  errorCode = error.response?.status || 400;
  errorMessage = error.response?.data?.message || errorMessage;
  additional_info = { ...error.toJSON(), stack: null, config: null };
}
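
The two transformations above can be combined into one pure function, which makes them easy to test outside Express. This is a sketch under stated assumptions: `normalizeError` is a hypothetical name, and the default status of 500 and the fallback message are not taken from the source.

```javascript
// Hypothetical helper combining the duplicate-key and Axios handling shown
// above. The default status (500) and fallback message are assumptions.
function normalizeError(error) {
  let errorCode = 500;
  let errorMessage = error.message || 'Internal error';
  let additionalInfo = error.additional_info ?? null;

  // MongoDB duplicate key errors get a user-facing message.
  if (typeof error.message === 'string' && error.message.includes('E11000 duplicate key')) {
    errorMessage = 'Duplicate document.';
    additionalInfo = 'An item matching the provided details was found...';
  }

  // Axios errors carry the upstream status and message on error.response.
  if (error.isAxiosError) {
    errorCode = error.response?.status || 400;
    errorMessage = error.response?.data?.message || errorMessage;
    const json = typeof error.toJSON === 'function' ? error.toJSON() : {};
    additionalInfo = { ...json, stack: null, config: null }; // strip noisy fields
  }

  return { errorCode, errorMessage, additionalInfo };
}
```

Setting `stack` and `config` to null keeps request configuration, which may include headers or credentials, out of the response body.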

6. Operational Guidance

Error Handling

Global Error Handlers:

// Uncaught exceptions - logged but don't exit process
process.on('uncaughtException', err => {
  logger.error({
    raw_error: err,
    initiator: 'QM',
    message: `Uncaught Exception: ${err?.message}`,
  });
});

// Unhandled rejections - logged with detailed context
process.on('unhandledRejection', (reason, promise) => {
  logger.error({ error: 'Unhandled Rejection', promise, reason });
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
});

Graceful Shutdown

SIGTERM Handler Implementation:

process.on('SIGTERM', async () => {
  console.log('SIGTERM signal received. Initiating graceful shutdown...');

  // Stop accepting new HTTP requests
  server.close(async () => {
    logger.log({ message: 'Server is no longer accepting new requests.' });

    // Close all queues in global registry
    if (global?.runningQueuesMap?.size) {
      while (global.runningQueuesMap.size > 0) {
        for (const [name, queue] of global.runningQueuesMap) {
          await queue.close();
          global.runningQueuesMap.delete(name);
          logger.log({ message: `Queue ${name} closed successfully.` });
        }
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    }

    logger.log({ message: 'All queues have been processed. Exiting...' });
    process.exit(0);
  });
});
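
As far as the excerpt shows, a rejected `queue.close()` in the handler above would escape the async callback as an unhandled rejection, and `process.exit(0)` would not be reached. The sketch below is one possible variant that records failures and always removes the entry. `closeAllQueues` is a hypothetical helper, not the service's implementation.

```javascript
// Hypothetical helper: closes every queue in the registry, removes each entry
// once its close() settles, and reports failures instead of throwing.
async function closeAllQueues(registry, log = console.log) {
  const failed = [];
  // Copy the entries first so deleting from the Map cannot affect iteration.
  for (const [name, queue] of [...registry]) {
    try {
      await queue.close(); // Bull's Queue#close() returns a promise
      log(`Queue ${name} closed successfully.`);
    } catch (err) {
      failed.push(name);
      log(`Queue ${name} failed to close: ${err.message}`);
    } finally {
      registry.delete(name); // always remove, so the caller's loop can finish
    }
  }
  return failed; // names of queues whose close() rejected
}
```

Inside the SIGTERM handler, the result could decide the exit code, for example exiting with 1 when the returned list is not empty.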

Service Configuration

Essential Environment Variables:

# Required
MONGO_DB_URL=mongodb://localhost:27017/dashclicks
REDIS_HOST=localhost
REDIS_PORT=6379

# Optional (defaults to 6002)
PORT=6002

# Module Control (set to 'true' to enable)
QM_AFFILIATES=true
QM_CONTACTS=true
QM_SUBSCRIPTION_DOWNGRADE=true
# ... other QM_* flags as needed

Development vs Production:

  • Development: Enable specific modules for testing
  • Production: Enable all required modules for full functionality
  • Staging: Subset of production modules for integration testing

Technical Implementation Details

Global Queue Registry

All active queues are stored in global.runningQueuesMap - a Map instance created in queue-wrapper.js:

if (!global.runningQueuesMap) {
  global.runningQueuesMap = new Map();
}

This registry enables:

  • Centralized queue management across all modules
  • Graceful shutdown coordination
  • Queue status monitoring via Express endpoints
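
As an illustration of the monitoring use case, the sketch below collects job counts for every registered queue. `summarizeQueues` is a hypothetical helper. It relies only on Bull's `Queue#getJobCounts()`, and any object exposing that method works, which keeps the sketch runnable without Redis.

```javascript
// Hypothetical helper: collects job counts for every queue in the registry.
// Bull's Queue#getJobCounts() resolves to counts such as waiting, active,
// completed, failed and delayed.
async function summarizeQueues(registry) {
  const summary = {};
  for (const [name, queue] of registry) {
    try {
      summary[name] = await queue.getJobCounts();
    } catch (err) {
      summary[name] = { error: err.message }; // keep going if one queue is unreachable
    }
  }
  return summary;
}
```

A route handler could return `await summarizeQueues(global.runningQueuesMap)` as JSON. Since completed and failed jobs are removed automatically, those two counts would be expected to stay near zero.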

Database Connection

MongoDB connection established via shared utilities with specific configuration:

await connectMongo({
  uri: mongoURL,
  initiator: 'queue-manager',
  options: {
    maxPoolSize: 10,
  },
  overrideEvents: ['open', 'disconnected'],
});

Redis Configuration

Redis connection handled by Bull queues with automatic retry logic:

  • Retry Strategy: 2-second delay between reconnection attempts
  • Reconnect Policy: Always attempt reconnection on errors
  • Job Cleanup: Automatic removal of completed/failed jobs

Service Dependencies

The Queue Manager uses these shared modules (copied from /shared/):

  • models/: Mongoose schemas for database operations
  • utilities/: Helper functions including logger, database utilities, etc.

Important: Never edit models or utilities directly in the queue-manager directory - they are Git-ignored and overwritten during builds.

Startup Logging

Each enabled module logs successful initialization:

logger.log({
  initiator: 'QM/startup/QM_AFFILIATES',
  message: 'QM_AFFILIATES started successfully',
});

This provides clear visibility into which modules are active in each environment.

📚 Module Documentation

Core Infrastructure

  • 📘 Architecture - Complete system architecture and processing patterns
  • 📗 Configuration - Environment variables and setup
  • 📙 Queue Wrapper - Bull queue factory with Redlock

Summary

The Queue Manager is a modular background processing service with:

  • Environment-controlled loading via QM_* flags
  • Bull queue-based job processing with Redis backend
  • Redlock distributed locking for multi-instance deployments
  • Global queue registry for centralized management
  • Express API for monitoring and manual job management
  • Graceful shutdown with proper queue cleanup
  • Comprehensive error handling and logging

Each module is self-contained and can be enabled/disabled independently, making it suitable for different deployment scenarios and environments.


Service Status: Active Production System
Total Modules: 40+ processing modules
Last Updated: 2025-10-10
