๐ญ Generate Queue Factory
๐ Overviewโ
The generate_queue.js utility is a factory function that creates Bull queue instances with standardized Redis configuration, connection retry strategies, and default job options. This is a foundational utility used across all Queue Manager modules.
Source File: queue-manager/common/generate_queue.js
๐ฏ Purposeโ
- Standardize Queue Creation: Consistent Bull queue configuration across all modules
- Connection Management: Handles Redis connection pooling and reconnection
- Default Behavior: Sets sensible defaults for job lifecycle (auto-cleanup)
- Retry Strategy: Automatic reconnection after connection failures
๐ Function Signatureโ
function generateQueue(name, connectionParams, settings)
Parametersโ
- name (
String) - Queue name (e.g.,'contacts-import-123','affiliates-commissions') - connectionParams (
Object, optional) - Override default Redis connectionhost- Redis host (defaults toREDIS_HOSTenv var)port- Redis port (defaults toREDIS_PORTenv var)
- settings (
Queue.AdvancedSettings, optional) - Bull advanced settings
Returnsโ
Queue.Queue- Configured Bull queue instance
๐ง Implementation Detailsโ
Complete Source Codeโ
const Queue = require('bull');
const REDIS_HOST = process.env.REDIS_HOST;
const REDIS_PORT = process.env.REDIS_PORT;
function generateQueue(name, connectionParams, settings) {
const config = `redis://${connectionParams?.host || REDIS_HOST}:${
connectionParams?.port || REDIS_PORT
}`;
const queue = new Queue(name, config, {
settings,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
redis: {
retryStrategy: function () {
return 2000; // reconnect after 2 seconds
},
reconnectOnError: function () {
return true;
},
},
});
return queue;
}
module.exports = generateQueue;
Configuration Breakdownโ
1. Redis Connection Stringโ
const config = `redis://${connectionParams?.host || REDIS_HOST}:${
connectionParams?.port || REDIS_PORT
}`;
Logic:
- Uses
connectionParamsif provided, otherwise falls back to environment variables - Constructs standard Redis connection string:
redis://host:port - Supports runtime override for multi-Redis scenarios
2. Default Job Optionsโ
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true
}
Behavior:
removeOnComplete: true: Automatically deletes job from Redis after successful completionremoveOnFail: true: Automatically deletes job from Redis after all retry attempts fail- Memory Management: Prevents Redis from filling up with completed/failed jobs
- Override: Individual job additions can override these defaults
3. Retry Strategyโ
retryStrategy: function () {
return 2000; // reconnect after 2 seconds
}
Logic:
- Called when Redis connection is lost
- Returns delay (in milliseconds) before reconnection attempt
- Fixed 2-second delay (not exponential backoff)
- Retries indefinitely until connection restored
4. Reconnect on Errorโ
reconnectOnError: function () {
return true;
}
Logic:
- Called when Redis connection encounters an error
- Always returns
trueto attempt reconnection - Ensures queue resilience during Redis failures
๐จ Usage Patternsโ
Basic Usage (Default Redis)โ
const generateQueue = require('./common/generate_queue');
// Create queue with environment variables
const myQueue = generateQueue('my-process-queue');
// Add job
await myQueue.add(
{
data: 'payload',
},
{
attempts: 3,
backoff: 5000,
},
);
// Process jobs
myQueue.process(async job => {
// Handle job
return 'success';
});
Custom Redis Connectionโ
// Use specific Redis instance
const customQueue = generateQueue('custom-queue', {
host: 'redis-replica.example.com',
port: 6380,
});
With Advanced Settingsโ
// Custom concurrency and rate limiting
const advancedQueue = generateQueue(
'rate-limited-queue',
null, // Use default connection
{
maxStalledCount: 3,
stalledInterval: 30000,
guardInterval: 5000,
retryProcessDelay: 5000,
},
);
Overriding Default Job Optionsโ
const queue = generateQueue('cleanup-queue');
// This job will NOT be auto-removed
await queue.add(
{
data: 'keep-this',
},
{
removeOnComplete: false, // Override default
removeOnFail: false, // Override default
},
);
๐ Real-World Examples from Queue Managerโ
Contacts Import Queueโ
// queue-manager/queues/contacts/person.js
const generateQueue = require('../../common/generate_queue');
const queue = () => {
return {
start: async uniqueName => {
const Q = await generateQueue(`${uniqueName}-queue-contacts-person`);
Q.process(async (job, done) => {
// Process contact import
const { crmContacts } = job.data;
// ... insert logic
done();
});
return Q;
},
};
};
module.exports = queue();
Affiliates Commissions Queueโ
// queue-manager/queues/affiliates/commissions.js
const generateQueue = require('../../common/generate_queue');
const queue = () => {
return {
start: async uniqueName => {
const Q = await generateQueue(`${uniqueName}-queue-affiliates-commissions`);
Q.process(10, async (job, done) => {
// 10 concurrent jobs
// Calculate commissions
done();
});
return Q;
},
};
};
๐ Queue Lifecycleโ
graph TD
A[Call generateQueue] --> B[Create Bull Queue Instance]
B --> C{Redis Connected?}
C -->|Yes| D[Queue Ready]
C -->|No| E[Retry after 2s]
E --> C
D --> F[Add Jobs]
F --> G[Process Jobs]
G --> H{Job Complete?}
H -->|Success| I[Auto-remove from Redis]
H -->|Failed| J[Auto-remove from Redis]
G --> K{Redis Error?}
K -->|Yes| L[Reconnect Strategy]
L --> C
โ๏ธ Configurationโ
Required Environment Variablesโ
# Redis Connection
REDIS_HOST=localhost
REDIS_PORT=6379
Optional Redis Configurationโ
For production environments, consider:
# Redis Authentication
REDIS_PASSWORD=secret
# Redis TLS
REDIS_TLS=true
# Redis Cluster
REDIS_CLUSTER_NODES=host1:6379,host2:6379,host3:6379
Note: Current implementation doesn't support auth/TLS/cluster. Would require modification:
const config = {
host: connectionParams?.host || REDIS_HOST,
port: connectionParams?.port || REDIS_PORT,
password: process.env.REDIS_PASSWORD,
tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
};
๐จ Error Handlingโ
Connection Failuresโ
queue.on('error', error => {
console.error('Queue error:', error);
// Retry strategy will automatically reconnect
});
queue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err);
// Job automatically removed per defaultJobOptions
});
Graceful Shutdownโ
process.on('SIGTERM', async () => {
await queue.close();
process.exit(0);
});
๐ Performance Considerationsโ
Memory Managementโ
- Auto-cleanup:
removeOnCompleteandremoveOnFailprevent Redis memory bloat - Trade-off: Cannot inspect completed jobs unless overridden
- Recommendation: Use for high-volume, transient jobs
Connection Poolingโ
- Bull Behavior: Reuses Redis connections internally
- Multiple Queues: Each queue instance creates separate connection pool
- Optimization: Reuse queue instances across modules when possible
Retry Strategyโ
- Fixed Delay: 2-second fixed delay may cause thundering herd
- Improvement: Consider exponential backoff:
retryStrategy: function(times) {
return Math.min(times * 1000, 30000); // Max 30s
}
๐งช Testing Considerationsโ
Mock Queue for Testsโ
// tests/mocks/queue.mock.js
const mockQueue = {
add: jest.fn(),
process: jest.fn(),
close: jest.fn(),
};
jest.mock('./common/generate_queue', () => {
return jest.fn(() => mockQueue);
});
Integration Testโ
const generateQueue = require('./common/generate_queue');
test('Queue processes jobs successfully', async () => {
const queue = generateQueue('test-queue');
queue.process(async job => {
return job.data.value * 2;
});
const job = await queue.add({ value: 5 });
const result = await job.finished();
expect(result).toBe(10);
await queue.close();
});
๐ Related Documentationโ
- Queue Wrapper - Higher-level queue wrapper with Redlock
- Common Utilities Overview
- Bull Queue Documentation
๐ Notesโ
When to Use vs Queue Wrapperโ
- Use
generate_queue.js: Simple queue creation without distributed locking - Use
queue-wrapper.js: When you need Redlock for distributed locking across instances
Bull Versionโ
This code assumes Bull v3.x. Bull v4.x has breaking changes.
Complexity: Low
Business Impact: Critical - Foundation for all queue processing
Dependencies: Bull, Redis
Last Updated: 2025-10-10