Skip to main content

OneBalance Balance Reload Job

Overview

The Balance Reload job is a critical financial operation that automatically charges customer credit cards and adds credits to their OneBalance accounts when their balance falls below a configured threshold. This system prevents service interruptions by ensuring accounts maintain sufficient credits for continued usage of DashClicks services (emails, SMS, calls, InstaSites, InstaReports, etc.).

Key Features:

  • Real-time monitoring via MongoDB change streams
  • Automatic Stripe charges when balance drops below threshold
  • Account locking when balance reaches critical levels (≤500 credits)
  • Retry mechanism with exponential backoff over 5 days
  • Multi-tenant support for parent/child account relationships
  • Transaction safety with MongoDB transactions

Processing Method:

  • Change Stream Trigger: MongoDB watch stream on onebalance collection
  • Manual Trigger: Service can be called with specific account_id

Architecture

Three-Tier Structure

┌─────────────────────────────────────────────────────────────────┐
│ CRON SCHEDULER │
│ (Currently Disabled) │
│ │
│ - Schedule: Every 30 seconds (commented out) │
│ - Purpose: Periodic check for accounts needing reload │
│ - Status: DISABLED - relies on change streams instead │
└────────────────────────┬────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ SERVICE LAYER │
│ services/onebalance/balance_reload.js │
│ │
│ Core Operations: │
│ 1. Query onebalance documents below threshold │
│ 2. Lock accounts with balance ≤500 credits │
│ 3. Set balance_reload_queue_in_progress flag │
│ 4. Add jobs to Bull queue with 5-day retry span │
└────────────────────────┬────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ QUEUE PROCESSOR │
│ queues/onebalance/balance_reload.js │
│ │
│ Job Processing: │
│ 1. Verify balance_reload_queue_in_progress flag │
│ 2. Retrieve Stripe API key (main/parent account) │
│ 3. Create Stripe charge for reload amount │
│ 4. Update balance within MongoDB transaction │
│ 5. Create usage log entry (event: 'refill') │
│ 6. Emit socket notification to account owner │
│ │
│ Bull Queue: onebalance_balance_reload_queue (global) │
│ Retry Strategy: 5 attempts, exponential backoff (~1.9h delay) │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│ CHANGE STREAM WATCHER │
│ streams/onebalance/balance_reload.js │
│ │
│ Real-time Monitoring: │
│ - Watches: onebalance collection for 'update' operations │
│ - Trigger: balance < reload.threshold │
│ - Filter: Only balance field updates │
│ - Deduplication: Set-based account tracking │
│ - Action: Calls service layer with account_id │
└─────────────────────────────────────────────────────────────────┘

Processing Flow

┌──────────────────────────────────────────────────────────────────┐
│ Balance Drop Detected │
│ │
│ Triggers: │
│ 1. Change Stream: balance field updated & balance < threshold │
│ 2. Manual Call: service(account_id) invoked directly │
└────────────────────────┬─────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────┐
│ Service Layer Query │
│ │
│ Onebalance.aggregate([ │
│ { │
│ $match: { │
│ account_id: ObjectId(account_id), │
│ $expr: { $lt: ['$balance', '$reload.threshold'] }, │
│ new_account: { $ne: true }, │
│ balance_reload_queue_in_progress: false, │
│ 'reload.disabled': { $ne: true } │
│ } │
│ } │
│ ]) │
│ │
│ Result: onebalances[] - accounts needing reload │
└────────────────────────┬─────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────┐
│ Account Locking & Flagging │
│ │
│ For all matched accounts: │
│ Onebalance.updateMany( │
│ { _id: { $in: ids } }, │
│ { balance_reload_queue_in_progress: true } │
│ ) │
│ │
│ For critical accounts (balance ≤ 500): │
│ Onebalance.updateMany( │
│ { _id: { $in: critical_ids } }, │
│ { │
│ balance_reload_queue_in_progress: true, │
│ lock: true // Prevents service usage │
│ } │
│ ) │
└────────────────────────┬─────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────┐
│ Add Jobs to Bull Queue │
│ │
│ For each onebalance document: │
│ queue.add(onebalance_data, { │
│ jobId: onebalance._id.toString(), │
│ attempts: 5, │
│ backoff: { │
│ type: 'exponential', │
│ delay: 6857142 // ~1.9 hours │
│ }, │
│ removeOnComplete: true │
│ }) │
│ │
│ Backoff Calculation: │
│ Attempt 1: Immediate │
│ Attempt 2: ~1.9 hours │
│ Attempt 3: ~3.8 hours │
│ Attempt 4: ~7.6 hours │
│ Attempt 5: ~15.2 hours │
│ Total span: ~28.5 hours (~1.2 days) │
│ ⚠️ Comment says "5 days" but actual is ~1.2 days │
└────────────────────────┬─────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────┐
│ Queue Processor Start │
│ │
│ 1. Verify Flag: │
│ processCheck = Onebalance.findOne({ _id: job.id }) │
│ if (processCheck.balance_reload_queue_in_progress == false) │
│ return done() // Exit if flag cleared (duplicate job) │
│ │
│ 2. Retrieve Account & Stripe Key: │
│ account = Account.findById(account_id) │
│ if (account.main) { │
│ stripe = Stripe(STRIPE_SECRET_KEY) // Use global key │
│ } else { │
│ stripe_keys = StripeKey.findOne({ account_id: parent }) │
│ stripe = Stripe(stripe_keys.token.access_token) │
│ } │
└────────────────────────┬─────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────┐
│ Stripe Charge Processing │
│ │
│ Within MongoDB Transaction: │
│ │
│ 1. Update OneBalance (Optimistic): │
│ Onebalance.updateOne( │
│ { _id: job.id }, │
│ { │
│ $set: { │
│ lock: false, │
│ balance_reload_queue_in_progress: false, │
│ retry: false, │
│ retries: [] │
│ }, │
│ $inc: { balance: reload.amount } │
│ }, │
│ { session } │
│ ) │
│ │
│ 2. Retrieve Stripe Customer: │
│ customer = stripe.customers.retrieve(stripe_customer) │
│ │
│ 3. Verify Default Payment Source: │
│ if (!customer.default_source) { │
│ throw Error('No default source') │
│ } │
│ │
│ 4. Create Stripe Charge: │
│ stripe.charges.create({ │
│ amount: reload.amount, │
│ currency: 'USD', │
│ source: customer.default_source, │
│ description: 'Charge for balance reload', │
│ customer: stripe_customer │
│ }) │
│ │
│ Transaction Commit: All updates committed together │
└────────────────────────┬─────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────────┐
│ Success Handling │
│ │
│ 1. Create Usage Log: │
│ OnebalanceLogs.save({ │
│ account_id: account_id, │
│ event: 'refill', │
│ cost: reload.amount, │
│ status: 'success', │
│ type: 'credit' │
│ }) │
│ │
│ 2. Emit Socket Notification: │
│ user = User.findOne({ account: account_id, is_owner: true }) │
│ socketEmit('onebalance_charge_updated', [user._id], data) │
│ │
│ 3. Complete Job: │
│ done() │
└──────────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────────┐
│ Failure Handling │
│ │
│ On Job Failure (done(err)): │
│ │
│ 1. Record Retry Attempt: │
│ Onebalance.updateOne( │
│ { _id: job.id }, │
│ { │
│ $push: { │
│ retries: { │
│ reason: err.message, │
│ time: new Date() │
│ } │
│ }, │
│ retry: true │
│ } │
│ ) │
│ │
│ 2. Final Failure (after 5 attempts): │
│ Onebalance.updateOne( │
│ { _id: job.id }, │
│ { balance_reload_queue_in_progress: false } │
│ ) │
│ │
│ ⚠️ lock remains true, account stays locked! │
│ ⚠️ No notification sent to user about failure │
└──────────────────────────────────────────────────────────────────┘

