🛠️ Common Utilities
📖 Overview
The queue-manager/common/ directory contains shared utilities and helper functions used across multiple Queue Manager modules. These foundational components provide consistent patterns for queue creation, data manipulation, logging, and integration.
Location: queue-manager/common/
Purpose: Centralized utilities to avoid code duplication and ensure consistent behavior across all processing modules.
📁 Directory Structure
common/
├── queue-wrapper.js # Bull queue factory with Redlock (PRIMARY)
├── generate_queue.js # Redis queue creation with Bull
├── billing.js # Billing completion and Stripe integration
├── billing_add_mapping.js # Billing data field mapping
├── changeStream.js # MongoDB change stream helper
├── add_data.js # Contact data insertion utilities
├── log_data.js # Operation logging utilities
└── downgrade_logs.js # Subscription downgrade logging
🔧 Core Utilities
Queue Management
- queue-wrapper.js - Primary queue factory (documented separately; see Related Documentation)
- generate_queue.js - Redis queue creation
Data Operations
- add_data.js - Contact insertion and validation
- log_data.js - Operation result logging
Billing & Subscriptions
- billing.js - Billing process completion
- billing_add_mapping.js - Field mapping for billing data
- downgrade_logs.js - Downgrade event logging
Infrastructure
- changeStream.js - MongoDB change stream wrapper
📘 generate_queue.js
Overview
Factory function for creating Bull queue instances with standardized Redis configuration and job options.
File: queue-manager/common/generate_queue.js
Purpose
- Creates Bull queues with consistent Redis connection settings
- Applies default job options (auto-cleanup completed/failed jobs)
- Handles Redis connection errors with retry logic
Configuration
// Default job options applied to all queues
const defaultJobOptions = {
  removeOnComplete: true, // Auto-remove completed jobs
  removeOnFail: true, // Auto-remove failed jobs
};
// Redis connection settings
const redis = {
  retryStrategy: () => 2000, // 2-second retry delay
  reconnectOnError: () => true, // Always reconnect on error
};
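How these defaults might be combined with per-queue settings can be sketched as below. This is only an illustration: `buildQueueOptions` is a hypothetical helper that does not come from generate_queue.js, and it assumes that caller-supplied settings override the defaults. The resulting object uses the `redis` and `defaultJobOptions` keys that Bull accepts in its options argument.

```javascript
// Defaults copied from the configuration shown in this section.
const DEFAULT_JOB_OPTIONS = { removeOnComplete: true, removeOnFail: true };
const DEFAULT_REDIS = {
  retryStrategy: () => 2000, // fixed 2-second delay between connection retries
  reconnectOnError: () => true, // always reconnect after a Redis error
};

// Hypothetical helper (assumption): merges caller settings over the defaults.
function buildQueueOptions(customSettings) {
  const settings = customSettings || {};
  return {
    ...settings,
    redis: { ...DEFAULT_REDIS, ...settings.redis },
    defaultJobOptions: { ...DEFAULT_JOB_OPTIONS, ...settings.defaultJobOptions },
  };
}

// A caller adds retry attempts while keeping the auto-cleanup defaults.
const options = buildQueueOptions({ defaultJobOptions: { attempts: 3 } });
console.log(options.defaultJobOptions);
```

Merging per key, rather than replacing the whole object, lets a module adjust one job option without silently dropping the auto-cleanup behaviour.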
Usage
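const Queue = require('../common/generate_queue');
// Create a queue. The second argument is left as null here, and
// customSettings is an options object supplied by the caller.
const myQueue = Queue('my-queue-name', null, customSettings);
// Add jobs (from inside an async function)
await myQueue.add({ data: 'value' });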
Key Features
- Auto-Cleanup: Removes completed and failed jobs automatically to prevent Redis memory bloat
- Retry Logic: Retries the Redis connection after a fixed 2-second delay and always reconnects on Redis errors
- Consistent Configuration: All queues use the same Redis settings
📘 changeStream.js
Overview
Helper function for creating MongoDB change streams with standardized error handling.
File: queue-manager/common/changeStream.js
Purpose
Simplifies MongoDB change stream creation by providing a consistent interface for watching collection changes in real-time.
Function Signature
const startChangeStream = (model, matchConditions, handleChange) => {
// Implementation
};
Parameters
model (Mongoose Model)
- The Mongoose model to watch for changes
matchConditions (Object)
- MongoDB aggregation match conditions
- Filters which changes trigger the handler
handleChange (Function)
- Callback function to handle change events
- Receives:
{ data, op, updatedFields }
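To make the handler payload concrete, the sketch below applies the same field mapping that this page shows for the helper to three hand-written change events. `toHandlerPayload` is a hypothetical name used only for illustration, and the event objects are simplified samples rather than captured MongoDB output.

```javascript
// Hypothetical helper: the same field mapping the change stream helper applies.
function toHandlerPayload(changeEvent) {
  return {
    data: changeEvent.fullDocument,
    op: changeEvent.operationType,
    updatedFields: changeEvent?.updateDescription?.updatedFields,
  };
}

// Simplified sample events (assumed shapes, trimmed to the relevant fields).
const insertEvent = {
  operationType: 'insert',
  fullDocument: { _id: 'r1', status: 'pending' },
};
const updateEvent = {
  operationType: 'update',
  fullDocument: { _id: 'r1', status: 'done' },
  updateDescription: { updatedFields: { status: 'done' }, removedFields: [] },
};
const deleteEvent = {
  operationType: 'delete',
  documentKey: { _id: 'r1' },
};

const payloads = [insertEvent, updateEvent, deleteEvent].map(toHandlerPayload);
console.log(payloads);
```

Note that `updatedFields` is only populated for update events, and `data` is undefined for delete events, so handlers should check `op` before reading either field.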
Code Implementation
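// `logger` is the module's logger; its import is not shown in this excerpt.
const startChangeStream = (model, matchConditions, handleChange) => {
  // 'updateLookup' asks MongoDB to attach the current document to update events
  const stream = model.watch([{ $match: matchConditions }], { fullDocument: 'updateLookup' });
  stream.on('change', async data => {
    try {
      await handleChange({
        data: data.fullDocument, // Full document (absent for delete events)
        op: data.operationType, // e.g. insert, update, delete
        updatedFields: data?.updateDescription?.updatedFields, // Set on update events only
      });
    } catch (error) {
      // Log handler failures so that one bad event does not stop the stream
      logger.error({
        initiator: 'QM/common/change-stream',
        error,
        data,
      });
    }
  });
};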
Usage Example
const { startChangeStream } = require('../common/changeStream');
const OneBalanceReload = require('../models/onebalance-reload-request');
// Watch for new reload requests
startChangeStream(
  OneBalanceReload,
  {
    operationType: 'insert',
    'fullDocument.status': 'pending',
  },
  async ({ data, op }) => {
    // Process the new reload request
    await processReload(data);
  },
);
Key Features
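- Full Document Lookup: Uses fullDocument: 'updateLookup', so update events carry the current version of the document (delete events carry no document)
- Error Handling: Catches and logs errors thrown by the handler, so a failing handler does not stop the stream
- Structured Payload: The handler always receives a { data, op, updatedFields } object that includes the operation type
- Simple API: Clean abstraction over the MongoDB watch API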
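The error-handling behaviour can be tried without a database. The sketch below is an assumption-laden demo: `fakeModel` and the recording `logger` are stand-ins written for this example, the fake stream does not apply the match conditions, and the helper body is copied from the implementation shown on this page.

```javascript
const { EventEmitter } = require('events');

// Stand-in for a Mongoose model: watch() returns an emitter driven by hand.
// Unlike MongoDB, this fake ignores the $match pipeline.
const fakeStream = new EventEmitter();
const fakeModel = { watch: () => fakeStream };

// Stand-in logger that records what would have been logged.
const logged = [];
const logger = { error: entry => logged.push(entry) };

// Same body as the helper documented on this page.
const startChangeStream = (model, matchConditions, handleChange) => {
  const stream = model.watch([{ $match: matchConditions }], { fullDocument: 'updateLookup' });
  stream.on('change', async data => {
    try {
      await handleChange({
        data: data.fullDocument,
        op: data.operationType,
        updatedFields: data?.updateDescription?.updatedFields,
      });
    } catch (error) {
      logger.error({ initiator: 'QM/common/change-stream', error, data });
    }
  });
};

// The handler fails on one specific document and succeeds on the others.
const handled = [];
startChangeStream(fakeModel, { operationType: 'insert' }, ({ data }) => {
  if (data.status === 'broken') throw new Error('handler failed');
  handled.push(data._id);
});

fakeStream.emit('change', { operationType: 'insert', fullDocument: { _id: 'a', status: 'pending' } });
fakeStream.emit('change', { operationType: 'insert', fullDocument: { _id: 'b', status: 'broken' } });
fakeStream.emit('change', { operationType: 'insert', fullDocument: { _id: 'c', status: 'pending' } });

console.log(handled); // [ 'a', 'c' ]
console.log(logged.length); // 1
```

The failing event is logged and the third event is still processed. Errors emitted by the stream itself (as opposed to errors thrown by the handler) are a separate concern that this demo does not cover.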
Use Cases
- OneBalance real-time processing (reload/charge)
- Real-time notification triggers
- Audit logging
- Cache invalidation
📘 billing.js
Overview
Utilities for completing billing processes and notifying users via sockets.
File: queue-manager/common/billing.js
Purpose
Manages billing data fetch completion, user notifications, and queue cleanup after billing operations complete.
Key Function: completeProcess
const completeProcess = async token => {
  const userId = token?.user_id;
  // Load the Stripe key record that tracks how many times billing data was fetched
  const stripe_key = await StripeKey.findOne({ account_id: token?.account_id }).lean().exec();
  // Complete only after at least 6 fetch iterations; the optional chaining
  // avoids a TypeError when no Stripe key record exists for the account
  if (stripe_key?.fetch_count >= 6) {
    // Notify user via socket
    await socketEmit('billing_data_fetched', [userId.toString()], {
      message: 'Successfully fetched data, Please refresh the page',
      type: 'success',
    });
    // Reset fetch count and mark as initialized
    await StripeKey.updateOne(
      { account_id: token?.account_id },
      { $set: { fetch_count: 0, initialized: true } },
    );
    // Clean up queue entry
    await Queue.deleteOne({ _id: token.id });
  }
};
Workflow
- Check Fetch Count: Verifies that billing data has been fetched at least 6 times (complete sync)
- Socket Notification: Sends a real-time billing_data_fetched notification to the user
- Reset State: Resets fetch_count to 0 and marks the Stripe key as initialized
- Queue Cleanup: Deletes the completed entry from the Queue collection
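The workflow above can be expressed as a side-effect-free decision step, which is easier to unit test than the database and socket calls. The sketch below is hypothetical: `planCompletion`, its return shape, and the `reason` strings are assumptions made for illustration and are not part of billing.js.

```javascript
const FETCH_THRESHOLD = 6; // matches the fetch_count check in completeProcess

// Hypothetical pure helper: describes what completeProcess would do,
// without touching the database or the socket service.
function planCompletion(stripeKey, token) {
  if (!stripeKey || !token || token.user_id == null) {
    return { complete: false, reason: 'missing-data' };
  }
  if (!(stripeKey.fetch_count >= FETCH_THRESHOLD)) {
    return { complete: false, reason: 'still-fetching' };
  }
  return {
    complete: true,
    notifyUserIds: [String(token.user_id)],
    stripeKeyUpdate: { $set: { fetch_count: 0, initialized: true } },
    queueIdToDelete: token.id,
  };
}

const token = { id: 'q1', user_id: 'u1', account_id: 'acc1' };
console.log(planCompletion({ fetch_count: 3 }, token));
console.log(planCompletion({ fetch_count: 6 }, token));
```

Separating the decision from the side effects also makes the missing-record case explicit instead of relying on an exception.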
Integration Points
- StripeKey Model: Tracks Stripe integration status
- Socket Service: Real-time user notifications
- Queue Model: Job queue management
Use Cases
- Stripe account initialization
- Billing data synchronization
- Customer/subscription import completion
📘 add_data.js
Overview
Utilities for inserting and managing contact data during imports.
File: queue-manager/common/add_data.js
Purpose
Provides standardized functions for adding contacts (people and businesses) to the database during CSV import operations.
Key Functions
addPerson
- Inserts individual contact records
- Handles duplicate detection
- Validates email and required fields
- Links to account and owner
addBusiness
- Inserts business/company records
- Manages business-specific fields
- Handles company hierarchy
addContact
- Unified function for both person and business
- Determines type and routes to appropriate handler
Usage Pattern
const { addPerson, addBusiness } = require('../common/add_data');
// Add person contact
await addPerson({
  accountId,
  ownerId,
  email: 'contact@example.com',
  firstName: 'John',
  lastName: 'Doe',
  // ... other fields
});
// Add business contact
await addBusiness({
  accountId,
  ownerId,
  companyName: 'Acme Corp',
  industry: 'Technology',
  // ... other fields
});
Validation Rules
- Email format validation
- Required field checks
- Duplicate detection by email + account
- Field length limits
- Custom field validation
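A minimal sketch of how such rules could be applied to one row is shown below. Everything in it is an assumption made for illustration: the `validatePerson` name, the simple email pattern, the 100-character limit, and the in-memory `Set` used for duplicate detection do not come from add_data.js.

```javascript
const EMAIL_PATTERN = /^[^\s@]+@[^\s@]+\.[^\s@]+$/; // deliberately simple format check
const MAX_NAME_LENGTH = 100; // assumed limit, for illustration only

// Hypothetical validator: returns every problem found instead of stopping at the first.
function validatePerson(row, seenKeys) {
  const errors = [];
  for (const field of ['accountId', 'ownerId', 'email']) {
    if (!row[field]) errors.push(`${field} is required`);
  }
  if (row.email && !EMAIL_PATTERN.test(row.email)) {
    errors.push('email format is invalid');
  }
  for (const field of ['firstName', 'lastName']) {
    if (row[field] && row[field].length > MAX_NAME_LENGTH) {
      errors.push(`${field} is too long`);
    }
  }
  // Duplicates are keyed by account + lower-cased email.
  const key = `${row.accountId}:${String(row.email || '').toLowerCase()}`;
  if (row.email && seenKeys.has(key)) errors.push('duplicate email for this account');
  if (errors.length === 0) seenKeys.add(key);
  return { valid: errors.length === 0, errors };
}

const seen = new Set();
const first = validatePerson({ accountId: 'acc1', ownerId: 'u1', email: 'contact@example.com' }, seen);
const repeat = validatePerson({ accountId: 'acc1', ownerId: 'u1', email: 'Contact@Example.com' }, seen);
const bad = validatePerson({ accountId: 'acc1', ownerId: 'u1', email: 'not-an-email' }, seen);
console.log(first, repeat, bad);
```

In a real import the duplicate check would presumably query the database rather than an in-memory set; the set only keeps the example self-contained.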
Error Handling
- Returns detailed error messages
- Tracks failed inserts
- Logs validation failures
- Supports partial success (some rows succeed, some fail)
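Partial success can be sketched with `Promise.allSettled`, which waits for every insert and reports each outcome separately. The helper names (`importRows`, `summarize`, `fakeInsert`) and the summary shape are hypothetical; `fakeInsert` stands in for a real insert function such as `addPerson`.

```javascript
// Hypothetical helper: attempts every row and tallies the outcome.
async function importRows(rows, insertRow) {
  // The async wrapper turns synchronous throws into rejections as well.
  const settled = await Promise.allSettled(rows.map(async row => insertRow(row)));
  return summarize(rows, settled);
}

// Builds a summary from Promise.allSettled results, one entry per failed row.
function summarize(rows, settled) {
  const summary = { totalRows: rows.length, successCount: 0, failedCount: 0, errors: [] };
  settled.forEach((result, index) => {
    if (result.status === 'fulfilled') {
      summary.successCount += 1;
    } else {
      summary.failedCount += 1;
      summary.errors.push({
        row: index + 1,
        message: String(result.reason?.message ?? result.reason),
      });
    }
  });
  return summary;
}

// Stand-in insert function (assumption): rejects rows without an email.
const fakeInsert = async row => {
  if (!row.email) throw new Error('email is required');
  return { id: row.email };
};

const sampleRows = [{ email: 'a@example.com' }, {}, { email: 'b@example.com' }];
importRows(sampleRows, fakeInsert).then(summary => console.log(summary));
```

Running all inserts at once keeps the sketch short; a large CSV import would more likely process rows in batches to limit database load.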
📘 log_data.js
Overview
Logging utilities for recording operation results and statistics.
File: queue-manager/common/log_data.js
Purpose
Provides consistent logging for import operations, tracking success/failure counts, and storing operation metadata.
Key Functions
logImportResult
- Records import operation results
- Tracks counts (total, success, failed)
- Stores error details
logExportResult
- Records export operation results
- Stores file URLs and metadata
updateOperationStatus
- Updates operation status in real-time
- Tracks progress percentage
Usage Pattern
const { logImportResult } = require('../common/log_data');
await logImportResult({
  queueId: importJob._id,
  totalRows: 100,
  successCount: 95,
  failedCount: 5,
  errors: [
    /* error details */
  ],
  duration: 5000, // milliseconds
});
Logged Information
- Operation ID and type
- Start and end timestamps
- Row counts (total, success, failed)
- Error messages and stack traces
- Performance metrics (duration, rows/second)
- User and account context
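The performance metrics listed above can be derived from the start and end timestamps. The sketch below is illustrative only: `buildImportLogEntry` and the `rowsPerSecond` field name are assumptions, not the actual shape stored by log_data.js.

```javascript
// Hypothetical helper: derives duration and throughput for a log entry.
function buildImportLogEntry({
  queueId,
  totalRows,
  successCount,
  failedCount,
  errors = [],
  startedAt,
  finishedAt,
}) {
  const duration = finishedAt.getTime() - startedAt.getTime(); // milliseconds
  // Avoid dividing by zero when the operation finishes within the same millisecond.
  const rowsPerSecond = duration > 0 ? Number((totalRows / (duration / 1000)).toFixed(2)) : null;
  return {
    queueId,
    totalRows,
    successCount,
    failedCount,
    errors,
    startedAt,
    finishedAt,
    duration,
    rowsPerSecond,
  };
}

const entry = buildImportLogEntry({
  queueId: 'job-1',
  totalRows: 100,
  successCount: 95,
  failedCount: 5,
  startedAt: new Date('2025-01-01T00:00:00.000Z'),
  finishedAt: new Date('2025-01-01T00:00:05.000Z'),
});
console.log(entry.duration, entry.rowsPerSecond); // 5000 20
```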
📘 downgrade_logs.js
Overview
Logging utilities specific to subscription downgrade operations.
File: queue-manager/common/downgrade_logs.js
Purpose
Records detailed logs for subscription tier changes, tracking what features were removed, billing adjustments, and user notifications.
Key Functions
logDowngrade
- Records downgrade event
- Tracks feature changes
- Stores billing adjustments
logDowngradeNotification
- Logs notification delivery
- Tracks email/SMS sent
Logged Data
- Previous and new subscription tiers
- Features removed/added
- Billing adjustments and prorations
- Effective date
- Reason for downgrade (user-initiated, payment failure, etc.)
- Notification status
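As a rough sketch of what a record with these fields could look like, the helper below computes the removed and added features by comparing the two tiers. All names here are hypothetical (`buildDowngradeLog`, the field names, the tier and feature values); the real schema used by downgrade_logs.js is not shown in this document.

```javascript
// Hypothetical helper: assembles one downgrade log record.
function buildDowngradeLog({
  accountId,
  previousTier,
  newTier,
  previousFeatures,
  newFeatures,
  reason,
  effectiveDate,
}) {
  const previous = new Set(previousFeatures);
  const next = new Set(newFeatures);
  return {
    accountId,
    previousTier,
    newTier,
    featuresRemoved: previousFeatures.filter(feature => !next.has(feature)),
    featuresAdded: newFeatures.filter(feature => !previous.has(feature)),
    reason,
    effectiveDate,
    notificationStatus: 'pending', // assumed initial state before any email/SMS is sent
  };
}

const record = buildDowngradeLog({
  accountId: 'acc1',
  previousTier: 'pro',
  newTier: 'basic',
  previousFeatures: ['contacts', 'automation', 'reports'],
  newFeatures: ['contacts', 'basic-support'],
  reason: 'payment failure',
  effectiveDate: new Date('2025-01-01T00:00:00.000Z'),
});
console.log(record.featuresRemoved, record.featuresAdded);
```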
Use Cases
- Audit trail for subscription changes
- Compliance reporting
- Customer support reference
- Revenue analysis
📘 billing_add_mapping.js
Overview
Field mapping utilities for billing data import and synchronization.
File: queue-manager/common/billing_add_mapping.js
Purpose
Maps billing data fields between different systems (Stripe, internal database, CSV imports) ensuring consistent data structure.
Key Functions
mapStripeCustomer
- Maps Stripe customer object to internal format
mapStripeSubscription
- Maps Stripe subscription object to internal format
mapBillingContact
- Maps billing contact fields
Mapping Examples
// Stripe to Internal
{
  // Stripe field → Internal field
  'id': 'stripe_customer_id',
  'email': 'email',
  'name': 'customer_name',
  'metadata.account_id': 'account_id'
}
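A mapping table like this can be applied generically by resolving each dotted source path against the incoming object. The sketch below is an assumption about how that might be done: `getPath`, `applyMapping`, and the sample customer object are illustrative and are not taken from billing_add_mapping.js.

```javascript
// Mapping copied from the example in this section (Stripe field -> internal field).
const STRIPE_CUSTOMER_MAPPING = {
  id: 'stripe_customer_id',
  email: 'email',
  name: 'customer_name',
  'metadata.account_id': 'account_id',
};

// Resolve a dotted path such as 'metadata.account_id' against a nested object.
function getPath(source, path) {
  return path
    .split('.')
    .reduce((value, key) => (value == null ? undefined : value[key]), source);
}

// Hypothetical helper: copies each mapped field, skipping values that are missing.
function applyMapping(source, mapping) {
  const result = {};
  for (const [sourcePath, targetField] of Object.entries(mapping)) {
    const value = getPath(source, sourcePath);
    if (value !== undefined) result[targetField] = value;
  }
  return result;
}

// Hand-written sample object, trimmed to the mapped fields.
const mapped = applyMapping(
  {
    id: 'cus_123',
    email: 'contact@example.com',
    name: 'Acme Corp',
    metadata: { account_id: 'acc_1' },
  },
  STRIPE_CUSTOMER_MAPPING,
);
console.log(mapped);
```

Keeping the mapping as data means a new field only needs one more entry in the table rather than another line of mapping code.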
Use Cases
- Stripe webhook data processing
- Billing data imports
- Customer synchronization
- Subscription updates
🔗 Integration Patterns
Common Usage Patterns
Pattern 1: Queue Creation with Wrapper
const queueWrapper = require('./common/queue-wrapper');
// Create queue with wrapper (recommended)
const myQueue = queueWrapper('my-queue', 'global', {
  processCb: async (job, done) => {
    // Process job
  },
});
Pattern 2: Change Stream Processing
const { startChangeStream } = require('./common/changeStream');
startChangeStream(Model, { operationType: 'insert' }, async ({ data }) => {
  // React to changes
});
Pattern 3: Data Import with Logging
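const { addPerson } = require('./common/add_data');
const { logImportResult } = require('./common/log_data');
// Import contacts (processImport and csvData are placeholders defined by the
// calling module; processImport is expected to call addPerson for each row)
const results = await processImport(csvData);
// Log results
await logImportResult({
  ...results,
  queueId: job.id,
});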
🔗 Related Documentation
- Queue Wrapper - Detailed queue wrapper documentation
- Architecture (documentation unavailable) - System architecture patterns
- Contacts Module - Major user of common utilities
Module Type: Shared Utilities
Status: Stable
Last Updated: 2025-10-10