OneBalance Balance Reload Job
Overview
The Balance Reload job is a critical financial operation that automatically charges customer credit cards and adds credits to their OneBalance accounts when their balance falls below a configured threshold. This system prevents service interruptions by ensuring accounts maintain sufficient credits for continued usage of DashClicks services (emails, SMS, calls, InstaSites, InstaReports, etc.).
Key Features:
- Real-time monitoring via MongoDB change streams
- Automatic Stripe charges when balance drops below threshold
- Account locking when balance reaches critical levels (≤500 credits)
- Retry mechanism with exponential backoff over 5 days
- Multi-tenant support for parent/child account relationships
- Transaction safety with MongoDB transactions
Processing Method:
- Change Stream Trigger: MongoDB watch stream on
onebalancecollection - Manual Trigger: Service can be called with specific
account_id
Architecture
Three-Tier Structure
┌─────────────────────────────────────────────────────────────────┐
│ CRON SCHEDULER │
│ (Currently Disabled) │
│ │
│ - Schedule: Every 30 seconds (commented out) │
│ - Purpose: Periodic check for accounts needing reload │
│ - Status: DISABLED - relies on change streams instead │
└────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SERVICE LAYER │
│ services/onebalance/balance_reload.js │
│ │
│ Core Operations: │
│ 1. Query onebalance documents below threshold │
│ 2. Lock accounts with balance ≤500 credits │
│ 3. Set balance_reload_queue_in_progress flag │
│ 4. Add jobs to Bull queue with 5-day retry span │
└────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ QUEUE PROCESSOR │
│ queues/onebalance/balance_reload.js │
│ │
│ Job Processing: │
│ 1. Verify balance_reload_queue_in_progress flag │
│ 2. Retrieve Stripe API key (main/parent account) │
│ 3. Create Stripe charge for reload amount │
│ 4. Update balance within MongoDB transaction │
│ 5. Create usage log entry (event: 'refill') │
│ 6. Emit socket notification to account owner │
│ │
│ Bull Queue: onebalance_balance_reload_queue (global) │
│ Retry Strategy: 5 attempts, exponential backoff (~1.9h delay) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ CHANGE STREAM WATCHER │
│ streams/onebalance/balance_reload.js │
│ │
│ Real-time Monitoring: │
│ - Watches: onebalance collection for 'update' operations │
│ - Trigger: balance < reload.threshold │
│ - Filter: Only balance field updates │
│ - Deduplication: Set-based account tracking │
│ - Action: Calls service layer with account_id │
└─────────────────────────────────────────────────────────────────┘
Processing Flow
┌──────────────────────────────────────────────────────────────────┐
│ Balance Drop Detected │
│ │
│ Triggers: │
│ 1. Change Stream: balance field updated & balance < threshold │
│ 2. Manual Call: service(account_id) invoked directly │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ Service Layer Query │
│ │
│ Onebalance.aggregate([ │
│ { │
│ $match: { │
│ account_id: ObjectId(account_id), │
│ $expr: { $lt: ['$balance', '$reload.threshold'] }, │
│ new_account: { $ne: true }, │
│ balance_reload_queue_in_progress: false, │
│ 'reload.disabled': { $ne: true } │
│ } │
│ } │
│ ]) │
│ │
│ Result: onebalances[] - accounts needing reload │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ Account Locking & Flagging │
│ │
│ For all matched accounts: │
│ Onebalance.updateMany( │
│ { _id: { $in: ids } }, │
│ { balance_reload_queue_in_progress: true } │
│ ) │
│ │
│ For critical accounts (balance ≤ 500): │
│ Onebalance.updateMany( │
│ { _id: { $in: critical_ids } }, │
│ { │
│ balance_reload_queue_in_progress: true, │
│ lock: true // Prevents service usage │
│ } │
│ ) │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ Add Jobs to Bull Queue │
│ │
│ For each onebalance document: │
│ queue.add(onebalance_data, { │
│ jobId: onebalance._id.toString(), │
│ attempts: 5, │
│ backoff: { │
│ type: 'exponential', │
│ delay: 6857142 // ~1.9 hours │
│ }, │
│ removeOnComplete: true │
│ }) │
│ │
│ Backoff Calculation: │
│ Attempt 1: Immediate │
│ Attempt 2: ~1.9 hours │
│ Attempt 3: ~3.8 hours │
│ Attempt 4: ~7.6 hours │
│ Attempt 5: ~15.2 hours │
│ Total span: ~28.5 hours (~1.2 days) │
│ ⚠️ Comment says "5 days" but actual is ~1.2 days │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ Queue Processor Start │
│ │
│ 1. Verify Flag: │
│ processCheck = Onebalance.findOne({ _id: job.id }) │
│ if (processCheck.balance_reload_queue_in_progress == false) │
│ return done() // Exit if flag cleared (duplicate job) │
│ │
│ 2. Retrieve Account & Stripe Key: │
│ account = Account.findById(account_id) │
│ if (account.main) { │
│ stripe = Stripe(STRIPE_SECRET_KEY) // Use global key │
│ } else { │
│ stripe_keys = StripeKey.findOne({ account_id: parent }) │
│ stripe = Stripe(stripe_keys.token.access_token) │
│ } │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ Stripe Charge Processing │
│ │
│ Within MongoDB Transaction: │
│ │
│ 1. Update OneBalance (Optimistic): │
│ Onebalance.updateOne( │
│ { _id: job.id }, │
│ { │
│ $set: { │
│ lock: false, │
│ balance_reload_queue_in_progress: false, │
│ retry: false, │
│ retries: [] │
│ }, │
│ $inc: { balance: reload.amount } │
│ }, │
│ { session } │
│ ) │
│ │
│ 2. Retrieve Stripe Customer: │
│ customer = stripe.customers.retrieve(stripe_customer) │
│ │
│ 3. Verify Default Payment Source: │
│ if (!customer.default_source) { │
│ throw Error('No default source') │
│ } │
│ │
│ 4. Create Stripe Charge: │
│ stripe.charges.create({ │
│ amount: reload.amount, │
│ currency: 'USD', │
│ source: customer.default_source, │
│ description: 'Charge for balance reload', │
│ customer: stripe_customer │
│ }) │
│ │
│ Transaction Commit: All updates committed together │
└────────────────────────┬─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ Success Handling │
│ │
│ 1. Create Usage Log: │
│ OnebalanceLogs.save({ │
│ account_id: account_id, │
│ event: 'refill', │
│ cost: reload.amount, │
│ status: 'success', │
│ type: 'credit' │
│ }) │
│ │
│ 2. Emit Socket Notification: │
│ user = User.findOne({ account: account_id, is_owner: true }) │
│ socketEmit('onebalance_charge_updated', [user._id], data) │
│ │
│ 3. Complete Job: │
│ done() │
└──────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ Failure Handling │
│ │
│ On Job Failure (done(err)): │
│ │
│ 1. Record Retry Attempt: │
│ Onebalance.updateOne( │
│ { _id: job.id }, │
│ { │
│ $push: { │
│ retries: { │
│ reason: err.message, │
│ time: new Date() │
│ } │
│ }, │
│ retry: true │
│ } │
│ ) │
│ │
│ 2. Final Failure (after 5 attempts): │
│ Onebalance.updateOne( │
│ { _id: job.id }, │
│ { balance_reload_queue_in_progress: false } │
│ ) │
│ │
│ ⚠️ lock remains true, account stays locked! │
│ ⚠️ No notification sent to user about failure │
└──────────────────────────────────────────────────────────────────┘
Configuration
Environment Variables
# Stripe Configuration
STRIPE_SECRET_KEY=sk_live_xxxxx # Global Stripe API key for main accounts
# MongoDB Configuration
MONGODB_URI=mongodb://localhost:27017/dashclicks
# Redis Configuration (for Bull Queue)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
# Queue Manager Module Control
QM_ONEBALANCE=true # Enable/disable OneBalance module
Service Constants
// services/onebalance/balance_reload.js
const NEXT_RETRY_IN_MINS = 10; // Minimum time between retry attempts (unused currently)
const GLOBAL_THRESHOLD = 500; // Critical balance level for account locking
Queue Configuration
// queues/onebalance/balance_reload.js
{
jobId: onebalance._id.toString(), // Unique job ID prevents duplicates
attempts: 5, // Maximum retry attempts
backoff: {
type: 'exponential',
delay: 6857142 // ~1.9 hours initial delay
},
removeOnComplete: true // Auto-cleanup successful jobs
}
OneBalance Schema Defaults
// shared/models/onebalance.js
{
reload: {
threshold: 1000, // Trigger reload when balance < threshold
amount: 1000, // Amount to charge and add (in credits)
currency: 'usd', // Currency code
disabled: false // Allow/prevent auto-reload
},
balance: 0, // Current credit balance
new_account: true, // Skip reload for new accounts
lock: false, // Account lock status
retry: false, // Retry flag
retries: [], // Array of retry attempts
balance_reload_queue_in_progress: false // Processing flag
}
Data Models
Input Data (Job Data)
The service passes entire onebalance documents to the queue:
{
_id: ObjectId("507f1f77bcf86cd799439011"),
account_id: ObjectId("507f1f77bcf86cd799439012"),
balance: 450, // Current balance below threshold
reload: {
threshold: 1000,
amount: 1000,
currency: "usd",
disabled: false
},
new_account: false,
lock: false,
retry: false,
retries: [],
balance_reload_queue_in_progress: false
}
Database Collections
onebalance Collection
Primary collection managing account credit balances:
{
_id: ObjectId,
account_id: ObjectId, // Reference to accounts collection
balance: Number, // Current credit balance
new_account: Boolean, // Skip auto-reload if true
reload: {
threshold: Number, // Reload trigger point (default: 1000)
amount: Number, // Reload amount (default: 1000)
currency: String, // Currency code (default: 'usd')
disabled: Boolean // Disable auto-reload (default: false)
},
lock: Boolean, // Account locked when balance critical
retry: Boolean, // Retry flag for failed reloads
retries: [
{
time: Date, // Retry timestamp
reason: String // Failure reason
}
],
balance_reload_queue_in_progress: Boolean, // Job processing flag
// Rebilling configuration (per-service pricing)
rebill: {
email: {
multiplier: Number, // Cost multiplier
credits: Number // Base credits cost
},
sms: { multiplier: Number, credits: Number },
inbound_call: { multiplier: Number, credits: Number },
outbound_call: { multiplier: Number, credits: Number },
instasite: { multiplier: Number, credits: Number },
instareport: { multiplier: Number, credits: Number },
lighting_domain: { value: Number, credits: Number },
phone: { multiplier: Number, credits: Number },
listing: { value: Number, credits: Number },
seo_keyword: { multiplier: Number, credits: Number }
},
createdAt: Date,
updatedAt: Date
}
Key Indexes:
account_id(unique)balance(for threshold queries)balance_reload_queue_in_progress(for processing checks)
onebalance.usage_logs Collection
Transaction history for all balance changes:
{
_id: ObjectId,
account_id: ObjectId, // Main account
sub_account_id: ObjectId, // Child account (if applicable)
event: String, // Event type
// Event Types: "email", "sms", "inbound_call", "outbound_call",
// "instasite", "instareport", "seo_keyword", "refill",
// "lighting_domain", "phone", "listing",
// "a2p_registration", "a2p_registration_renewal"
type: String, // "credit" or "debit"
cost: Number, // Transaction amount
base_price: Number, // Original price before multipliers
credits: Number, // Credits used
sub_account_cost: Number, // Cost allocated to sub-account
status: String, // Transaction status
status_reason: String, // Failure reason (if any)
additional_info: Mixed, // Extra metadata
createdAt: Date,
updatedAt: Date
}
Indexes:
account_id, createdAt(for history queries)event, createdAt(for event-specific reports)
accounts Collection
Account information with Stripe customer references:
{
_id: ObjectId,
name: String,
main: Boolean, // true = main account, false = child
parent_account: ObjectId, // Parent account ID (for child accounts)
stripe_customer: String, // Stripe customer ID (cus_xxxxx)
// ... other account fields
}
stripe_keys Collection
OAuth tokens for child account Stripe access:
{
_id: ObjectId,
account_id: ObjectId, // Parent account
token: {
access_token: String, // Stripe OAuth access token
token_type: String, // "bearer"
scope: String, // "read_write"
// ... other OAuth fields
},
createdAt: Date,
updatedAt: Date
}
API Reference
Service Function
module.exports = async account_id => {
/* ... */
};
Parameters:
account_id(ObjectId | String, required) - Account to process for balance reload
Returns:
Promise<void>- No return value, operates via side effects
Side Effects:
- Updates
onebalancedocuments (flags, locks) - Adds jobs to Bull queue
- Logs processing to console
Behavior:
- Returns immediately if
account_idis falsy - Queries onebalance documents matching criteria
- Locks accounts with
balance ≤ 500 - Adds all matched accounts to queue
- Handles errors gracefully with console logging
Usage Example:
const balanceReload = require('./services/onebalance/balance_reload');
// Process specific account
await balanceReload('507f1f77bcf86cd799439012');
// Called from change stream
Stream.on('change', async data => {
await balanceReload(data.fullDocument.account_id);
});
Queue Processor Callbacks
processCb(job, done)
Main job processing function.
Parameters:
job(Bull.Job) - Job object containing onebalance datadone(Function) - Callback to signal completion
Processing Steps:
- Verify
balance_reload_queue_in_progressflag - Retrieve account and Stripe credentials
- Update balance within transaction
- Create Stripe charge
- Call
done()on success ordone(err)on failure
Error Conditions:
balance_reload_queue_in_progress == false→ Exit early (duplicate job)- No Stripe key for parent account → Throw error
- No
stripe_customeron account → Throw error - No default payment source → Throw error
- Stripe API failure → Throw error
failedCb(job, err)
Handles job failures and retry logic.
Parameters:
job(Bull.Job) - Failed job objecterr(Error) - Error that caused failure
Actions:
- Push retry entry to
retriesarray - Set
retry: true - If final attempt, set
balance_reload_queue_in_progress: false
⚠️ Critical Issue:
- Does NOT clear
lock: trueon final failure - Accounts remain locked indefinitely without manual intervention
completedCb(job)
Handles successful job completion.
Parameters:
job(Bull.Job) - Completed job object
Actions:
- Create usage log entry (
event: 'refill') - Find account owner user
- Emit socket notification to owner
- Job automatically removed (removeOnComplete: true)
Change Stream Watcher
exports.start = () => {
/* ... */
};
Watch Configuration:
Onebalance.watch(
[
{
$match: {
$expr: { $lt: ['$fullDocument.balance', '$fullDocument.reload.threshold'] },
operationType: 'update',
'updateDescription.updatedFields.balance': { $exists: 1 },
},
},
],
{
fullDocument: 'updateLookup',
},
);
Trigger Conditions:
- Operation type is
update balancefield was updated- New balance is below threshold
Deduplication:
- Uses
Setto track currently processing accounts - Prevents duplicate jobs for same account
- Clears from set after service call completes
Example Change Event:
{
operationType: 'update',
fullDocument: {
_id: ObjectId("..."),
account_id: ObjectId("..."),
balance: 450,
reload: { threshold: 1000, amount: 1000 }
},
updateDescription: {
updatedFields: { balance: 450 }
}
}
Error Handling
Common Error Scenarios
1. No Stripe Customer ID
// Error Message
"Customer stripe is not connected."
// Cause
account.stripe_customer is null or undefined
// Impact
- Job fails and retries
- Account remains locked (if balance ≤ 500)
- No credits added
// Resolution
1. User must connect Stripe account via dashboard
2. Ensure stripe_customer field populated
3. Job will succeed on next retry
2. No Default Payment Source
// Error Message
"Customer has not set the default source yet."
// Cause
Stripe customer exists but has no default payment method
// Impact
- Job fails and retries
- Account locked
- Transaction rolled back (balance not updated)
// Resolution
1. User must add payment method in Stripe
2. Set default payment source
3. Job will succeed on next retry
3. Missing Parent Stripe Key
// Error Message
"Parent account stripe key does not exist."
// Cause
Child account's parent has no StripeKey document
// Impact
- Job fails and retries
- Cannot process payment
- Account locked
// Resolution
1. Parent account must complete Stripe OAuth
2. StripeKey document created
3. Job will succeed on next retry
4. Stripe API Failures
// Possible Errors
- "Your card was declined."
- "Your card has insufficient funds."
- "Your card has expired."
- "An error occurred while processing your card."
// Cause
Various Stripe payment processing issues
// Impact
- Job fails and retries with exponential backoff
- Account locked
- Balance not updated (transaction rollback)
- Retry attempts tracked
// Resolution
Depends on specific error:
- Card declined: Update payment method
- Insufficient funds: Add funds to card
- Expired card: Update card information
- Processing error: Usually temporary, retry succeeds
5. Duplicate Job Detection
// Scenario
balance_reload_queue_in_progress == false when processor runs
// Cause
- Job completed but completion event fired duplicate
- Race condition between jobs
- Manual flag clearing
// Behavior
- Job exits immediately via done()
- No processing occurs
- No error thrown
// Impact
- Prevents duplicate Stripe charges
- Safety mechanism for idempotency
Error Recovery Strategies
Exponential Backoff
{
attempts: 5,
backoff: {
type: 'exponential',
delay: 6857142 // ~1.9 hours
}
}
// Retry Schedule:
// Attempt 1: Immediate
// Attempt 2: ~1.9 hours later
// Attempt 3: ~3.8 hours later
// Attempt 4: ~7.6 hours later
// Attempt 5: ~15.2 hours later
// Total: ~28.5 hours (~1.2 days)
Purpose:
- Give users time to fix payment issues
- Reduce load on Stripe API
- Avoid rate limiting
⚠️ Documentation Discrepancy:
- Code comment says "5 days span"
- Actual calculation: ~1.2 days
- Consider: Increase delay to ~33 million ms for true 5-day span
Transaction Rollback
// On Stripe API failure
await withTransaction(async session => {
// Balance updated first (optimistic)
await Onebalance.updateOne({ _id: job.id }, { $inc: { balance: amount } }, { session });
// Then Stripe charge attempted
await stripe.charges.create({ ... });
// If charge fails, transaction rolls back automatically
});
Benefits:
- Atomic operations
- No orphaned balance updates
- Data consistency guaranteed
⚠️ Potential Issue:
- Balance updated optimistically before charge
- If transaction large, rollback may take time
- Consider: Move balance update after successful charge
Retry Tracking
// Each failure recorded
{
retries: [
{ time: ISODate("2025-10-13T10:00:00Z"), reason: "Card declined" },
{ time: ISODate("2025-10-13T11:54:00Z"), reason: "Card declined" },
{ time: ISODate("2025-10-13T15:40:00Z"), reason: "Insufficient funds" }
],
retry: true
}
Usage:
- Debugging payment issues
- Support team investigation
- Audit trail
Critical Bugs & Issues
🐛 Bug #1: Account Lock Not Cleared on Final Failure
Issue:
// failedCb only clears balance_reload_queue_in_progress
if (job.attemptsMade >= job.opts.attempts) {
update.balance_reload_queue_in_progress = false;
}
// ❌ lock: true remains set!
Impact:
- Accounts with balance ≤ 500 stay locked forever after 5 failed attempts
- Users cannot use ANY services
- Requires manual database intervention
Fix:
if (job.attemptsMade >= job.opts.attempts) {
update.balance_reload_queue_in_progress = false;
update.lock = false; // ✅ Add this line
}
🐛 Bug #2: No User Notification on Final Failure
Issue:
completedCbsends socket notification on successfailedCbdoes NOT notify user on final failure- User unaware their account is locked
Impact:
- Poor user experience
- Confusion about service interruptions
- Increased support tickets
Fix:
const failedCb = async (job, err) => {
const id = job.data._id;
const update = {
$push: { retries: { reason: err.message, time: new Date() } },
retry: true,
};
if (job.attemptsMade >= job.opts.attempts) {
update.balance_reload_queue_in_progress = false;
update.lock = false;
// ✅ Add notification
const user = await User.findOne({
account: job.data.account_id,
is_owner: true,
});
await socketEmit('onebalance_reload_failed', [user._id.toString()], {
reason: err.message,
balance: job.data.balance,
reload_amount: job.data.reload.amount,
});
// ✅ Consider email notification too
await sendEmail({
to: user.email,
subject: 'Action Required: Balance Reload Failed',
body: `Your balance reload failed after 5 attempts. Please update your payment method.`,
});
}
await Onebalance.updateOne({ _id: id }, update);
};
⚠️ Issue #3: Commented Out Cron Scheduler
Current State:
// Entire cron schedule is commented out
// cron.schedule('*/30 * * * * *', async () => { ... });
Implications:
- Relies 100% on change stream triggers
- No periodic safety check
- If change stream fails/disconnects, no reloads processed
Recommendation:
- Keep change streams for real-time processing
- Enable cron as backup safety net (e.g., every 5 minutes)
- Cron catches edge cases change streams might miss
⚠️ Issue #4: Optimistic Balance Update
Current Flow:
await withTransaction(async session => {
// 1. Update balance FIRST (optimistic)
await Onebalance.updateOne({ $inc: { balance: amount } }, { session });
// 2. THEN charge Stripe
await stripe.charges.create({ ... });
});
Risk:
- If Stripe charge fails, transaction rolls back
- But there's a window where balance appears updated
- Could confuse concurrent operations
Better Approach:
await withTransaction(async session => {
// 1. Charge Stripe FIRST
const charge = await stripe.charges.create({ ... });
// 2. THEN update balance (only if charge succeeded)
if (charge.status === 'succeeded') {
await Onebalance.updateOne({ $inc: { balance: amount } }, { session });
}
});
⚠️ Issue #5: Inconsistent Logging
Current State:
- Uses
console.loginstead of logger utility - No structured logging
- Difficult to track in production
Fix:
// Replace all console.log with logger
logger.info({
initiator: 'QM/onebalance/balance-reload',
message: 'Processed docs for onebalance balance reload',
count: onebalances.length,
});
logger.error({
initiator: 'QM/onebalance/balance-reload',
error: err,
context: { account_id, balance: job.data.balance },
});
Socket Events
onebalance_charge_updated
Emitted on successful balance reload.
Event Name: onebalance_charge_updated
Recipients: Account owner only (is_owner: true)
Payload:
{
_id: ObjectId("507f1f77bcf86cd799439011"),
account_id: ObjectId("507f1f77bcf86cd799439012"),
balance: 1450, // Updated balance (old + reload.amount)
reload: {
threshold: 1000,
amount: 1000,
currency: "usd",
disabled: false
},
lock: false,
retry: false,
retries: [],
balance_reload_queue_in_progress: false
// ... full onebalance document
}
Frontend Usage:
socket.on('onebalance_charge_updated', data => {
// Update UI with new balance
updateBalanceDisplay(data.balance);
// Show success notification
showNotification('Balance reloaded successfully!');
// Remove any lock indicators
if (!data.lock) {
removeLockBanner();
}
});
⚠️ Missing Event:
Currently no socket event for failed reloads. Consider adding onebalance_reload_failed event.
Testing
Unit Tests
const balanceReload = require('./services/onebalance/balance_reload');
const Onebalance = require('./models/onebalance');
const Queue = require('./queues/onebalance/balance_reload');
describe('Balance Reload Service', () => {
test('should process account below threshold', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
await Onebalance.create({
account_id: accountId,
balance: 450,
reload: { threshold: 1000, amount: 1000 },
new_account: false,
balance_reload_queue_in_progress: false,
});
// Act
await balanceReload(accountId);
// Assert
const updated = await Onebalance.findOne({ account_id: accountId });
expect(updated.balance_reload_queue_in_progress).toBe(true);
});
test('should lock accounts with balance <= 500', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
await Onebalance.create({
account_id: accountId,
balance: 300, // Below GLOBAL_THRESHOLD
reload: { threshold: 1000, amount: 1000 },
});
// Act
await balanceReload(accountId);
// Assert
const updated = await Onebalance.findOne({ account_id: accountId });
expect(updated.lock).toBe(true);
});
test('should skip new accounts', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
await Onebalance.create({
account_id: accountId,
balance: 450,
new_account: true, // Should be skipped
});
// Act
await balanceReload(accountId);
// Assert
const updated = await Onebalance.findOne({ account_id: accountId });
expect(updated.balance_reload_queue_in_progress).toBe(false);
});
test('should skip accounts with reload disabled', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
await Onebalance.create({
account_id: accountId,
balance: 450,
reload: { threshold: 1000, amount: 1000, disabled: true },
});
// Act
await balanceReload(accountId);
// Assert
const updated = await Onebalance.findOne({ account_id: accountId });
expect(updated.balance_reload_queue_in_progress).toBe(false);
});
});
Integration Tests
const Stripe = require('stripe');
const Queue = require('./queues/onebalance/balance_reload');
describe('Balance Reload Queue Processor', () => {
test('should create Stripe charge and update balance', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
const account = await Account.create({
_id: accountId,
name: 'Test Account',
main: true,
stripe_customer: 'cus_test123',
});
const onebalance = await Onebalance.create({
account_id: accountId,
balance: 450,
reload: { threshold: 1000, amount: 1000 },
balance_reload_queue_in_progress: true,
});
// Mock Stripe
const stripeMock = {
customers: {
retrieve: jest.fn().mockResolvedValue({
id: 'cus_test123',
default_source: 'card_test456',
}),
},
charges: {
create: jest.fn().mockResolvedValue({
id: 'ch_test789',
status: 'succeeded',
amount: 1000,
}),
},
};
jest.spyOn(Stripe, 'Stripe').mockReturnValue(stripeMock);
// Act
const queue = await Queue.start();
const job = await queue.add(onebalance._doc, {
jobId: onebalance._id.toString(),
});
await job.finished();
// Assert
const updated = await Onebalance.findById(onebalance._id);
expect(updated.balance).toBe(1450); // 450 + 1000
expect(updated.lock).toBe(false);
expect(updated.balance_reload_queue_in_progress).toBe(false);
const log = await OnebalanceLogs.findOne({
account_id: accountId,
event: 'refill',
});
expect(log).toBeDefined();
expect(log.cost).toBe(1000);
expect(log.status).toBe('success');
});
test('should handle Stripe charge failure', async () => {
// Arrange
const accountId = new mongoose.Types.ObjectId();
const account = await Account.create({
_id: accountId,
stripe_customer: 'cus_test123',
});
const onebalance = await Onebalance.create({
account_id: accountId,
balance: 450,
reload: { threshold: 1000, amount: 1000 },
balance_reload_queue_in_progress: true,
});
// Mock Stripe failure
const stripeMock = {
customers: {
retrieve: jest.fn().mockResolvedValue({
default_source: 'card_test456',
}),
},
charges: {
create: jest.fn().mockRejectedValue(new Error('Your card was declined.')),
},
};
jest.spyOn(Stripe, 'Stripe').mockReturnValue(stripeMock);
// Act
const queue = await Queue.start();
const job = await queue.add(onebalance._doc, {
jobId: onebalance._id.toString(),
attempts: 1, // Single attempt for test
});
try {
await job.finished();
} catch (err) {
// Expected to fail
}
// Assert
const updated = await Onebalance.findById(onebalance._id);
expect(updated.balance).toBe(450); // Unchanged
expect(updated.retry).toBe(true);
expect(updated.retries.length).toBe(1);
expect(updated.retries[0].reason).toBe('Your card was declined.');
});
});
Change Stream Tests
const { start } = require('./streams/onebalance/balance_reload');
const balanceReloadService = require('./services/onebalance/balance_reload');
describe('Balance Reload Change Stream', () => {
test('should trigger on balance update below threshold', async () => {
// Arrange
const serviceSpy = jest.spyOn(balanceReloadService, 'default');
start(); // Start watching
const accountId = new mongoose.Types.ObjectId();
const onebalance = await Onebalance.create({
account_id: accountId,
balance: 1500,
reload: { threshold: 1000 },
});
// Act
await Onebalance.updateOne(
{ _id: onebalance._id },
{ balance: 900 }, // Drop below threshold
);
// Wait for change stream
await new Promise(resolve => setTimeout(resolve, 1000));
// Assert
expect(serviceSpy).toHaveBeenCalledWith(accountId);
});
test('should prevent duplicate processing', async () => {
// Arrange
const serviceSpy = jest.spyOn(balanceReloadService, 'default');
start();
const accountId = new mongoose.Types.ObjectId();
const onebalance = await Onebalance.create({
account_id: accountId,
balance: 1500,
reload: { threshold: 1000 },
});
// Act - Multiple rapid updates
await Promise.all([
Onebalance.updateOne({ _id: onebalance._id }, { balance: 900 }),
Onebalance.updateOne({ _id: onebalance._id }, { balance: 850 }),
Onebalance.updateOne({ _id: onebalance._id }, { balance: 800 }),
]);
await new Promise(resolve => setTimeout(resolve, 1000));
// Assert - Should only call once due to Set-based deduplication
expect(serviceSpy).toHaveBeenCalledTimes(1);
});
});
Manual Testing Procedures
Test Case 1: Successful Balance Reload
-
Setup:
// Create test account with Stripe customer
const account = await Account.create({
name: 'Test Account',
main: true,
stripe_customer: 'cus_xxxxx', // Real Stripe test customer
});
// Create onebalance below threshold
const onebalance = await Onebalance.create({
account_id: account._id,
balance: 450,
reload: { threshold: 1000, amount: 1000 },
}); -
Trigger:
// Update balance to trigger change stream
await Onebalance.updateOne({ _id: onebalance._id }, { balance: 400 }); -
Verify:
- Check Bull queue dashboard for job
- Verify Stripe charge created
- Check balance updated to 1400
- Confirm usage log entry created
- Verify socket notification sent
-
Expected Result:
{
balance: 1400, // 400 + 1000
lock: false,
balance_reload_queue_in_progress: false,
retry: false,
retries: []
}
Test Case 2: Account Locking at Critical Balance
-
Setup:
const onebalance = await Onebalance.create({
account_id: account._id,
balance: 300, // Below GLOBAL_THRESHOLD (500)
reload: { threshold: 1000, amount: 1000 },
}); -
Trigger:
await balanceReload(account._id); -
Verify:
- Check
lock: trueset immediately - Verify account services blocked
- Confirm job added to queue
- After success, verify
lock: false
- Check
Test Case 3: Payment Failure Recovery
-
Setup:
// Use Stripe test card that declines
const account = await Account.create({
stripe_customer: 'cus_test_declined',
}); -
Trigger:
await balanceReload(account._id); -
Verify:
- Job fails with "Card declined" error
- Check retry entry added
- Verify job retries after ~1.9 hours
- After 5 failures, check final state
- ⚠️ Confirm
lockstatus (should be false, currently bug)
-
Expected Final State:
{
balance: 300, // Unchanged
balance_reload_queue_in_progress: false,
retry: true,
retries: [
{ time: Date, reason: "Card declined" },
// ... 5 entries total
],
lock: false // ⚠️ Currently true (bug)
}
Monitoring & Observability
Key Metrics to Track
-
Success Rate
// Query usage logs
const successRate = await OnebalanceLogs.aggregate([
{ $match: { event: 'refill', createdAt: { $gte: last24Hours } } },
{
$group: {
_id: '$status',
count: { $sum: 1 },
},
},
]);
// Calculate: success / (success + failed) * 100 -
Average Reload Amount
await OnebalanceLogs.aggregate([
{ $match: { event: 'refill', status: 'success' } },
{
$group: {
_id: null,
avgCost: { $avg: '$cost' },
totalRevenue: { $sum: '$cost' },
},
},
]); -
Locked Accounts
const lockedCount = await Onebalance.countDocuments({ lock: true });
// Alert if count exceeds threshold
if (lockedCount > 100) {
sendAlert('High number of locked accounts', lockedCount);
} -
Retry Rates
const withRetries = await Onebalance.countDocuments({
'retries.0': { $exists: true },
});
const total = await Onebalance.countDocuments();
const retryRate = (withRetries / total) * 100; -
Queue Depth
const queue = await Queue.start();
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const failed = await queue.getFailedCount();
// Alert on queue backup
if (waiting > 1000) {
sendAlert('Balance reload queue backup', { waiting, active, failed });
}
Dashboard Queries
Accounts Needing Attention:
// Accounts with multiple retry attempts
await Onebalance.find({
$expr: { $gte: [{ $size: '$retries' }, 3] },
}).populate('account_id', 'name email');
// Accounts locked for extended period
await Onebalance.find({
lock: true,
updatedAt: { $lt: new Date(Date.now() - 24 * 60 * 60 * 1000) },
}).populate('account_id');
// Accounts with disabled reload but low balance
await Onebalance.find({
'reload.disabled': true,
balance: { $lt: 500 },
}).populate('account_id');
Revenue Analytics:
// Daily reload revenue
await OnebalanceLogs.aggregate([
{
$match: {
event: 'refill',
status: 'success',
createdAt: { $gte: startOfMonth },
},
},
{
$group: {
_id: { $dateToString: { format: '%Y-%m-%d', date: '$createdAt' } },
revenue: { $sum: '$cost' },
count: { $sum: 1 },
},
},
{ $sort: { _id: 1 } },
]);
Alerting Rules
-
Critical: High Failure Rate
// If >10% of reloads fail in last hour
if (failureRate > 0.1) {
sendPagerDuty('High balance reload failure rate', { failureRate });
} -
Warning: Queue Backup
// If queue depth exceeds normal by 5x
if (waitingCount > averageQueueDepth * 5) {
sendSlack('Balance reload queue backup detected');
} -
Info: Large Reload Volume
// Track unusual reload patterns
if (hourlyReloadCount > historicalAverage * 2) {
sendEmail('Unusual reload activity detected');
}
Performance Considerations
Optimization Opportunities
-
Bulk Operations
// Current: Individual updateOne calls in loop
await Promise.all(
onebalances.map(ob =>
Onebalance.updateOne({ _id: ob._id }, { ... })
)
);
// Better: Single updateMany
const ids = onebalances.map(ob => ob._id);
await Onebalance.updateMany(
{ _id: { $in: ids } },
{ balance_reload_queue_in_progress: true }
); -
Parallel Stripe Charges
// Current: Sequential processing via queue
// Consider: Batch Stripe API calls for same parent account
const groupedByParent = _.groupBy(jobs, 'account.parent_account');
await Promise.all(
Object.entries(groupedByParent).map(async ([parent, jobs]) => {
const stripe = await getStripeClient(parent);
return Promise.all(
jobs.map(job => stripe.charges.create({ ... }))
);
})
); -
Index Optimization
// Current query
Onebalance.aggregate([
{
$match: {
account_id: ObjectId,
$expr: { $lt: ['$balance', '$reload.threshold'] },
new_account: { $ne: true },
balance_reload_queue_in_progress: false,
'reload.disabled': { $ne: true },
},
},
]);
// Recommended indexes:
db.onebalance.createIndex({
account_id: 1,
balance: 1,
'reload.threshold': 1,
});
db.onebalance.createIndex({
balance_reload_queue_in_progress: 1,
'reload.disabled': 1,
}); -
Change Stream Filtering
// Current: Checks balance field update
'updateDescription.updatedFields.balance': { $exists: 1 }
// Enhancement: Add compound filter
$and: [
{ 'updateDescription.updatedFields.balance': { $exists: 1 } },
{ 'fullDocument.reload.disabled': { $ne: true } },
{ 'fullDocument.new_account': { $ne: true } }
]
Scalability Concerns
-
Redis Queue Capacity
- Current: Single global queue for all accounts
- Bottleneck: Sequential job processing
- Solution: Partition queues by parent account or region
-
Stripe Rate Limits
- Current: No rate limit handling
- Risk: 429 errors during high volume
- Solution: Implement rate limiter with retry-after headers
-
Database Connections
- Current: Individual queries per account
- Optimization: Connection pooling tuning
- Monitor: Active connections vs pool size
-
Change Stream Load
- Current: Single stream for all accounts
- Scale: Consider sharding by account ID ranges
- Fallback: Enable cron scheduler as backup
Security Considerations
Data Protection
-
Stripe API Keys
// ✅ Stored in environment variables
const STRIPE_KEY = process.env.STRIPE_SECRET_KEY;
// ✅ OAuth tokens encrypted in database
const stripe_keys = await StripeKey.findOne({ account_id: parent });
// ⚠️ Consider: Key rotation policy
// ⚠️ Consider: Encryption at rest for StripeKey collection -
Transaction Security
// ✅ Uses MongoDB transactions for atomicity
await withTransaction(async session => {
await Onebalance.updateOne({ ... }, { session });
await stripe.charges.create({ ... });
});
// ✅ Prevents duplicate charges via jobId
queue.add(data, { jobId: onebalance._id.toString() }); -
Access Control
// ⚠️ Consider: Verify account permissions
// Before processing, ensure account is active and authorized
const account = await Account.findById(account_id);
if (account.status !== 'active' || account.suspended) {
throw new Error('Account not authorized for reloads');
}
Audit Trail
-
Usage Logs
// ✅ All reloads logged to onebalance.usage_logs
await new OnebalanceLogs({
account_id: job.data.account_id,
event: 'refill',
cost: job.data.reload.amount,
status: 'success',
type: 'credit',
}).save(); -
Retry History
// ✅ Failed attempts tracked in retries array
{
retries: [
{ time: Date, reason: 'Card declined' },
// ... full history preserved
];
} -
Stripe Receipt IDs
// ⚠️ Missing: Stripe charge ID not stored
// Recommendation: Add to usage log
const charge = await stripe.charges.create({ ... });
await new OnebalanceLogs({
// ... existing fields
additional_info: {
stripe_charge_id: charge.id,
stripe_receipt_url: charge.receipt_url
}
}).save();
Compliance
-
PCI DSS
- ✅ No card data stored locally
- ✅ Stripe handles all payment processing
- ✅ Token-based authentication
-
Data Retention
// ⚠️ Consider: Usage log retention policy
// Recommendation: Archive logs older than 7 years
// Monthly job to archive old logs
await OnebalanceLogs.updateMany(
{ createdAt: { $lt: sevenYearsAgo } },
{ $set: { archived: true } },
);
Related Documentation
Internal Dependencies
- Balance Charge Module:
onebalance/balance-charge.md- Counterpart for deducting credits - Billing Setup:
billing/index-cron.md- Stripe integration initialization - A2P Campaign Renewal:
communication/a2p/renewal.md- Uses OneBalance for campaign fees
External Resources
- Stripe API Documentation
- Stripe Charges API
- Bull Queue Documentation
- MongoDB Transactions
- MongoDB Change Streams
Code References
Service Layer:
queue-manager/services/onebalance/balance_reload.js- Main service logicqueue-manager/crons/onebalance/balance_reload.js- Cron scheduler (disabled)queue-manager/streams/onebalance/balance_reload.js- Change stream watcher
Queue Processor:
queue-manager/queues/onebalance/balance_reload.js- Bull queue processorqueue-manager/common/queue-wrapper.js- Queue initialization wrapper
Models:
shared/models/onebalance.js- OneBalance schemashared/models/onebalance-usage_logs.js- Usage logs schemashared/models/account.js- Account schemashared/models/stripe-key.js- Stripe OAuth tokens
Utilities:
shared/utilities/socket.js- Socket.IO emission helperqueue-manager/utils/mongo.js- Transaction wrapper
Revision History
| Date | Version | Changes | Author |
|---|---|---|---|
| 2025-10-13 | 1.0 | Initial documentation | Documentation Team |
Quick Reference
Service Invocation
const balanceReload = require('./services/onebalance/balance_reload');
// Process specific account
await balanceReload(account_id);
Common Queries
// Find accounts needing reload
await Onebalance.find({
$expr: { $lt: ['$balance', '$reload.threshold'] },
'reload.disabled': { $ne: true },
});
// Check reload history
await OnebalanceLogs.find({
account_id: account_id,
event: 'refill',
})
.sort({ createdAt: -1 })
.limit(10);
// Monitor locked accounts
await Onebalance.find({ lock: true });
Troubleshooting Commands
# Check queue status
redis-cli LLEN "bull:onebalance_balance_reload_queue:wait"
redis-cli LLEN "bull:onebalance_balance_reload_queue:active"
redis-cli LLEN "bull:onebalance_balance_reload_queue:failed"
# Clear stuck jobs
redis-cli DEL "bull:onebalance_balance_reload_queue:wait"
# Restart change stream
pm2 restart queue-manager
Documentation Status: ✅ Complete
Last Updated: October 13, 2025
Module Status: 🟡 Active (Cron disabled, change streams enabled)
Priority: 🔴 HIGH (Critical financial operation)