Configuration

Environment Variables

# Stripe Configuration
STRIPE_SECRET_KEY=sk_live_xxxxx # Global Stripe API key for main accounts

# MongoDB Configuration
MONGODB_URI=mongodb://localhost:27017/dashclicks

# Redis Configuration (for Bull Queue)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=

# Queue Manager Module Control
QM_ONEBALANCE=true # Enable/disable OneBalance module

Service Constants

// services/onebalance/balance_reload.js
const NEXT_RETRY_IN_MINS = 10; // Minimum time between retry attempts (unused currently)
const GLOBAL_THRESHOLD = 500; // Critical balance level for account locking

Queue Configuration

// queues/onebalance/balance_reload.js
{
jobId: onebalance._id.toString(), // Unique job ID prevents duplicates
attempts: 5, // Maximum retry attempts
backoff: {
type: 'exponential',
delay: 6857142 // ~1.9 hours initial delay
},
removeOnComplete: true // Auto-cleanup successful jobs
}

OneBalance Schema Defaults

// shared/models/onebalance.js
{
reload: {
threshold: 1000, // Trigger reload when balance < threshold
amount: 1000, // Amount to charge and add (in credits)
currency: 'usd', // Currency code
disabled: false // Allow/prevent auto-reload
},
balance: 0, // Current credit balance
new_account: true, // Skip reload for new accounts
lock: false, // Account lock status
retry: false, // Retry flag
retries: [], // Array of retry attempts
balance_reload_queue_in_progress: false // Processing flag
}

Data Models

Input Data (Job Data)

The service passes entire onebalance documents to the queue:

{
_id: ObjectId("507f1f77bcf86cd799439011"),
account_id: ObjectId("507f1f77bcf86cd799439012"),
balance: 450, // Current balance below threshold
reload: {
threshold: 1000,
amount: 1000,
currency: "usd",
disabled: false
},
new_account: false,
lock: false,
retry: false,
retries: [],
balance_reload_queue_in_progress: false
}

Database Collections

onebalance Collection

Primary collection managing account credit balances:

{
_id: ObjectId,
account_id: ObjectId, // Reference to accounts collection
balance: Number, // Current credit balance
new_account: Boolean, // Skip auto-reload if true
reload: {
threshold: Number, // Reload trigger point (default: 1000)
amount: Number, // Reload amount (default: 1000)
currency: String, // Currency code (default: 'usd')
disabled: Boolean // Disable auto-reload (default: false)
},
lock: Boolean, // Account locked when balance critical
retry: Boolean, // Retry flag for failed reloads
retries: [
{
time: Date, // Retry timestamp
reason: String // Failure reason
}
],
balance_reload_queue_in_progress: Boolean, // Job processing flag

// Rebilling configuration (per-service pricing)
rebill: {
email: {
multiplier: Number, // Cost multiplier
credits: Number // Base credits cost
},
sms: { multiplier: Number, credits: Number },
inbound_call: { multiplier: Number, credits: Number },
outbound_call: { multiplier: Number, credits: Number },
instasite: { multiplier: Number, credits: Number },
instareport: { multiplier: Number, credits: Number },
lighting_domain: { value: Number, credits: Number },
phone: { multiplier: Number, credits: Number },
listing: { value: Number, credits: Number },
seo_keyword: { multiplier: Number, credits: Number }
},

createdAt: Date,
updatedAt: Date
}

Key Indexes:

  • account_id (unique)
  • balance (for threshold queries)
  • balance_reload_queue_in_progress (for processing checks)

onebalance.usage_logs Collection

Transaction history for all balance changes:

{
_id: ObjectId,
account_id: ObjectId, // Main account
sub_account_id: ObjectId, // Child account (if applicable)
event: String, // Event type
// Event Types: "email", "sms", "inbound_call", "outbound_call",
// "instasite", "instareport", "seo_keyword", "refill",
// "lighting_domain", "phone", "listing",
// "a2p_registration", "a2p_registration_renewal"

type: String, // "credit" or "debit"
cost: Number, // Transaction amount
base_price: Number, // Original price before multipliers
credits: Number, // Credits used
sub_account_cost: Number, // Cost allocated to sub-account
status: String, // Transaction status
status_reason: String, // Failure reason (if any)
additional_info: Mixed, // Extra metadata

createdAt: Date,
updatedAt: Date
}

Indexes:

  • account_id, createdAt (for history queries)
  • event, createdAt (for event-specific reports)

accounts Collection

Account information with Stripe customer references:

{
_id: ObjectId,
name: String,
main: Boolean, // true = main account, false = child
parent_account: ObjectId, // Parent account ID (for child accounts)
stripe_customer: String, // Stripe customer ID (cus_xxxxx)
// ... other account fields
}

stripe_keys Collection

OAuth tokens for child account Stripe access:

{
_id: ObjectId,
account_id: ObjectId, // Parent account
token: {
access_token: String, // Stripe OAuth access token
token_type: String, // "bearer"
scope: String, // "read_write"
// ... other OAuth fields
},
createdAt: Date,
updatedAt: Date
}

API Reference

Service Function

module.exports = async account_id => {
/* ... */
};

Parameters:

  • account_id (ObjectId | String, required) - Account to process for balance reload

Returns:

  • Promise<void> - No return value, operates via side effects

Side Effects:

  1. Updates onebalance documents (flags, locks)
  2. Adds jobs to Bull queue
  3. Logs processing to console

Behavior:

  • Returns immediately if account_id is falsy
  • Queries onebalance documents matching criteria
  • Locks accounts with balance ≤ 500
  • Adds all matched accounts to queue
  • Handles errors gracefully with console logging

Usage Example:

const balanceReload = require('./services/onebalance/balance_reload');

// Process specific account
await balanceReload('507f1f77bcf86cd799439012');

// Called from change stream
Stream.on('change', async data => {
await balanceReload(data.fullDocument.account_id);
});

Queue Processor Callbacks

processCb(job, done)

Main job processing function.

Parameters:

  • job (Bull.Job) - Job object containing onebalance data
  • done (Function) - Callback to signal completion

Processing Steps:

  1. Verify balance_reload_queue_in_progress flag
  2. Retrieve account and Stripe credentials
  3. Update balance within transaction
  4. Create Stripe charge
  5. Call done() on success or done(err) on failure

Error Conditions:

  • balance_reload_queue_in_progress == false → Exit early (duplicate job)
  • No Stripe key for parent account → Throw error
  • No stripe_customer on account → Throw error
  • No default payment source → Throw error
  • Stripe API failure → Throw error

failedCb(job, err)

Handles job failures and retry logic.

Parameters:

  • job (Bull.Job) - Failed job object
  • err (Error) - Error that caused failure

Actions:

  1. Push retry entry to retries array
  2. Set retry: true
  3. If final attempt, set balance_reload_queue_in_progress: false

⚠️ Critical Issue:

  • Does NOT clear lock: true on final failure
  • Accounts remain locked indefinitely without manual intervention

completedCb(job)

Handles successful job completion.

Parameters:

  • job (Bull.Job) - Completed job object

Actions:

  1. Create usage log entry (event: 'refill')
  2. Find account owner user
  3. Emit socket notification to owner
  4. Job automatically removed (removeOnComplete: true)

Change Stream Watcher

exports.start = () => {
/* ... */
};

Watch Configuration:

Onebalance.watch(
[
{
$match: {
$expr: { $lt: ['$fullDocument.balance', '$fullDocument.reload.threshold'] },
operationType: 'update',
'updateDescription.updatedFields.balance': { $exists: 1 },
},
},
],
{
fullDocument: 'updateLookup',
},
);

Trigger Conditions:

  1. Operation type is update
  2. balance field was updated
  3. New balance is below threshold

Deduplication:

  • Uses Set to track currently processing accounts
  • Prevents duplicate jobs for same account
  • Clears from set after service call completes

Example Change Event:

{
operationType: 'update',
fullDocument: {
_id: ObjectId("..."),
account_id: ObjectId("..."),
balance: 450,
reload: { threshold: 1000, amount: 1000 }
},
updateDescription: {
updatedFields: { balance: 450 }
}
}

Error Handling

Common Error Scenarios

1. No Stripe Customer ID

// Error Message
"Customer stripe is not connected."

// Cause
account.stripe_customer is null or undefined

// Impact
- Job fails and retries
- Account remains locked (if balance ≤ 500)
- No credits added

// Resolution
1. User must connect Stripe account via dashboard
2. Ensure stripe_customer field populated
3. Job will succeed on next retry

2. No Default Payment Source

// Error Message
"Customer has not set the default source yet."

// Cause
Stripe customer exists but has no default payment method

// Impact
- Job fails and retries
- Account locked
- Transaction rolled back (balance not updated)

// Resolution
1. User must add payment method in Stripe
2. Set default payment source
3. Job will succeed on next retry

3. Missing Parent Stripe Key

// Error Message
"Parent account stripe key does not exist."

// Cause
Child account's parent has no StripeKey document

// Impact
- Job fails and retries
- Cannot process payment
- Account locked

// Resolution
1. Parent account must complete Stripe OAuth
2. StripeKey document created
3. Job will succeed on next retry

4. Stripe API Failures

// Possible Errors
- "Your card was declined."
- "Your card has insufficient funds."
- "Your card has expired."
- "An error occurred while processing your card."

// Cause
Various Stripe payment processing issues

// Impact
- Job fails and retries with exponential backoff
- Account locked
- Balance not updated (transaction rollback)
- Retry attempts tracked

// Resolution
Depends on specific error:
- Card declined: Update payment method
- Insufficient funds: Add funds to card
- Expired card: Update card information
- Processing error: Usually temporary, retry succeeds

5. Duplicate Job Detection

// Scenario
balance_reload_queue_in_progress == false when processor runs

// Cause
- Job completed but completion event fired duplicate
- Race condition between jobs
- Manual flag clearing

// Behavior
- Job exits immediately via done()
- No processing occurs
- No error thrown

// Impact
- Prevents duplicate Stripe charges
- Safety mechanism for idempotency

Error Recovery Strategies

Exponential Backoff

{
attempts: 5,
backoff: {
type: 'exponential',
delay: 6857142 // ~1.9 hours
}
}

// Retry Schedule:
// Attempt 1: Immediate
// Attempt 2: ~1.9 hours later
// Attempt 3: ~3.8 hours later
// Attempt 4: ~7.6 hours later
// Attempt 5: ~15.2 hours later
// Total: ~28.5 hours (~1.2 days)

Purpose:

  • Give users time to fix payment issues
  • Reduce load on Stripe API
  • Avoid rate limiting

⚠️ Documentation Discrepancy:

  • Code comment says "5 days span"
  • Actual calculation: ~1.2 days
  • Consider: Increase delay to ~33 million ms for true 5-day span

Transaction Rollback

// On Stripe API failure
await withTransaction(async session => {
// Balance updated first (optimistic)
await Onebalance.updateOne({ _id: job.id }, { $inc: { balance: amount } }, { session });

// Then Stripe charge attempted
await stripe.charges.create({ ... });

// If charge fails, transaction rolls back automatically
});

Benefits:

  • Atomic operations
  • No orphaned balance updates
  • Data consistency guaranteed

⚠️ Potential Issue:

  • Balance updated optimistically before charge
  • If transaction large, rollback may take time
  • Consider: Move balance update after successful charge

Retry Tracking

// Each failure recorded
{
retries: [
{ time: ISODate("2025-10-13T10:00:00Z"), reason: "Card declined" },
{ time: ISODate("2025-10-13T11:54:00Z"), reason: "Card declined" },
{ time: ISODate("2025-10-13T15:40:00Z"), reason: "Insufficient funds" }
],
retry: true
}

Usage:

  • Debugging payment issues
  • Support team investigation
  • Audit trail

Critical Bugs & Issues

🐛 Bug #1: Account Lock Not Cleared on Final Failure

Issue:

// failedCb only clears balance_reload_queue_in_progress
if (job.attemptsMade >= job.opts.attempts) {
update.balance_reload_queue_in_progress = false;
}
// ❌ lock: true remains set!

Impact:

  • Accounts with balance ≤ 500 stay locked forever after 5 failed attempts
  • Users cannot use ANY services
  • Requires manual database intervention

Fix:

if (job.attemptsMade >= job.opts.attempts) {
update.balance_reload_queue_in_progress = false;
update.lock = false; // ✅ Add this line
}

🐛 Bug #2: No User Notification on Final Failure

Issue:

  • completedCb sends socket notification on success
  • failedCb does NOT notify user on final failure
  • User unaware their account is locked

Impact:

  • Poor user experience
  • Confusion about service interruptions
  • Increased support tickets

Fix:

const failedCb = async (job, err) => {
const id = job.data._id;
const update = {
$push: { retries: { reason: err.message, time: new Date() } },
retry: true,
};

if (job.attemptsMade >= job.opts.attempts) {
update.balance_reload_queue_in_progress = false;
update.lock = false;

// ✅ Add notification
const user = await User.findOne({
account: job.data.account_id,
is_owner: true,
});

await socketEmit('onebalance_reload_failed', [user._id.toString()], {
reason: err.message,
balance: job.data.balance,
reload_amount: job.data.reload.amount,
});

// ✅ Consider email notification too
await sendEmail({
to: user.email,
subject: 'Action Required: Balance Reload Failed',
body: `Your balance reload failed after 5 attempts. Please update your payment method.`,
});
}

await Onebalance.updateOne({ _id: id }, update);
};

⚠️ Issue #3: Commented Out Cron Scheduler

Current State:

// Entire cron schedule is commented out
// cron.schedule('*/30 * * * * *', async () => { ... });

Implications:

  • Relies 100% on change stream triggers
  • No periodic safety check
  • If change stream fails/disconnects, no reloads processed

Recommendation:

  • Keep change streams for real-time processing
  • Enable cron as backup safety net (e.g., every 5 minutes)
  • Cron catches edge cases change streams might miss

⚠️ Issue #4: Optimistic Balance Update

Current Flow:

await withTransaction(async session => {
// 1. Update balance FIRST (optimistic)
await Onebalance.updateOne({ $inc: { balance: amount } }, { session });

// 2. THEN charge Stripe
await stripe.charges.create({ ... });
});

Risk:

  • If Stripe charge fails, transaction rolls back
  • But there's a window where balance appears updated
  • Could confuse concurrent operations

Better Approach:

await withTransaction(async session => {
// 1. Charge Stripe FIRST
const charge = await stripe.charges.create({ ... });

// 2. THEN update balance (only if charge succeeded)
if (charge.status === 'succeeded') {
await Onebalance.updateOne({ $inc: { balance: amount } }, { session });
}
});

⚠️ Issue #5: Inconsistent Logging

Current State:

  • Uses console.log instead of logger utility
  • No structured logging
  • Difficult to track in production

Fix:

// Replace all console.log with logger
logger.info({
initiator: 'QM/onebalance/balance-reload',
message: 'Processed docs for onebalance balance reload',
count: onebalances.length,
});

logger.error({
initiator: 'QM/onebalance/balance-reload',
error: err,
context: { account_id, balance: job.data.balance },
});

Socket Events

onebalance_charge_updated

Emitted on successful balance reload.

Event Name: onebalance_charge_updated

Recipients: Account owner only (is_owner: true)

Payload:

{
_id: ObjectId("507f1f77bcf86cd799439011"),
account_id: ObjectId("507f1f77bcf86cd799439012"),
balance: 1450, // Updated balance (old + reload.amount)
reload: {
threshold: 1000,
amount: 1000,
currency: "usd",
disabled: false
},
lock: false,
retry: false,
retries: [],
balance_reload_queue_in_progress: false
// ... full onebalance document
}

Frontend Usage:

socket.on('onebalance_charge_updated', data => {
// Update UI with new balance
updateBalanceDisplay(data.balance);

// Show success notification
showNotification('Balance reloaded successfully!');

// Remove any lock indicators
if (!data.lock) {
removeLockBanner();
}
});

⚠️ Missing Event: Currently no socket event for failed reloads. Consider adding onebalance_reload_failed event.


Testing

Unit Tests

const balanceReload = require('./services/onebalance/balance_reload');
const Onebalance = require('./models/onebalance');
const Queue = require('./queues/onebalance/balance_reload');

describe('Balance Reload Service', () => {
test('should process account below threshold', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
await Onebalance.create({
account_id: accountId,
balance: 450,
reload: { threshold: 1000, amount: 1000 },
new_account: false,
balance_reload_queue_in_progress: false,
});

// Act
await balanceReload(accountId);

// Assert
const updated = await Onebalance.findOne({ account_id: accountId });
expect(updated.balance_reload_queue_in_progress).toBe(true);
});

test('should lock accounts with balance <= 500', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
await Onebalance.create({
account_id: accountId,
balance: 300, // Below GLOBAL_THRESHOLD
reload: { threshold: 1000, amount: 1000 },
});

// Act
await balanceReload(accountId);

// Assert
const updated = await Onebalance.findOne({ account_id: accountId });
expect(updated.lock).toBe(true);
});

test('should skip new accounts', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
await Onebalance.create({
account_id: accountId,
balance: 450,
new_account: true, // Should be skipped
});

// Act
await balanceReload(accountId);

// Assert
const updated = await Onebalance.findOne({ account_id: accountId });
expect(updated.balance_reload_queue_in_progress).toBe(false);
});

test('should skip accounts with reload disabled', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
await Onebalance.create({
account_id: accountId,
balance: 450,
reload: { threshold: 1000, amount: 1000, disabled: true },
});

// Act
await balanceReload(accountId);

// Assert
const updated = await Onebalance.findOne({ account_id: accountId });
expect(updated.balance_reload_queue_in_progress).toBe(false);
});
});

Integration Tests

const Stripe = require('stripe');
const Queue = require('./queues/onebalance/balance_reload');

describe('Balance Reload Queue Processor', () => {
test('should create Stripe charge and update balance', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
const account = await Account.create({
_id: accountId,
name: 'Test Account',
main: true,
stripe_customer: 'cus_test123',
});

const onebalance = await Onebalance.create({
account_id: accountId,
balance: 450,
reload: { threshold: 1000, amount: 1000 },
balance_reload_queue_in_progress: true,
});

// Mock Stripe
const stripeMock = {
customers: {
retrieve: jest.fn().mockResolvedValue({
id: 'cus_test123',
default_source: 'card_test456',
}),
},
charges: {
create: jest.fn().mockResolvedValue({
id: 'ch_test789',
status: 'succeeded',
amount: 1000,
}),
},
};
jest.spyOn(Stripe, 'Stripe').mockReturnValue(stripeMock);

// Act
const queue = await Queue.start();
const job = await queue.add(onebalance._doc, {
jobId: onebalance._id.toString(),
});
await job.finished();

// Assert
const updated = await Onebalance.findById(onebalance._id);
expect(updated.balance).toBe(1450); // 450 + 1000
expect(updated.lock).toBe(false);
expect(updated.balance_reload_queue_in_progress).toBe(false);

const log = await OnebalanceLogs.findOne({
account_id: accountId,
event: 'refill',
});
expect(log).toBeDefined();
expect(log.cost).toBe(1000);
expect(log.status).toBe('success');
});

test('should handle Stripe charge failure', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
const account = await Account.create({
_id: accountId,
stripe_customer: 'cus_test123',
});

const onebalance = await Onebalance.create({
account_id: accountId,
balance: 450,
reload: { threshold: 1000, amount: 1000 },
balance_reload_queue_in_progress: true,
});

// Mock Stripe failure
const stripeMock = {
customers: {
retrieve: jest.fn().mockResolvedValue({
default_source: 'card_test456',
}),
},
charges: {
create: jest.fn().mockRejectedValue(new Error('Your card was declined.')),
},
};
jest.spyOn(Stripe, 'Stripe').mockReturnValue(stripeMock);

// Act
const queue = await Queue.start();
const job = await queue.add(onebalance._doc, {
jobId: onebalance._id.toString(),
attempts: 1, // Single attempt for test
});

try {
await job.finished();
} catch (err) {
// Expected to fail
}

// Assert
const updated = await Onebalance.findById(onebalance._id);
expect(updated.balance).toBe(450); // Unchanged
expect(updated.retry).toBe(true);
expect(updated.retries.length).toBe(1);
expect(updated.retries[0].reason).toBe('Your card was declined.');
});
});

Change Stream Tests

const { start } = require('./streams/onebalance/balance_reload');
const balanceReloadService = require('./services/onebalance/balance_reload');

describe('Balance Reload Change Stream', () => {
test('should trigger on balance update below threshold', async () => {
// Arrange
const serviceSpy = jest.spyOn(balanceReloadService, 'default');
start(); // Start watching

const accountId = new mongoose.Types.ObjectId();
const onebalance = await Onebalance.create({
account_id: accountId,
balance: 1500,
reload: { threshold: 1000 },
});

// Act
await Onebalance.updateOne(
{ _id: onebalance._id },
{ balance: 900 }, // Drop below threshold
);

// Wait for change stream
await new Promise(resolve => setTimeout(resolve, 1000));

// Assert
expect(serviceSpy).toHaveBeenCalledWith(accountId);
});

test('should prevent duplicate processing', async () => {
// Arrange
const serviceSpy = jest.spyOn(balanceReloadService, 'default');
start();

const accountId = new mongoose.Types.ObjectId();
const onebalance = await Onebalance.create({
account_id: accountId,
balance: 1500,
reload: { threshold: 1000 },
});

// Act - Multiple rapid updates
await Promise.all([
Onebalance.updateOne({ _id: onebalance._id }, { balance: 900 }),
Onebalance.updateOne({ _id: onebalance._id }, { balance: 850 }),
Onebalance.updateOne({ _id: onebalance._id }, { balance: 800 }),
]);

await new Promise(resolve => setTimeout(resolve, 1000));

// Assert - Should only call once due to Set-based deduplication
expect(serviceSpy).toHaveBeenCalledTimes(1);
});
});

Manual Testing Procedures

Test Case 1: Successful Balance Reload

  1. Setup:

    // Create test account with Stripe customer
    const account = await Account.create({
    name: 'Test Account',
    main: true,
    stripe_customer: 'cus_xxxxx', // Real Stripe test customer
    });

    // Create onebalance below threshold
    const onebalance = await Onebalance.create({
    account_id: account._id,
    balance: 450,
    reload: { threshold: 1000, amount: 1000 },
    });
  2. Trigger:

    // Update balance to trigger change stream
    await Onebalance.updateOne({ _id: onebalance._id }, { balance: 400 });
  3. Verify:

    • Check Bull queue dashboard for job
    • Verify Stripe charge created
    • Check balance updated to 1400
    • Confirm usage log entry created
    • Verify socket notification sent
  4. Expected Result:

    {
    balance: 1400, // 400 + 1000
    lock: false,
    balance_reload_queue_in_progress: false,
    retry: false,
    retries: []
    }

Test Case 2: Account Locking at Critical Balance

  1. Setup:

    const onebalance = await Onebalance.create({
    account_id: account._id,
    balance: 300, // Below GLOBAL_THRESHOLD (500)
    reload: { threshold: 1000, amount: 1000 },
    });
  2. Trigger:

    await balanceReload(account._id);
  3. Verify:

    • Check lock: true set immediately
    • Verify account services blocked
    • Confirm job added to queue
    • After success, verify lock: false

Test Case 3: Payment Failure Recovery

  1. Setup:

    // Use Stripe test card that declines
    const account = await Account.create({
    stripe_customer: 'cus_test_declined',
    });
  2. Trigger:

    await balanceReload(account._id);
  3. Verify:

    • Job fails with "Card declined" error
    • Check retry entry added
    • Verify job retries after ~1.9 hours
    • After 5 failures, check final state
    • ⚠️ Confirm lock status (should be false, currently bug)
  4. Expected Final State:

    {
    balance: 300, // Unchanged
    balance_reload_queue_in_progress: false,
    retry: true,
    retries: [
    { time: Date, reason: "Card declined" },
    // ... 5 entries total
    ],
    lock: false // ⚠️ Currently true (bug)
    }

Monitoring & Observability

Key Metrics to Track

  1. Success Rate

    // Query usage logs
    const successRate = await OnebalanceLogs.aggregate([
    { $match: { event: 'refill', createdAt: { $gte: last24Hours } } },
    {
    $group: {
    _id: '$status',
    count: { $sum: 1 },
    },
    },
    ]);

    // Calculate: success / (success + failed) * 100
  2. Average Reload Amount

    await OnebalanceLogs.aggregate([
    { $match: { event: 'refill', status: 'success' } },
    {
    $group: {
    _id: null,
    avgCost: { $avg: '$cost' },
    totalRevenue: { $sum: '$cost' },
    },
    },
    ]);
  3. Locked Accounts

    const lockedCount = await Onebalance.countDocuments({ lock: true });

    // Alert if count exceeds threshold
    if (lockedCount > 100) {
    sendAlert('High number of locked accounts', lockedCount);
    }
  4. Retry Rates

    const withRetries = await Onebalance.countDocuments({
    'retries.0': { $exists: true },
    });

    const total = await Onebalance.countDocuments();
    const retryRate = (withRetries / total) * 100;
  5. Queue Depth

    const queue = await Queue.start();
    const waiting = await queue.getWaitingCount();
    const active = await queue.getActiveCount();
    const failed = await queue.getFailedCount();

    // Alert on queue backup
    if (waiting > 1000) {
    sendAlert('Balance reload queue backup', { waiting, active, failed });
    }

Dashboard Queries

Accounts Needing Attention:

// Accounts with multiple retry attempts
await Onebalance.find({
$expr: { $gte: [{ $size: '$retries' }, 3] },
}).populate('account_id', 'name email');

// Accounts locked for extended period
await Onebalance.find({
lock: true,
updatedAt: { $lt: new Date(Date.now() - 24 * 60 * 60 * 1000) },
}).populate('account_id');

// Accounts with disabled reload but low balance
await Onebalance.find({
'reload.disabled': true,
balance: { $lt: 500 },
}).populate('account_id');

Revenue Analytics:

// Daily reload revenue
await OnebalanceLogs.aggregate([
{
$match: {
event: 'refill',
status: 'success',
createdAt: { $gte: startOfMonth },
},
},
{
$group: {
_id: { $dateToString: { format: '%Y-%m-%d', date: '$createdAt' } },
revenue: { $sum: '$cost' },
count: { $sum: 1 },
},
},
{ $sort: { _id: 1 } },
]);

Alerting Rules

  1. Critical: High Failure Rate

    // If >10% of reloads fail in last hour
    if (failureRate > 0.1) {
    sendPagerDuty('High balance reload failure rate', { failureRate });
    }
  2. Warning: Queue Backup

    // If queue depth exceeds normal by 5x
    if (waitingCount > averageQueueDepth * 5) {
    sendSlack('Balance reload queue backup detected');
    }
  3. Info: Large Reload Volume

    // Track unusual reload patterns
    if (hourlyReloadCount > historicalAverage * 2) {
    sendEmail('Unusual reload activity detected');
    }

Performance Considerations

Optimization Opportunities

  1. Bulk Operations

    // Current: Individual updateOne calls in loop
    await Promise.all(
    onebalances.map(ob =>
    Onebalance.updateOne({ _id: ob._id }, { ... })
    )
    );

    // Better: Single updateMany
    const ids = onebalances.map(ob => ob._id);
    await Onebalance.updateMany(
    { _id: { $in: ids } },
    { balance_reload_queue_in_progress: true }
    );
  2. Parallel Stripe Charges

    // Current: Sequential processing via queue
    // Consider: Batch Stripe API calls for same parent account

    const groupedByParent = _.groupBy(jobs, 'account.parent_account');
    await Promise.all(
    Object.entries(groupedByParent).map(async ([parent, jobs]) => {
    const stripe = await getStripeClient(parent);
    return Promise.all(
    jobs.map(job => stripe.charges.create({ ... }))
    );
    })
    );
  3. Index Optimization

    // Current query
    Onebalance.aggregate([
    {
    $match: {
    account_id: ObjectId,
    $expr: { $lt: ['$balance', '$reload.threshold'] },
    new_account: { $ne: true },
    balance_reload_queue_in_progress: false,
    'reload.disabled': { $ne: true },
    },
    },
    ]);

    // Recommended indexes:
    db.onebalance.createIndex({
    account_id: 1,
    balance: 1,
    'reload.threshold': 1,
    });
    db.onebalance.createIndex({
    balance_reload_queue_in_progress: 1,
    'reload.disabled': 1,
    });
  4. Change Stream Filtering

    // Current: Checks balance field update
    'updateDescription.updatedFields.balance': { $exists: 1 }

    // Enhancement: Add compound filter
    $and: [
    { 'updateDescription.updatedFields.balance': { $exists: 1 } },
    { 'fullDocument.reload.disabled': { $ne: true } },
    { 'fullDocument.new_account': { $ne: true } }
    ]

Scalability Concerns

  1. Redis Queue Capacity

    • Current: Single global queue for all accounts
    • Bottleneck: Sequential job processing
    • Solution: Partition queues by parent account or region
  2. Stripe Rate Limits

    • Current: No rate limit handling
    • Risk: 429 errors during high volume
    • Solution: Implement rate limiter with retry-after headers
  3. Database Connections

    • Current: Individual queries per account
    • Optimization: Connection pooling tuning
    • Monitor: Active connections vs pool size
  4. Change Stream Load

    • Current: Single stream for all accounts
    • Scale: Consider sharding by account ID ranges
    • Fallback: Enable cron scheduler as backup

Security Considerations

Data Protection

  1. Stripe API Keys

    // ✅ Stored in environment variables
    const STRIPE_KEY = process.env.STRIPE_SECRET_KEY;

    // ✅ OAuth tokens encrypted in database
    const stripe_keys = await StripeKey.findOne({ account_id: parent });

    // ⚠️ Consider: Key rotation policy
    // ⚠️ Consider: Encryption at rest for StripeKey collection
  2. Transaction Security

    // ✅ Uses MongoDB transactions for atomicity
    await withTransaction(async session => {
    await Onebalance.updateOne({ ... }, { session });
    await stripe.charges.create({ ... });
    });

    // ✅ Prevents duplicate charges via jobId
    queue.add(data, { jobId: onebalance._id.toString() });
  3. Access Control

    // ⚠️ Consider: Verify account permissions
    // Before processing, ensure account is active and authorized

    const account = await Account.findById(account_id);
    if (account.status !== 'active' || account.suspended) {
    throw new Error('Account not authorized for reloads');
    }

Audit Trail

  1. Usage Logs

    // ✅ All reloads logged to onebalance.usage_logs
    await new OnebalanceLogs({
    account_id: job.data.account_id,
    event: 'refill',
    cost: job.data.reload.amount,
    status: 'success',
    type: 'credit',
    }).save();
  2. Retry History

    // ✅ Failed attempts tracked in retries array
    {
    retries: [
    { time: Date, reason: 'Card declined' },
    // ... full history preserved
    ];
    }
  3. Stripe Receipt IDs

    // ⚠️ Missing: Stripe charge ID not stored
    // Recommendation: Add to usage log

    const charge = await stripe.charges.create({ ... });
    await new OnebalanceLogs({
    // ... existing fields
    additional_info: {
    stripe_charge_id: charge.id,
    stripe_receipt_url: charge.receipt_url
    }
    }).save();

Compliance

  1. PCI DSS

    • ✅ No card data stored locally
    • ✅ Stripe handles all payment processing
    • ✅ Token-based authentication
  2. Data Retention

    // ⚠️ Consider: Usage log retention policy
    // Recommendation: Archive logs older than 7 years

    // Monthly job to archive old logs
    await OnebalanceLogs.updateMany(
    { createdAt: { $lt: sevenYearsAgo } },
    { $set: { archived: true } },
    );

Internal Dependencies

  • Balance Charge Module: onebalance/balance-charge.md - Counterpart for deducting credits
  • Billing Setup: billing/index-cron.md - Stripe integration initialization
  • A2P Campaign Renewal: communication/a2p/renewal.md - Uses OneBalance for campaign fees

External Resources

Code References

Service Layer:

  • queue-manager/services/onebalance/balance_reload.js - Main service logic
  • queue-manager/crons/onebalance/balance_reload.js - Cron scheduler (disabled)
  • queue-manager/streams/onebalance/balance_reload.js - Change stream watcher

Queue Processor:

  • queue-manager/queues/onebalance/balance_reload.js - Bull queue processor
  • queue-manager/common/queue-wrapper.js - Queue initialization wrapper

Models:

  • shared/models/onebalance.js - OneBalance schema
  • shared/models/onebalance-usage_logs.js - Usage logs schema
  • shared/models/account.js - Account schema
  • shared/models/stripe-key.js - Stripe OAuth tokens

Utilities:

  • shared/utilities/socket.js - Socket.IO emission helper
  • queue-manager/utils/mongo.js - Transaction wrapper

Revision History

DateVersionChangesAuthor
2025-10-131.0Initial documentationDocumentation Team

Quick Reference

Service Invocation

const balanceReload = require('./services/onebalance/balance_reload');

// Process specific account
await balanceReload(account_id);

Common Queries

// Find accounts needing reload
await Onebalance.find({
$expr: { $lt: ['$balance', '$reload.threshold'] },
'reload.disabled': { $ne: true },
});

// Check reload history
await OnebalanceLogs.find({
account_id: account_id,
event: 'refill',
})
.sort({ createdAt: -1 })
.limit(10);

// Monitor locked accounts
await Onebalance.find({ lock: true });

Troubleshooting Commands

# Check queue status
redis-cli LLEN "bull:onebalance_balance_reload_queue:wait"
redis-cli LLEN "bull:onebalance_balance_reload_queue:active"
redis-cli LLEN "bull:onebalance_balance_reload_queue:failed"

# Clear stuck jobs
redis-cli DEL "bull:onebalance_balance_reload_queue:wait"

# Restart change stream
pm2 restart queue-manager

Documentation Status: ✅ Complete
Last Updated: October 13, 2025
Module Status: 🟡 Active (Cron disabled, change streams enabled)
Priority: 🔴 HIGH (Critical financial operation)

💬

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM