Queue Manager
The Queue Manager is a Node.js background processing service that executes scheduled jobs and processes asynchronous tasks. It uses environment variables to selectively load processing modules, ensuring only required functionality runs in each environment.
Service Path: queue-manager/
Default Port: 6002 (configurable via PORT environment variable)
Dependencies: MongoDB, Redis, Bull, ioredis, Redlock
Architecture Overview
The Queue Manager uses environment-controlled module loading where each processing module is enabled via specific environment variables (e.g., QM_AFFILIATES=true). This allows fine-grained control over which jobs run in different environments.
Core Components
- Main Entry Point (
index.js): Conditional module loading based on environment flags - Common Utilities (
common/): Shared queue wrapper, data mapping, and helper functions - Processing Modules (
crons/): Individual job processors for specific business functions - Express API (
src/): Optional HTTP endpoints enabled viaQM_HOOKS=true - Global Queue Registry: All active queues stored in
global.runningQueuesMap
Service Structure
graph TB
subgraph "Entry Point"
INDEX[index.js<br/>Environment-based module loading]
end
subgraph "Common Utilities"
WRAPPER[queue-wrapper.js<br/>Bull queue creation]
GENERATE[generate_queue.js<br/>Queue factory]
end
subgraph "Processing Modules"
CRONS[crons/<br/>Individual job processors]
QUEUES[queues/<br/>Queue definitions]
SERVICES[services/<br/>Business logic]
end
subgraph "Optional Components"
API[src/<br/>Express routes<br/>if QM_HOOKS=true]
TRIGGERS[triggers/<br/>DB triggers]
end
INDEX --> WRAPPER
INDEX --> CRONS
WRAPPER --> GENERATE
CRONS --> SERVICES
SERVICES --> QUEUES
QUEUES --> WRAPPER
TRIGGERS --> CRONS
Sub-Modules
1. Entry Point Module
index.js - Main Service Controller
Purpose: Environment-controlled initialization of processing modules.
Key Features:
- Single Initialization Guard:
cronsInitializedflag prevents duplicate module loading - Conditional Loading: Each module loaded only if corresponding environment variable is
true - MongoDB Connection: Establishes database connection with maxPoolSize: 10
- Express Server: Always runs on port 6002 (or
PORTenv var) - SIGTERM Handler: Graceful shutdown with queue cleanup
Startup Pattern:
const initServices = async () => {
if (cronsInitialized) return;
cronsInitialized = true;
// Load modules based on environment flags
if (process.env.QM_AFFILIATES == 'true') {
await require('./crons/affiliates/commissions').start();
await require('./crons/affiliates/leaderboard').start();
await require('./crons/affiliates/expirePayouts').start();
}
// ... more modules
};
Graceful Shutdown:
process.on('SIGTERM', async () => {
server.close(async () => {
while (global.runningQueuesMap.size > 0) {
for (const [name, queue] of global.runningQueuesMap) {
await queue.close();
global.runningQueuesMap.delete(name);
}
await new Promise(resolve => setTimeout(resolve, 1000));
}
process.exit(0);
});
});
2. Common Utilities Module
queue-wrapper.js - Bull Queue Factory
Purpose: Standardized Bull queue creation with Redlock support and lifecycle management.
Function Signature:
module.exports =
(name, // Queue name
level, // 'global' | 'account'
{
processCb, // Job processing callback
failedCb, // Optional failure handler
completedCb, // Optional completion handler
settings, // Bull AdvancedSettings
concurrency, // Concurrent job count
},
runCompletedCbAfterNoActives, // Run callback after no active jobs
redlock); // Redlock configuration object
Global Queue Registry: All queues stored in global.runningQueuesMap Map for centralized management.
Redlock Configuration:
redlock: {
active: true, // Enable distributed locking
id: 'account_id', // Field from job.data for lock key
type: 'account' // Lock type identifier
}
generate_queue.js - Redis Queue Factory
Purpose: Create Bull queue instances with Redis connection and default job options.
Configuration:
defaultJobOptions: {
removeOnComplete: true, // Auto-cleanup completed jobs
removeOnFail: true // Auto-cleanup failed jobs
}
redis: {
retryStrategy: () => 2000, // 2-second reconnect delay
reconnectOnError: () => true // Always attempt reconnection
}
Other Common Utilities
- changeStream.js: MongoDB change stream helpers
- downgrade_logs.js: Subscription downgrade logging
3. Environment Configuration Module
Purpose: Environment variables control which processing modules are loaded at startup.
Core Environment Variables
# Database & Redis
MONGO_DB_URL=mongodb://localhost:27017/dashclicks
REDIS_HOST=localhost
REDIS_PORT=6379
# Service Configuration
PORT=6002 # HTTP server port (default: 6002)
NODE_ENV=production # Environment mode
# Optional Features
QM_HOOKS=true # Enable Express API endpoints in src/
Verified Module Flags
Based on the source code in index.js, these environment variables control module loading:
# Contact & Account Management
QM_AFFILIATES=true # Affiliate commissions, leaderboard, expire payouts
QM_CONTACTS=true # Contact import processing
QM_CONTACTS_EXPORT=true # Contact export processing
QM_CONTACTS_EXPORT_CLEANUP=true # Export file cleanup
QM_ACCOUNTS=true # Account import processing
# Subscription Management
QM_SUBSCRIPTION_DOWNGRADE=true # Subscription tier changes
QM_SUBSCRIPTION_CANCEL=true # Subscription cancellation
QM_SUBSCRIPTION_ACTIVATE=true # Subscription activation
QM_SUBSCRIPTION_PENDING_SUBACCOUNT_CHARGE=true # Subaccount billing
QM_SUBSCRIPTION_CHARGE_MAINACCOUNT=true # Main account billing
# Content & Site Management
QM_INSTAREPORT_BUILD=true # InstaReport generation
QM_INSTASITES_PURGE=true # InstaSite cleanup
QM_SITE_UPDATE_BUSINESS=true # Site business info updates
# CRM & Communication
QM_DEALS=true # Deal import processing
QM_SUPPORT_SNOOZE=true # Support conversation snoozing
QM_REPUTATION=true # Review platform sync
Module Loading Pattern: Each flag triggers loading of corresponding modules from the crons/ directory.
Single Initialization: The cronsInitialized flag ensures modules load only once per service instance.
4. Processing Modules
The crons/ directory contains individual processors for specific business functions. Each module exports a start() function that initializes its processing logic.
Example Module Categories
Affiliate Management (QM_AFFILIATES=true):
crons/affiliates/commissions.js- Commission calculationscrons/affiliates/leaderboard.js- Leaderboard updatescrons/affiliates/expirePayouts.js- Payout expiration handling
Contact Management (QM_CONTACTS=true):
crons/contacts/import.js- CSV contact import processingcrons/contacts/export.js- Contact export generationcrons/contacts/cleanup.js- Export file cleanup
Subscription Management:
crons/store/subscriptions/downgrade.js- Tier changescrons/store/subscriptions/cancel.js- Cancellation processingcrons/store/subscriptions/activate.js- Activation processingcrons/store/invoices/pending_subaccount_charge.js- Subaccount billingcrons/store/charge/charge_mainaccount.js- Main account billing
Content Management:
crons/instareports/build.js- InstaReport generationcrons/instasites/purge.js- InstaSite cleanupcrons/sites/update_business.js- Site business updates
Module Pattern: Each module follows the same structure:
// Example: crons/contacts/import.js
exports.start = async () => {
// Initialize Bull queue or cron job
// Return queue instance for global registry
};
5. Express API Module
src/ - Optional HTTP Endpoints
Purpose: Provides REST API endpoints when QM_HOOKS=true environment variable is set.
The Express server always runs on port 6002 but additional API routes are mounted only when hooks are enabled.
Base Endpoints (always available):
app.get('/status', (req, res) => {
res.json({ status: 'ok' });
});
Extended Routes (when QM_HOOKS=true):
- Routes defined in
src/routes/are mounted - Controllers in
src/controllers/handle request processing
Error Handling Middleware
Duplicate Key Error Transformation:
if (error.message.indexOf('E11000 duplicate key') !== -1) {
error.message = 'Duplicate document.';
error.additional_info = 'An item matching the provided details was found...';
}
Axios Error Normalization:
if (error.isAxiosError) {
errorCode = error.response?.status || 400;
errorMessage = error.response?.data?.message || errorMessage;
additional_info = { ...error.toJSON(), stack: null, config: null };
}
6. Operational Guidance
Error Handling
Global Error Handlers:
// Uncaught exceptions - logged but don't exit process
process.on('uncaughtException', err => {
logger.error({
raw_error: err,
initiator: 'QM',
message: `Uncaught Exception: ${err?.message}`,
});
});
// Unhandled rejections - logged with detailed context
process.on('unhandledRejection', (reason, promise) => {
logger.error({ error: 'Unhandled Rejection', promise, reason });
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
});
Graceful Shutdown
SIGTERM Handler Implementation:
process.on('SIGTERM', async () => {
console.log('SIGTERM signal received. Initiating graceful shutdown...');
// Stop accepting new HTTP requests
server.close(async () => {
logger.log({ message: 'Server is no longer accepting new requests.' });
// Close all queues in global registry
if (global?.runningQueuesMap?.size) {
while (global.runningQueuesMap.size > 0) {
for (const [name, queue] of global.runningQueuesMap) {
await queue.close();
global.runningQueuesMap.delete(name);
logger.log({ message: `Queue ${name} closed successfully.` });
}
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
logger.log({ message: 'All queues have been processed. Exiting...' });
process.exit(0);
});
});
Service Configuration
Essential Environment Variables:
# Required
MONGO_DB_URL=mongodb://localhost:27017/dashclicks
REDIS_HOST=localhost
REDIS_PORT=6379
PORT=6002
# Module Control (set to 'true' to enable)
QM_AFFILIATES=true
QM_CONTACTS=true
QM_SUBSCRIPTION_DOWNGRADE=true
# ... other QM_* flags as needed
Development vs Production:
- Development: Enable specific modules for testing
- Production: Enable all required modules for full functionality
- Staging: Subset of production modules for integration testing
Technical Implementation Details
Global Queue Registry
All active queues are stored in global.runningQueuesMap - a Map instance created in queue-wrapper.js:
if (!global.runningQueuesMap) {
global.runningQueuesMap = new Map();
}
This registry enables:
- Centralized queue management across all modules
- Graceful shutdown coordination
- Queue status monitoring via Express endpoints
Database Connection
MongoDB connection established via shared utilities with specific configuration:
await connectMongo({
uri: mongoURL,
initiator: 'queue-manager',
options: {
maxPoolSize: 10,
},
overrideEvents: ['open', 'disconnected'],
});
Redis Configuration
Redis connection handled by Bull queues with automatic retry logic:
- Retry Strategy: 2-second delay between reconnection attempts
- Reconnect Policy: Always attempt reconnection on errors
- Job Cleanup: Automatic removal of completed/failed jobs
Service Dependencies
The Queue Manager uses these shared modules (copied from /shared/):
- models/: Mongoose schemas for database operations
- utilities/: Helper functions including logger, database utilities, etc.
Important: Never edit models or utilities directly in the queue-manager directory - they are Git-ignored and overwritten during builds.
Startup Logging
Each enabled module logs successful initialization:
logger.log({
initiator: 'QM/startup/QM_AFFILIATES',
message: 'QM_AFFILIATES started successfully',
});
This provides clear visibility into which modules are active in each environment.
📚 Module Documentation
Core Infrastructure
- 📘 Architecture (documentation unavailable) - Complete system architecture and processing patterns
- 📗 Configuration (documentation unavailable) - Environment variables and setup
- 📙 Queue Wrapper - Bull queue factory with Redlock
Processing Modules
- 🤝 Affiliates - Commission calculations, leaderboard, payouts
- 📇 Contacts - Import, export, and cleanup processing
- 💳 Store & Subscriptions - Subscription lifecycle and billing
- 📊 InstaReports - Automated report generation
- 🌐 InstaSites - Site building and purging
- 💬 Conversations & Support - Support ticket management
- 🤝 Deals & CRM - Deal imports and automations
- 👥 Accounts - Account imports and monitoring
- ⭐ Reviews & Reputation - Review fetching from platforms
- 🌐 Sites - Thumbnail generation and business updates
- 🌐 Domains (Lightning) - Domain validation, renewal, cancellation
- 🎯 Funnels - Funnel cloning and backup cleanup
- 📱 Communication - Violation detection and A2P management
- 💰 Billing - Billing contact import and authentication
- 💳 OneBalance - Balance reload and charge processing
- 🔔 Webhooks - Webhook notification delivery
- 🔧 Miscellaneous - Projects, Intercom leads, schedule monitoring
Summary
The Queue Manager is a modular background processing service with:
- Environment-controlled loading via QM_* flags
- Bull queue-based job processing with Redis backend
- Redlock distributed locking for multi-instance deployments
- Global queue registry for centralized management
- Express API for monitoring and manual job management
- Graceful shutdown with proper queue cleanup
- Comprehensive error handling and logging
Each module is self-contained and can be enabled/disabled independently, making it suitable for different deployment scenarios and environments.
Service Status: Active Production System
Total Modules: 40+ processing modules
Last Updated: 2025-10-10