
🛠️ Common Utilities

📖 Overview

The queue-manager/common/ directory contains shared utilities and helper functions used across multiple Queue Manager modules. These foundational components provide consistent patterns for queue creation, data manipulation, logging, and integration.

Location: queue-manager/common/

Purpose: Centralized utilities to avoid code duplication and ensure consistent behavior across all processing modules.

📁 Directory Structure

```
common/
├── queue-wrapper.js       # Bull queue factory with Redlock (PRIMARY)
├── generate_queue.js      # Redis queue creation with Bull
├── billing.js             # Billing completion and Stripe integration
├── billing_add_mapping.js # Billing data field mapping
├── changeStream.js        # MongoDB change stream helper
├── add_data.js            # Contact data insertion utilities
├── log_data.js            # Operation logging utilities
└── downgrade_logs.js      # Subscription downgrade logging
```

🔧 Core Utilities

  • Queue Management: queue-wrapper.js, generate_queue.js
  • Data Operations: add_data.js, log_data.js
  • Billing & Subscriptions: billing.js, billing_add_mapping.js, downgrade_logs.js
  • Infrastructure: changeStream.js


📘 generate_queue.js

Overview

Factory function for creating Bull queue instances with standardized Redis configuration and job options.

File: queue-manager/common/generate_queue.js

Purpose

  • Creates Bull queues with consistent Redis connection settings
  • Applies default job options (auto-cleanup completed/failed jobs)
  • Handles Redis connection errors with retry logic

Configuration

```javascript
// Default job options applied to all queues
const defaultJobOptions = {
  removeOnComplete: true, // Auto-remove completed jobs
  removeOnFail: true, // Auto-remove failed jobs
};

// Redis connection settings
const redis = {
  retryStrategy: () => 2000, // 2-second retry delay
  reconnectOnError: () => true, // Always reconnect on error
};
```

Usage

```javascript
const Queue = require('../common/generate_queue');

// Create a queue
const myQueue = Queue('my-queue-name', null, customSettings);

// Add jobs
await myQueue.add({ data: 'value' });
```

Key Features

  • Auto-Cleanup: Removes completed and failed jobs automatically to prevent Redis memory bloat
  • Retry Logic: Automatic reconnection on Redis errors
  • Consistent Configuration: All queues use same Redis settings
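Putting the two configuration objects together, the factory's behavior can be approximated with a small option-building step. This is a hedged sketch: `buildQueueOptions` is a hypothetical helper name, not part of `generate_queue.js` itself, which may pass its settings straight to the Bull constructor.

```javascript
// Sketch only: merging the default settings above with caller-supplied
// custom settings before constructing a Bull queue.
const defaultJobOptions = {
  removeOnComplete: true, // Auto-remove completed jobs
  removeOnFail: true, // Auto-remove failed jobs
};

const redisSettings = {
  retryStrategy: () => 2000, // 2-second retry delay
  reconnectOnError: () => true, // Always reconnect on error
};

function buildQueueOptions(customSettings = {}) {
  const { defaultJobOptions: customJobOptions, ...rest } = customSettings;
  return {
    redis: redisSettings,
    defaultJobOptions: { ...defaultJobOptions, ...customJobOptions },
    ...rest,
  };
}

// The factory itself would then be roughly:
// const Queue = require('bull');
// module.exports = (name, url, customSettings) =>
//   new Queue(name, buildQueueOptions(customSettings));
```

Caller-supplied job options override the defaults field by field, while the shared Redis settings stay fixed for every queue.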

📘 changeStream.js

Overview

Helper function for creating MongoDB change streams with standardized error handling.

File: queue-manager/common/changeStream.js

Purpose

Simplifies MongoDB change stream creation by providing a consistent interface for watching collection changes in real-time.

Function Signature

```javascript
const startChangeStream = (model, matchConditions, handleChange) => {
  // Implementation
};
```

Parameters

model (Mongoose Model)

  • The Mongoose model to watch for changes

matchConditions (Object)

  • MongoDB aggregation match conditions
  • Filters which changes trigger the handler

handleChange (Function)

  • Callback function to handle change events
  • Receives: { data, op, updatedFields }

Code Implementation

```javascript
const startChangeStream = (model, matchConditions, handleChange) => {
  const stream = model.watch([{ $match: matchConditions }], { fullDocument: 'updateLookup' });

  stream.on('change', async data => {
    try {
      await handleChange({
        data: data.fullDocument, // Full updated document
        op: data.operationType, // insert, update, delete
        updatedFields: data?.updateDescription?.updatedFields,
      });
    } catch (error) {
      logger.error({
        initiator: 'QM/common/change-stream',
        error,
        data,
      });
    }
  });
};
```

Usage Example

```javascript
const { startChangeStream } = require('../common/changeStream');
const OneBalanceReload = require('../models/onebalance-reload-request');

// Watch for new reload requests
startChangeStream(
  OneBalanceReload,
  {
    operationType: 'insert',
    'fullDocument.status': 'pending',
  },
  async ({ data, op }) => {
    // Process the new reload request
    await processReload(data);
  },
);
```

Key Features

  • Full Document Lookup: Always returns complete document after update
  • Error Handling: Catches and logs errors without crashing stream
  • Type Safety: Structured response object with operation type
  • Simple API: Clean abstraction over MongoDB watch API

Use Cases

  • OneBalance real-time processing (reload/charge)
  • Real-time notification triggers
  • Audit logging
  • Cache invalidation

📘 billing.js

Overview

Utilities for completing billing processes and notifying users via sockets.

File: queue-manager/common/billing.js

Purpose

Manages billing data fetch completion, user notifications, and queue cleanup after billing operations complete.

Key Function: completeProcess

```javascript
const completeProcess = async token => {
  const userId = token?.user_id;

  // Check if billing data fetched enough times (6 iterations)
  const stripe_key = await StripeKey.findOne({ account_id: token?.account_id }).lean().exec();

  if (stripe_key.fetch_count >= 6) {
    // Notify user via socket
    await socketEmit('billing_data_fetched', [userId.toString()], {
      message: 'Successfully fetched data, Please refresh the page',
      type: 'success',
    });

    // Reset fetch count and mark as initialized
    await StripeKey.updateOne(
      { account_id: token?.account_id },
      { $set: { fetch_count: 0, initialized: true } },
    );

    // Clean up queue entry
    await Queue.deleteOne({ _id: token.id });
  }
};
```

Workflow

  1. Check Fetch Count: Verifies billing data fetched 6 times (complete sync)
  2. Socket Notification: Sends real-time notification to user
  3. Reset State: Marks Stripe key as initialized
  4. Queue Cleanup: Removes completed job from queue
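The threshold check in step 1 can be isolated as a pure predicate. This is a sketch for illustration only: `isBillingSyncComplete` is a hypothetical name, and the threshold of 6 comes from the code above.

```javascript
// Sketch only: the completion rule used by completeProcess, as a pure
// function. The helper name is an assumption, not the module's API.
const FETCH_COMPLETE_THRESHOLD = 6;

function isBillingSyncComplete(stripeKey) {
  return (stripeKey?.fetch_count ?? 0) >= FETCH_COMPLETE_THRESHOLD;
}
```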

Integration Points

  • StripeKey Model: Tracks Stripe integration status
  • Socket Service: Real-time user notifications
  • Queue Model: Job queue management

Use Cases

  • Stripe account initialization
  • Billing data synchronization
  • Customer/subscription import completion

📘 add_data.js

Overview

Utilities for inserting and managing contact data during imports.

File: queue-manager/common/add_data.js

Purpose

Provides standardized functions for adding contacts (people and businesses) to the database during CSV import operations.

Key Functions

addPerson

  • Inserts individual contact records
  • Handles duplicate detection
  • Validates email and required fields
  • Links to account and owner

addBusiness

  • Inserts business/company records
  • Manages business-specific fields
  • Handles company hierarchy

addContact

  • Unified function for both person and business
  • Determines type and routes to appropriate handler

Usage Pattern

```javascript
const { addPerson, addBusiness } = require('../common/add_data');

// Add person contact
await addPerson({
  accountId,
  ownerId,
  email: 'contact@example.com',
  firstName: 'John',
  lastName: 'Doe',
  // ... other fields
});

// Add business contact
await addBusiness({
  accountId,
  ownerId,
  companyName: 'Acme Corp',
  industry: 'Technology',
  // ... other fields
});
```

Validation Rules

  • Email format validation
  • Required field checks
  • Duplicate detection by email + account
  • Field length limits
  • Custom field validation

Error Handling

  • Returns detailed error messages
  • Tracks failed inserts
  • Logs validation failures
  • Supports partial success (some rows succeed, some fail)
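The partial-success behavior can be sketched as a loop that attempts each row independently and collects failures instead of aborting the batch. This is a hedged illustration; the real functions return richer error details than shown here.

```javascript
// Sketch only: attempt each row, record failures, never abort the batch.
async function importRows(rows, insertRow) {
  const result = { successCount: 0, failedCount: 0, errors: [] };
  for (const [index, row] of rows.entries()) {
    try {
      await insertRow(row);
      result.successCount += 1;
    } catch (error) {
      // Track the failing row and reason, then continue with the next row.
      result.failedCount += 1;
      result.errors.push({ row: index, message: error.message });
    }
  }
  return result;
}
```

The returned counts and error list map directly onto the fields that `logImportResult` (below) expects to record.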

📘 log_data.js

Overview

Logging utilities for recording operation results and statistics.

File: queue-manager/common/log_data.js

Purpose

Provides consistent logging for import operations, tracking success/failure counts, and storing operation metadata.

Key Functions

logImportResult

  • Records import operation results
  • Tracks counts (total, success, failed)
  • Stores error details

logExportResult

  • Records export operation results
  • Stores file URLs and metadata

updateOperationStatus

  • Updates operation status in real-time
  • Tracks progress percentage

Usage Pattern

```javascript
const { logImportResult } = require('../common/log_data');

await logImportResult({
  queueId: importJob._id,
  totalRows: 100,
  successCount: 95,
  failedCount: 5,
  errors: [
    /* error details */
  ],
  duration: 5000, // milliseconds
});
```

Logged Information

  • Operation ID and type
  • Start and end timestamps
  • Row counts (total, success, failed)
  • Error messages and stack traces
  • Performance metrics (duration, rows/second)
  • User and account context
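The progress percentage that updateOperationStatus tracks can be computed as a simple ratio. A hedged sketch (hypothetical helper name), guarding against a zero total and clamping over-counts:

```javascript
// Sketch only: clamp progress to 0-100 and avoid division by zero.
function progressPercent(processed, total) {
  if (!total) return 0;
  return Math.min(100, Math.round((processed / total) * 100));
}
```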

📘 downgrade_logs.js

Overview

Logging utilities specific to subscription downgrade operations.

File: queue-manager/common/downgrade_logs.js

Purpose

Records detailed logs for subscription tier changes, tracking what features were removed, billing adjustments, and user notifications.

Key Functions

logDowngrade

  • Records downgrade event
  • Tracks feature changes
  • Stores billing adjustments

logDowngradeNotification

  • Logs notification delivery
  • Tracks email/SMS sent

Logged Data

  • Previous and new subscription tiers
  • Features removed/added
  • Billing adjustments and prorations
  • Effective date
  • Reason for downgrade (user-initiated, payment failure, etc.)
  • Notification status
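The fields above imply a log entry roughly like the following. This is a sketch only: the function name and field names are assumptions, not the module's actual API.

```javascript
// Sketch only: build a downgrade log entry by diffing the feature lists.
// Names (buildDowngradeLog, tier/feature fields) are assumptions.
function buildDowngradeLog({ previousTier, newTier, previousFeatures, newFeatures, reason }) {
  return {
    previousTier,
    newTier,
    featuresRemoved: previousFeatures.filter(f => !newFeatures.includes(f)),
    featuresAdded: newFeatures.filter(f => !previousFeatures.includes(f)),
    reason, // e.g. 'user-initiated' or 'payment failure'
    effectiveDate: new Date(),
  };
}
```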

Use Cases

  • Audit trail for subscription changes
  • Compliance reporting
  • Customer support reference
  • Revenue analysis

📘 billing_add_mapping.js

Overview

Field mapping utilities for billing data import and synchronization.

File: queue-manager/common/billing_add_mapping.js

Purpose

Maps billing data fields between different systems (Stripe, internal database, CSV imports), ensuring a consistent data structure.

Key Functions

mapStripeCustomer

  • Maps Stripe customer object to internal format

mapStripeSubscription

  • Maps Stripe subscription object to internal format

mapBillingContact

  • Maps billing contact fields

Mapping Examples

```javascript
// Stripe to Internal
{
  // Stripe field → Internal field
  'id': 'stripe_customer_id',
  'email': 'email',
  'name': 'customer_name',
  'metadata.account_id': 'account_id'
}
```

Use Cases

  • Stripe webhook data processing
  • Billing data imports
  • Customer synchronization
  • Subscription updates

🔗 Integration Patterns

Common Usage Patterns

Pattern 1: Queue Creation with Wrapper

```javascript
const queueWrapper = require('./common/queue-wrapper');
const generateQueue = require('./common/generate_queue');

// Create queue with wrapper (recommended)
const myQueue = queueWrapper('my-queue', 'global', {
  processCb: async (job, done) => {
    // Process job
  },
});
```

Pattern 2: Change Stream Processing

```javascript
const { startChangeStream } = require('./common/changeStream');

startChangeStream(Model, { operationType: 'insert' }, async ({ data }) => {
  // React to changes
});
```

Pattern 3: Data Import with Logging

```javascript
const { addPerson } = require('./common/add_data');
const { logImportResult } = require('./common/log_data');

// Import contacts
const results = await processImport(csvData);

// Log results
await logImportResult({
  ...results,
  queueId: job.id,
});
```

🔗 Related Documentation

  • Queue Wrapper - Detailed queue wrapper documentation
  • Architecture - System architecture patterns (documentation unavailable)
  • Contacts Module - Major user of common utilities

Module Type: Shared Utilities
Status: Stable
Last Updated: 2025-10-10
