Skip to main content

๐Ÿญ Generate Queue Factory

๐Ÿ“– Overviewโ€‹

The generate_queue.js utility is a factory function that creates Bull queue instances with standardized Redis configuration, connection retry strategies, and default job options. This is a foundational utility used across all Queue Manager modules.

Source File: queue-manager/common/generate_queue.js

๐ŸŽฏ Purposeโ€‹

  • Standardize Queue Creation: Consistent Bull queue configuration across all modules
  • Connection Management: Handles Redis connection pooling and reconnection
  • Default Behavior: Sets sensible defaults for job lifecycle (auto-cleanup)
  • Retry Strategy: Automatic reconnection after connection failures

๐Ÿ“˜ Function Signatureโ€‹

function generateQueue(name, connectionParams, settings)

Parametersโ€‹

  • name (String) - Queue name (e.g., 'contacts-import-123', 'affiliates-commissions')
  • connectionParams (Object, optional) - Override default Redis connection
    • host - Redis host (defaults to REDIS_HOST env var)
    • port - Redis port (defaults to REDIS_PORT env var)
  • settings (Queue.AdvancedSettings, optional) - Bull advanced settings

Returnsโ€‹

  • Queue.Queue - Configured Bull queue instance

๐Ÿ”ง Implementation Detailsโ€‹

Complete Source Codeโ€‹

const Queue = require('bull');
const REDIS_HOST = process.env.REDIS_HOST;
const REDIS_PORT = process.env.REDIS_PORT;

function generateQueue(name, connectionParams, settings) {
const config = `redis://${connectionParams?.host || REDIS_HOST}:${
connectionParams?.port || REDIS_PORT
}`;

const queue = new Queue(name, config, {
settings,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
redis: {
retryStrategy: function () {
return 2000; // reconnect after 2 seconds
},
reconnectOnError: function () {
return true;
},
},
});

return queue;
}

module.exports = generateQueue;

Configuration Breakdownโ€‹

1. Redis Connection Stringโ€‹

const config = `redis://${connectionParams?.host || REDIS_HOST}:${
connectionParams?.port || REDIS_PORT
}`;

Logic:

  • Uses connectionParams if provided, otherwise falls back to environment variables
  • Constructs standard Redis connection string: redis://host:port
  • Supports runtime override for multi-Redis scenarios

2. Default Job Optionsโ€‹

defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true
}

Behavior:

  • removeOnComplete: true: Automatically deletes job from Redis after successful completion
  • removeOnFail: true: Automatically deletes job from Redis after all retry attempts fail
  • Memory Management: Prevents Redis from filling up with completed/failed jobs
  • Override: Individual job additions can override these defaults

3. Retry Strategyโ€‹

retryStrategy: function () {
return 2000; // reconnect after 2 seconds
}

Logic:

  • Called when Redis connection is lost
  • Returns delay (in milliseconds) before reconnection attempt
  • Fixed 2-second delay (not exponential backoff)
  • Retries indefinitely until connection restored

4. Reconnect on Errorโ€‹

reconnectOnError: function () {
return true;
}

Logic:

  • Called when Redis connection encounters an error
  • Always returns true to attempt reconnection
  • Ensures queue resilience during Redis failures

๐ŸŽจ Usage Patternsโ€‹

Basic Usage (Default Redis)โ€‹

const generateQueue = require('./common/generate_queue');

// Create queue with environment variables
const myQueue = generateQueue('my-process-queue');

// Add job
await myQueue.add(
{
data: 'payload',
},
{
attempts: 3,
backoff: 5000,
},
);

// Process jobs
myQueue.process(async job => {
// Handle job
return 'success';
});

Custom Redis Connectionโ€‹

// Use specific Redis instance
const customQueue = generateQueue('custom-queue', {
host: 'redis-replica.example.com',
port: 6380,
});

With Advanced Settingsโ€‹

// Custom concurrency and rate limiting
const advancedQueue = generateQueue(
'rate-limited-queue',
null, // Use default connection
{
maxStalledCount: 3,
stalledInterval: 30000,
guardInterval: 5000,
retryProcessDelay: 5000,
},
);

Overriding Default Job Optionsโ€‹

const queue = generateQueue('cleanup-queue');

// This job will NOT be auto-removed
await queue.add(
{
data: 'keep-this',
},
{
removeOnComplete: false, // Override default
removeOnFail: false, // Override default
},
);

๐Ÿ“Š Real-World Examples from Queue Managerโ€‹

Contacts Import Queueโ€‹

// queue-manager/queues/contacts/person.js
const generateQueue = require('../../common/generate_queue');

const queue = () => {
return {
start: async uniqueName => {
const Q = await generateQueue(`${uniqueName}-queue-contacts-person`);

Q.process(async (job, done) => {
// Process contact import
const { crmContacts } = job.data;
// ... insert logic
done();
});

return Q;
},
};
};

module.exports = queue();

Affiliates Commissions Queueโ€‹

// queue-manager/queues/affiliates/commissions.js
const generateQueue = require('../../common/generate_queue');

const queue = () => {
return {
start: async uniqueName => {
const Q = await generateQueue(`${uniqueName}-queue-affiliates-commissions`);

Q.process(10, async (job, done) => {
// 10 concurrent jobs
// Calculate commissions
done();
});

return Q;
},
};
};

๐Ÿ”„ Queue Lifecycleโ€‹

graph TD
A[Call generateQueue] --> B[Create Bull Queue Instance]
B --> C{Redis Connected?}
C -->|Yes| D[Queue Ready]
C -->|No| E[Retry after 2s]
E --> C
D --> F[Add Jobs]
F --> G[Process Jobs]
G --> H{Job Complete?}
H -->|Success| I[Auto-remove from Redis]
H -->|Failed| J[Auto-remove from Redis]
G --> K{Redis Error?}
K -->|Yes| L[Reconnect Strategy]
L --> C

โš™๏ธ Configurationโ€‹

Required Environment Variablesโ€‹

# Redis Connection
REDIS_HOST=localhost
REDIS_PORT=6379

Optional Redis Configurationโ€‹

For production environments, consider:

# Redis Authentication
REDIS_PASSWORD=secret

# Redis TLS
REDIS_TLS=true

# Redis Cluster
REDIS_CLUSTER_NODES=host1:6379,host2:6379,host3:6379

Note: Current implementation doesn't support auth/TLS/cluster. Would require modification:

const config = {
host: connectionParams?.host || REDIS_HOST,
port: connectionParams?.port || REDIS_PORT,
password: process.env.REDIS_PASSWORD,
tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
};

๐Ÿšจ Error Handlingโ€‹

Connection Failuresโ€‹

queue.on('error', error => {
console.error('Queue error:', error);
// Retry strategy will automatically reconnect
});

queue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err);
// Job automatically removed per defaultJobOptions
});

Graceful Shutdownโ€‹

process.on('SIGTERM', async () => {
await queue.close();
process.exit(0);
});

๐Ÿ“ˆ Performance Considerationsโ€‹

Memory Managementโ€‹

  • Auto-cleanup: removeOnComplete and removeOnFail prevent Redis memory bloat
  • Trade-off: Cannot inspect completed jobs unless overridden
  • Recommendation: Use for high-volume, transient jobs

Connection Poolingโ€‹

  • Bull Behavior: Reuses Redis connections internally
  • Multiple Queues: Each queue instance creates separate connection pool
  • Optimization: Reuse queue instances across modules when possible

Retry Strategyโ€‹

  • Fixed Delay: 2-second fixed delay may cause thundering herd
  • Improvement: Consider exponential backoff:
retryStrategy: function(times) {
return Math.min(times * 1000, 30000); // Max 30s
}

๐Ÿงช Testing Considerationsโ€‹

Mock Queue for Testsโ€‹

// tests/mocks/queue.mock.js
const mockQueue = {
add: jest.fn(),
process: jest.fn(),
close: jest.fn(),
};

jest.mock('./common/generate_queue', () => {
return jest.fn(() => mockQueue);
});

Integration Testโ€‹

const generateQueue = require('./common/generate_queue');

test('Queue processes jobs successfully', async () => {
const queue = generateQueue('test-queue');

queue.process(async job => {
return job.data.value * 2;
});

const job = await queue.add({ value: 5 });
const result = await job.finished();

expect(result).toBe(10);
await queue.close();
});

๐Ÿ“ Notesโ€‹

When to Use vs Queue Wrapperโ€‹

  • Use generate_queue.js: Simple queue creation without distributed locking
  • Use queue-wrapper.js: When you need Redlock for distributed locking across instances

Bull Versionโ€‹

This code assumes Bull v3.x. Bull v4.x has breaking changes.


Complexity: Low
Business Impact: Critical - Foundation for all queue processing
Dependencies: Bull, Redis
Last Updated: 2025-10-10

๐Ÿ’ฌ

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM