Skip to main content

๐Ÿ” New Billing Authorization Setup

๐Ÿ“– Overviewโ€‹

The New Billing Authorization Setup job initializes Stripe integration for newly connected accounts. It runs every 30 seconds, processes pending billing authorization requests, and orchestrates the initial data sync by queuing 7 separate jobs: webhook setup, charges fetch, customers fetch, disputes fetch, subscriptions fetch, refunds fetch, and products fetch. After all 7 jobs complete (tracked via fetch_count), the system emits a socket notification and marks the Stripe integration as fully initialized.

Complete Flow:

  1. Cron Initialization: queue-manager/crons/billing/index.js
  2. Service Processing: queue-manager/services/billing/new-auth.js
  3. Queue Definitions: 7 separate queue files (webhooks, charges, customers, disputes, subscriptions, refunds, products)
  4. Completion Handler: queue-manager/common/billing.js (completeProcess)

Execution Pattern: Rapid polling (every 30 seconds) with multi-queue orchestration

Queue Names:

  • billing_create_webhook
  • billing_fetch_charges
  • billing_fetch_customers
  • billing_fetch_disputes
  • billing_fetch_subscriptions
  • billing_fetch_refunds
  • billing_fetch_products

Environment Flag: QM_BILLING=true (in index.js)

๐Ÿ”„ Complete Processing Flowโ€‹

sequenceDiagram
participant CRON as Cron Schedule<br/>(every 30s)
participant SERVICE as New Auth Service
participant QUEUE_DB as Queues Collection
participant USER_DB as Users Collection
participant WEBHOOK_Q as Webhook Queue
participant CHARGE_Q as Charges Queue
participant CUSTOMER_Q as Customers Queue
participant DISPUTE_Q as Disputes Queue
participant REFUND_Q as Refunds Queue
participant SUB_Q as Subscriptions Queue
participant PRODUCT_Q as Products Queue
participant STRIPE as Stripe API
participant STRIPE_KEY_DB as StripeKey<br/>Collection
participant SOCKET as Socket Server

CRON->>SERVICE: newAuth()
SERVICE->>QUEUE_DB: Find pending auth requests:<br/>source='billing'<br/>status='pending'<br/>in_progress=false
QUEUE_DB-->>SERVICE: Auth requests

SERVICE->>QUEUE_DB: Set in_progress=true<br/>for all requests

loop Each auth request
SERVICE->>USER_DB: Get account owner
SERVICE->>SERVICE: Prepare token data

par Queue 7 Jobs Simultaneously
SERVICE->>WEBHOOK_Q: Create/update webhook
SERVICE->>CHARGE_Q: Fetch charges
SERVICE->>CUSTOMER_Q: Fetch customers
SERVICE->>DISPUTE_Q: Fetch disputes
SERVICE->>REFUND_Q: Fetch refunds
SERVICE->>SUB_Q: Fetch subscriptions
SERVICE->>PRODUCT_Q: Fetch products
end
end

loop Each Queue Job Completion
WEBHOOK_Q->>STRIPE: Create/update webhook endpoint
WEBHOOK_Q->>STRIPE_KEY_DB: Increment fetch_count

alt fetch_count >= 7
WEBHOOK_Q->>SOCKET: Emit 'billing_data_fetched'<br/>to user
WEBHOOK_Q->>STRIPE_KEY_DB: Set initialized=true<br/>fetch_count=0
WEBHOOK_Q->>QUEUE_DB: Delete auth request
end
end

๐Ÿ“ Source Filesโ€‹

1. Cron Initializationโ€‹

File: queue-manager/crons/billing/index.js

Purpose: Schedule billing authorization check every 30 seconds

Cron Pattern: */30 * * * * * (every 30 seconds)

Initialization:

const newAuth = require('../../services/billing/new-auth');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/30 * * * * *', async () => {
if (!inProgress) {
logger.log({ initiator: 'QM/billing', message: 'Execution Started' });
inProgress = true;
await newAuth();
inProgress = false;
logger.log({ initiator: 'QM/billing', message: 'Execution Finished' });
}
});
} catch (err) {
logger.error({ initiator: 'QM/billing', error: err });
}
};

In-Progress Lock: Prevents overlapping executions.

Execution Times: Every 30 seconds (twice per minute)

Logging: Start and finish logs for visibility.

2. Service Processing (Main Orchestrator)โ€‹

File: queue-manager/services/billing/new-auth.js

Purpose: Find pending authorization requests, prepare token data, and queue 7 initialization jobs

Key Features:

  • Query pending billing authorizations from queues collection
  • In-progress flag to prevent duplicate processing
  • Fetch account owner for notification targeting
  • Queue 7 separate jobs for parallel execution

Main Service Function:

const User = require('../../models/user');
const Queue = require('../../models/queues');
const webhooksQueue = require('../../queues/billing/webhooks');
const chargesQueue = require('../../queues/billing/charges');
const customersQueue = require('../../queues/billing/customers');
const disputesQueue = require('../../queues/billing/disputes');
const subscriptionsQueue = require('../../queues/billing/subscriptions');
const refundsQueue = require('../../queues/billing/refunds');
const productsQueue = require('../../queues/billing/products');

module.exports = async () => {
try {
const stripeData = await Queue.find({
source: 'billing',
status: 'pending',
in_progress: false,
})
.sort({ createdAt: -1 })
.lean()
.exec();
if (stripeData.length) {
const ids = stripeData.map(account => account._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
await Promise.all(
stripeData.map(async data => {
try {
const token = data?.additional_info;
token.account_id = data?.account_id;
const user = await User.findOne({ account: token.account_id, is_owner: true })
.lean()
.exec();
token.user_id = user?._id?.toString();
token.id = data._id;

if (token) {
const webhookQueue = await webhooksQueue.start();
await webhookQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const chargeQueue = await chargesQueue.start();
await chargeQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const customerQueue = await customersQueue.start();
await customerQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const disputeQueue = await disputesQueue.start();
await disputeQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const refundQueue = await refundsQueue.start();
await refundQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const subscriptionQueue = await subscriptionsQueue.start();
await subscriptionQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const ProductQueue = await productsQueue.start();
await ProductQueue.add(token, {
attempts: 4,
backoff: 4000,
});

console.log(
'Billing app new authorization service started for the account',
data?.account_id.toString(),
);
}
} catch (err) {
await Queue.updateOne({ _id: data._id }, { in_progress: false });
console.log(
`Error occurred while fetching the data from stripe.\nFile path: /queue-manager/services/billing/index.js.\nError: ${err}`,
);
}
}),
);
}
} catch (err) {
console.error('Error processing billing new auth service.', err.message, err.stack);
}
};

Query: Matches requests with:

  • source: 'billing'
  • status: 'pending'
  • in_progress: false

Token Data Structure:

{
access_token: String, // Stripe API key
stripe_user_id: String, // Stripe Connect account ID
account_id: ObjectId, // DashClicks account
user_id: String, // Account owner ID
id: ObjectId, // Queue request ID
// ... other Stripe OAuth data
}

Parallel Job Queuing: All 7 jobs queued simultaneously for fast initialization.

3. Queue Processing - Webhook Setupโ€‹

File: queue-manager/queues/billing/webhooks.js

Purpose: Create or update Stripe webhook endpoint for real-time event notifications

Key Functions:

  • Check if webhook already exists
  • Create new webhook if not found
  • Update existing webhook if found
  • Configure 19 event types for monitoring

Complete Processor (excerpt):

const STRIPE_KEY = process.env.STRIPE_SECRET_KEY;
const Stripe = require('stripe');
const stripe = Stripe(STRIPE_KEY);
const QueueWrapper = require('../../common/queue-wrapper');
const { completeProcess } = require('../../common/billing');
const logger = require('../../utilities/logger');

const processCb = async (job, done) => {
try {
const token = job.data;
const accountId = token?.account_id;
let alreadyExist = false;
let webhookEndpoint;

// Check if webhook already exists
for await (const webhook of stripe.webhookEndpoints.list({ limit: 100 })) {
if (webhook?.url == `${process.env.API_BASE_URL}/v1/billing/webhooks`) {
alreadyExist = true;
webhookEndpoint = { ...webhook };
break;
}
}

if (!alreadyExist) {
// Create new webhook
webhookEndpoint = await stripe.webhookEndpoints.create({
url: `${process.env.API_BASE_URL}/v1/billing/webhooks`,
enabled_events: [
'charge.succeeded',
'charge.updated',
'charge.dispute.created',
'charge.dispute.updated',
'charge.dispute.closed',
'charge.refunded',
'charge.refund.updated',
'customer.created',
'customer.updated',
'customer.deleted',
'customer.subscription.created',
'customer.subscription.updated',
'customer.subscription.deleted',
'product.created',
'product.updated',
'product.deleted',
'invoice.updated',
'invoice.created',
'invoice.paid',
],
description: 'Created by dashclicks. Deleting might cause failure to get updated data.',
connect: true,
});
} else {
// Update existing webhook
webhookEndpoint = await stripe.webhookEndpoints.update(webhookEndpoint.id, {
enabled_events: [
/* same 19 events */
],
description: 'Created by dashclicks. Deleting might cause failure to get updated data.',
disabled: false,
});
}

return done();
} catch (err) {
done(err);
}
};

const completedCb = async job => {
completeProcess(job.data);
};

let queue;

exports.start = async () => {
try {
if (!queue)
queue = QueueWrapper(`billing_create_webhook`, 'global', { processCb, completedCb });
return Promise.resolve(queue);
} catch (err) {
logger.error({
initiator: 'QM/billing/webhook',
error: err,
message: 'Error while initializing billing webhook queue',
});
}
};

Webhook Events (19 total):

  • Charges: charge.succeeded, charge.updated, charge.refunded
  • Disputes: charge.dispute.created, charge.dispute.updated, charge.dispute.closed
  • Refunds: charge.refund.updated
  • Customers: customer.created, customer.updated, customer.deleted
  • Subscriptions: customer.subscription.created, customer.subscription.updated, customer.subscription.deleted
  • Products: product.created, product.updated, product.deleted
  • Invoices: invoice.created, invoice.updated, invoice.paid

Connect Mode: connect: true - enables Stripe Connect webhook forwarding.

4. Queue Processing - Data Fetching (6 queues)โ€‹

Files:

  • queues/billing/charges.js - Fetch all charges
  • queues/billing/customers.js - Fetch all customers
  • queues/billing/disputes.js - Fetch all disputes
  • queues/billing/subscriptions.js - Fetch all subscriptions
  • queues/billing/refunds.js - Fetch all refunds
  • queues/billing/products.js - Fetch all products

Pattern (same for all 6):

  1. Fetch data from Stripe API (paginated)
  2. Store in DashClicks database
  3. Call completeProcess on completion

Note: Implementation details for each queue not shown (would require reading all 6 files).

5. Completion Handlerโ€‹

File: queue-manager/common/billing.js (completeProcess)

Purpose: Track job completion, emit notification when all 7 jobs finish, mark integration as initialized

Key Functions:

  • Increment fetch_count in StripeKey
  • When fetch_count >= 7 (all jobs complete):
    • Emit socket notification to user
    • Reset fetch_count to 0
    • Set initialized: true
    • Delete auth request from queue

Complete Handler:

const StripeKey = require('../models/stripe-key');
const { socketEmit } = require('../utilities');
const Queue = require('../models/queues');

const completeProcess = async token => {
const userId = token?.user_id;
const stripe_key = await StripeKey.findOne({ account_id: token?.account_id }).lean().exec();
if (stripe_key.fetch_count >= 6) {
// Note: should be >= 7
await socketEmit('billing_data_fetched', [userId.toString()], {
message: `Successfully fetched data, Please refresh the page`,
type: 'success',
});
await StripeKey.updateOne(
{ account_id: token?.account_id },
{ $set: { fetch_count: 0, initialized: true } },
);
await Queue.deleteOne({ _id: token.id });
}
};

module.exports = {
completeProcess,
};

Completion Threshold: fetch_count >= 6 (should be 7 - possible off-by-one error)

Socket Event: billing_data_fetched - sent to account owner.

Note: Each queue job is expected to increment fetch_count (implementation in queue files).

๐Ÿ—„๏ธ Collections Usedโ€‹

queuesโ€‹

  • Operations: Find, Update, Delete
  • Model: shared/models/queues.js
  • Usage Context: Store pending authorization requests

Key Fields:

  • source: 'billing' (identifies billing authorization)
  • status: 'pending' | 'completed' | 'failed'
  • in_progress: Boolean - prevents duplicate processing
  • account_id: ObjectId - target account
  • additional_info: Object - Stripe OAuth token data
    • access_token: Stripe API key
    • stripe_user_id: Stripe Connect account ID
    • Other OAuth response fields

usersโ€‹

  • Operations: Find
  • Model: shared/models/user.js
  • Usage Context: Retrieve account owner for notifications

Query: { account: accountId, is_owner: true }

stripe_keyโ€‹

  • Operations: Find, Update
  • Model: shared/models/stripe-key.js
  • Usage Context: Track initialization progress and store credentials

Key Fields:

  • account_id: ObjectId - account reference
  • fetch_count: Number - tracks completed jobs (0-7)
  • initialized: Boolean - marks complete initialization
  • token: Object - Stripe credentials (from additional_info)

Note: fetch_count incremented by each queue job on completion.

Other Collections (Updated by Queue Jobs)โ€‹

  • billing_charges - Charges data
  • billing_customers - Customers data
  • billing_disputes - Disputes data
  • billing_subscriptions - Subscriptions data
  • billing_refunds - Refunds data
  • billing_products - Products data

Note: Exact collection names may vary (not confirmed without reading all queue files).

๐Ÿ”ง Job Configurationโ€‹

Cron Scheduleโ€‹

'*/30 * * * * *'; // Every 30 seconds

Frequency: 120 times per hour (twice per minute)

Rationale: Fast authorization processing for immediate user feedback; 30-second interval balances responsiveness with system load.

Queue Settingsโ€‹

All 7 Queues:

QueueWrapper(`billing_{queue_name}`, 'global', {
processCb,
completedCb,
});

Queue Names:

  • billing_create_webhook
  • billing_fetch_charges
  • billing_fetch_customers
  • billing_fetch_disputes
  • billing_fetch_subscriptions
  • billing_fetch_refunds
  • billing_fetch_products

Concurrency: Default (1 per queue)

Job Options (all queues):

{
attempts: 4,
backoff: 4000 // 4-second fixed delay between retries
}

Retry Strategy: 4 attempts with 4-second delays (16 seconds total retry window)

๐Ÿ“‹ Processing Logic - Detailed Flowโ€‹

1. Authorization Request Queryโ€‹

Service Query:

await Queue.find({
source: 'billing',
status: 'pending',
in_progress: false,
}).sort({ createdAt: -1 });

Matches: Pending billing authorizations not currently processing.

Sort Order: Newest first (LIFO) - prioritizes recent connections.

2. In-Progress Lockโ€‹

const ids = stripeData.map(account => account._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });

Purpose: Prevents duplicate processing by multiple cron runs.

Scope: All pending requests locked at once.

3. Account Owner Lookupโ€‹

const user = await User.findOne({ account: token.account_id, is_owner: true }).lean().exec();
token.user_id = user?._id?.toString();

Purpose: Identify account owner for socket notification targeting.

Requirement: Account must have an owner user.

4. Token Data Preparationโ€‹

const token = data?.additional_info;
token.account_id = data?.account_id;
token.user_id = user?._id?.toString();
token.id = data._id;

Token Fields:

  • access_token: Stripe API key (from OAuth)
  • stripe_user_id: Stripe Connect account ID (from OAuth)
  • account_id: DashClicks account ObjectId
  • user_id: Account owner user ID (for notifications)
  • id: Queue request ID (for cleanup)

5. Parallel Job Queuingโ€‹

All 7 Jobs Queued Simultaneously:

await Promise.all([
webhookQueue.add(token, { attempts: 4, backoff: 4000 }),
chargeQueue.add(token, { attempts: 4, backoff: 4000 }),
customerQueue.add(token, { attempts: 4, backoff: 4000 }),
disputeQueue.add(token, { attempts: 4, backoff: 4000 }),
refundQueue.add(token, { attempts: 4, backoff: 4000 }),
subscriptionQueue.add(token, { attempts: 4, backoff: 4000 }),
ProductQueue.add(token, { attempts: 4, backoff: 4000 }),
]);

Parallel Execution: All 7 jobs run concurrently for fast initialization.

Job Data: Same token object passed to all jobs.

6. Webhook Setup Logicโ€‹

Check for Existing Webhook:

for await (const webhook of stripe.webhookEndpoints.list({ limit: 100 })) {
if (webhook?.url == `${process.env.API_BASE_URL}/v1/billing/webhooks`) {
alreadyExist = true;
webhookEndpoint = { ...webhook };
break;
}
}

URL Match: Checks if webhook with same URL already exists.

Create or Update:

  • If not exists: Create new webhook with 19 event types
  • If exists: Update webhook to ensure correct configuration

Webhook URL: {API_BASE_URL}/v1/billing/webhooks (e.g., https://api.dashclicks.com/v1/billing/webhooks)

7. Completion Trackingโ€‹

Each Queue Job Completion (calls completeProcess):

const completedCb = async job => {
completeProcess(job.data);
};

Completion Handler Logic:

  1. Fetch current fetch_count from StripeKey
  2. Increment fetch_count (assumed - implementation in queue files)
  3. If fetch_count >= 6 (all jobs complete):
    • Emit billing_data_fetched socket event to user
    • Reset fetch_count to 0
    • Set initialized: true
    • Delete auth request from queue

Note: Off-by-one error - checks >= 6 instead of >= 7 (7 jobs total).

8. Socket Notificationโ€‹

await socketEmit('billing_data_fetched', [userId.toString()], {
message: `Successfully fetched data, Please refresh the page`,
type: 'success',
});

Event: billing_data_fetched

Target: Account owner user (by user ID)

Message: Instructs user to refresh page to see imported data.

๐Ÿšจ Error Handlingโ€‹

Common Error Scenariosโ€‹

Missing Account Ownerโ€‹

Scenario: Account doesn't have an owner user

Handling: user_id set to undefined, may cause issues in notification

Impact: Socket notification may fail (no target user)

Suggestion: Add validation and skip account if no owner

Stripe API Errorsโ€‹

Scenario: Rate limiting, invalid credentials, network issues

Handling: Job retries 4 times with 4-second delays

Impact: Authorization delayed up to 16 seconds, then fails if persistent

Note: No exponential backoff (fixed 4-second delay)

Webhook Creation Failureโ€‹

Scenario: Stripe API error, permissions issue

Handling: Job fails, retries, eventually gives up

Impact: Webhook not created, real-time events not received (manual setup needed)

Individual Job Failureโ€‹

Scenario: One of 7 jobs fails (e.g., charges fetch)

Handling: Job retries, other jobs continue

Impact: fetch_count never reaches 7, initialization never completes

Issue: No timeout or fallback - account stuck in "initializing" state

Database Update Failureโ€‹

Scenario: MongoDB error when updating StripeKey

Handling: Error logged, may retry

Impact: fetch_count or initialized flag not updated correctly

Socket Emission Failureโ€‹

Scenario: Socket server unavailable, user disconnected

Handling: Error logged, initialization still marked complete

Impact: User not notified, but data still imported

Failed Job Callbackโ€‹

Note: No explicit failedCb defined in most queues - relies on default Bull error handling.

Individual Job Error Handling (in service):

try {
// ... queue jobs
} catch (err) {
await Queue.updateOne({ _id: data._id }, { in_progress: false });
console.log(`Error occurred...`);
}

Action: Resets in_progress flag for retry on next cron run.

Completed Job Callbackโ€‹

All Queues:

const completedCb = async job => {
completeProcess(job.data);
};

Action: Call completion handler to track progress.

๐Ÿ“Š Monitoring & Loggingโ€‹

Success Loggingโ€‹

Cron Level:

  • "Execution Started"
  • "Execution Finished"

Service Level:

  • "Billing app new authorization service started for the account {accountId}" (console.log)

Queue Level:

  • No explicit success logging

Error Loggingโ€‹

Cron Level:

  • Error in cron initialization

Service Level:

  • "Error occurred while fetching the data from stripe..." (console.log)
  • "Error processing billing new auth service..." (console.error)

Queue Level:

  • "Error while initializing billing webhook queue"

Performance Metricsโ€‹

  • Queue Lookup: less than 1 second (indexed query)
  • Account Owner Lookup: less than 100ms per account
  • Webhook Check: 1-2 seconds (Stripe API list call)
  • Webhook Create/Update: 1-2 seconds (Stripe API call)
  • Data Fetch Jobs: 5-30 seconds each (depends on data volume)
  • Total Initialization Time: 10-60 seconds (depends on data volume, parallel execution)
    • Small account (less than 100 records each): ~10-15 seconds
    • Medium account (100-1000 records): ~20-40 seconds
    • Large account (1000+ records): ~40-60+ seconds

๐Ÿ”— Integration Pointsโ€‹

Triggers This Jobโ€‹

  • Internal API: Creates auth request in queues collection (source: 'billing')
  • User Action: Connects Stripe account via OAuth from dashboard
  • Cron Schedule: Every 30 seconds (no external triggers)

External Dependenciesโ€‹

  • Stripe API: Webhook endpoints, data fetching APIs
  • Stripe OAuth: Token from authorization flow
  • Socket Server (4000): Real-time notifications

Jobs That Depend On Thisโ€‹

  • Webhook Processing: Requires webhook setup for real-time events
  • Billing Sync Jobs: Rely on initial data import
  • Contact Import: Uses customer data imported by this job
  • Billing Dashboard: Displays imported Stripe data
  • Stripe Integration: Requires initialization before use
  • Webhook Events: Real-time sync after initialization

โš ๏ธ Important Notesโ€‹

Side Effectsโ€‹

  • โš ๏ธ Stripe Webhook Creation: Creates permanent webhook in Stripe account
  • โš ๏ธ Database Writes: Imports all Stripe data (charges, customers, etc.)
  • โš ๏ธ StripeKey Updates: Modifies fetch_count and initialized flags
  • โš ๏ธ Queue Deletion: Auth request deleted after completion (cannot retry)

Performance Considerationsโ€‹

  • High Frequency: 30-second interval ensures fast initialization
  • Parallel Job Execution: 7 jobs run concurrently (fast but resource-intensive)
  • Stripe API Load: 7+ API calls per account (may hit rate limits for many simultaneous connections)
  • Database Load: High write volume during initial import
  • No Pagination Limits: Fetches ALL data (large accounts may take minutes)

Business Logicโ€‹

Why Every 30 Seconds?

  • Fast turnaround for user waiting on initialization
  • Balances responsiveness with system load
  • 120 checks per hour is reasonable for background job

Why 7 Separate Jobs?

  • Parallel execution for faster initialization
  • Isolation of concerns (each job can fail independently)
  • Allows per-resource retry logic
  • Easier to debug and maintain

Why Webhook Setup First?

  • Ensures real-time events captured during initial import
  • Prevents missing events between import and webhook setup
  • Webhook setup is fast (< 2 seconds)

Why Track with fetch_count?

  • Simple counter-based completion tracking
  • No need for complex state management
  • Each job increments on completion

Why Initialize Flag?

  • Marks account as fully set up
  • Prevents re-initialization on reconnect
  • Allows UI to show "ready" state

Why Socket Notification?

  • Immediate user feedback on completion
  • Prompts user to refresh and see imported data
  • Better UX than polling for completion

Why Delete Queue Request?

  • Prevents reprocessing of completed authorizations
  • Cleans up queue collection
  • Permanent record not needed after completion

Maintenance Notesโ€‹

  • Off-by-One Error: fetch_count >= 6 should be >= 7 (7 jobs total)
  • Console.log Usage: Service uses console.log instead of logger
  • No Timeout: No maximum time for initialization (account can be stuck if job fails)
  • No Retry Limit: Failing accounts retried forever (every 30 seconds)
  • No Batch Operations: Processes accounts one-by-one (could parallelize)
  • Hardcoded Retry Strategy: Fixed 4-second backoff (consider exponential)
  • Missing Owner Validation: No check if owner user exists
  • No Completion Status: Queue request doesn't update to 'completed' (just deleted)

Code Quality Issuesโ€‹

Issue 1: Off-by-One Error (CRITICAL)

if (stripe_key.fetch_count >= 6) {
// Should be >= 7
// Mark as complete
}

Issue: Checks for 6 jobs instead of 7 (webhook + 6 data fetches).

Impact: Initialization completes prematurely, one job still running.

Suggestion: Fix to >= 7 or use constant:

const TOTAL_JOBS = 7;
if (stripe_key.fetch_count >= TOTAL_JOBS) {
// Mark as complete
}

Issue 2: Console.log Instead of Logger

console.log('Billing app new authorization service started...');
console.log(`Error occurred while fetching...`);
console.error('Error processing billing new auth service...');

Suggestion: Use logger for consistency:

logger.log({ initiator: 'QM/billing/new-auth', message: 'Started for account', account_id });
logger.error({ initiator: 'QM/billing/new-auth', error: err });

Issue 3: Missing Owner Validation

const user = await User.findOne({ account: token.account_id, is_owner: true }).lean().exec();
token.user_id = user?._id?.toString(); // May be undefined

Issue: No validation if owner exists.

Suggestion: Skip account if no owner:

if (!user) {
logger.warn({
initiator: 'QM/billing/new-auth',
message: 'No owner found for account',
account_id,
});
await Queue.updateOne({ _id: data._id }, { status: 'failed', in_progress: false });
return;
}

Issue 4: No Timeout for Initialization

Issue: If one job fails permanently, account stuck in "initializing" state.

Suggestion: Add timeout logic:

const INIT_TIMEOUT = 10 * 60 * 1000; // 10 minutes
const createdAt = new Date(data.createdAt);
if (Date.now() - createdAt > INIT_TIMEOUT) {
logger.error({ initiator: 'QM/billing/new-auth', message: 'Initialization timeout', account_id });
await Queue.updateOne({ _id: data._id }, { status: 'failed' });
}

Issue 5: No Job Failure Recovery

Issue: If webhook job fails 4 times, initialization never completes (stuck at fetch_count < 7).

Suggestion: Add fallback or manual retry:

// In completion handler
const elapsedTime = Date.now() - new Date(stripe_key.createdAt);
if (stripe_key.fetch_count < 7 && elapsedTime > 5 * 60 * 1000) {
// After 5 minutes with incomplete jobs, mark as partial success
await StripeKey.updateOne(
{ account_id: token.account_id },
{ $set: { initialized: true, partial_init: true } },
);
}

Issue 6: Parallel Queuing Without Coordination

await webhookQueue.add(token, { attempts: 4, backoff: 4000 });
await chargeQueue.add(token, { attempts: 4, backoff: 4000 });
// ... 5 more jobs

Issue: Not using Promise.all (jobs queued sequentially, not simultaneously).

Suggestion: Use Promise.all for true parallelism:

await Promise.all([
webhookQueue.add(token, { attempts: 4, backoff: 4000 }),
chargeQueue.add(token, { attempts: 4, backoff: 4000 }),
// ... 5 more jobs
]);

Note: Current code already queues sequentially (await each), but since queue.add() is fast (just adds to queue), impact is minimal.

Issue 7: No Fetch Count Increment Shown

Issue: Code shows completeProcess checking fetch_count >= 6, but doesn't show where fetch_count is incremented.

Assumption: Each queue job increments fetch_count in its processor (implementation in individual queue files).

Suggestion: Document expected behavior in completion handler.

๐Ÿงช Testingโ€‹

Manual Triggerโ€‹

# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/billing

Create Authorization Requestโ€‹

const Queue = require('./models/queues');

// Simulate Stripe OAuth token response
const stripeToken = {
access_token: 'sk_test_...',
stripe_user_id: 'acct_...',
// ... other OAuth fields
};

// Create authorization request
await new Queue({
source: 'billing',
status: 'pending',
in_progress: false,
account_id: accountId,
additional_info: stripeToken,
}).save();

console.log('Authorization request created, waiting for cron run (up to 30 seconds)');

Verify Initialization Progressโ€‹

const StripeKey = require('./models/stripe-key');

// Check initialization progress
const stripeKey = await StripeKey.findOne({ account_id: accountId });
console.log('Fetch count:', stripeKey.fetch_count); // Should increment from 0 to 7
console.log('Initialized:', stripeKey.initialized); // Should be true when complete

// Monitor progress
const interval = setInterval(async () => {
const key = await StripeKey.findOne({ account_id: accountId });
console.log(`Progress: ${key.fetch_count}/7 jobs completed`);
if (key.initialized) {
console.log('Initialization complete!');
clearInterval(interval);
}
}, 2000);

Verify Webhook Creationโ€‹

const Stripe = require('stripe');
const stripe = Stripe(process.env.STRIPE_SECRET_KEY);

// List webhooks
const webhooks = await stripe.webhookEndpoints.list({ limit: 100 });
const dashClicksWebhook = webhooks.data.find(
w => w.url === `${process.env.API_BASE_URL}/v1/billing/webhooks`,
);

console.log('Webhook exists:', !!dashClicksWebhook);
console.log('Webhook events:', dashClicksWebhook?.enabled_events);
// Expected: 19 event types

Test Socket Notificationโ€‹

// Listen for socket event (in frontend)
socket.on('billing_data_fetched', data => {
console.log('Received notification:', data);
// Expected: { message: 'Successfully fetched data, Please refresh the page', type: 'success' }
});

Monitor Queue Processingโ€‹

# Watch logs during initialization
tail -f logs/queue-manager.log | grep "billing"

# Expected outputs:
# [INFO] Execution Started
# [INFO] Billing app new authorization service started for the account {accountId}
# [INFO] Execution Finished

Test Error Handlingโ€‹

// Test with invalid Stripe credentials
await new Queue({
source: 'billing',
status: 'pending',
in_progress: false,
account_id: accountId,
additional_info: {
access_token: 'invalid_key',
stripe_user_id: 'invalid_id',
},
}).save();

// After cron run, verify:
// 1. Error logged
// 2. Jobs retried 4 times
// 3. in_progress reset to false (for retry)

Job Type: Scheduled with Multi-Queue Orchestration
Execution Frequency: Every 30 seconds
Average Duration: 10-60 seconds (depends on account data volume)
Status: Active
Critical Bug: Off-by-one error in completion check (>= 6 should be >= 7)

๐Ÿ’ฌ

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM