๐ New Billing Authorization Setup
๐ Overviewโ
The New Billing Authorization Setup job initializes Stripe integration for newly connected accounts. It runs every 30 seconds, processes pending billing authorization requests, and orchestrates the initial data sync by queuing 7 separate jobs: webhook setup, charges fetch, customers fetch, disputes fetch, subscriptions fetch, refunds fetch, and products fetch. After all 7 jobs complete (tracked via fetch_count), the system emits a socket notification and marks the Stripe integration as fully initialized.
Complete Flow:
- Cron Initialization:
queue-manager/crons/billing/index.js - Service Processing:
queue-manager/services/billing/new-auth.js - Queue Definitions: 7 separate queue files (webhooks, charges, customers, disputes, subscriptions, refunds, products)
- Completion Handler:
queue-manager/common/billing.js(completeProcess)
Execution Pattern: Rapid polling (every 30 seconds) with multi-queue orchestration
Queue Names:
billing_create_webhookbilling_fetch_chargesbilling_fetch_customersbilling_fetch_disputesbilling_fetch_subscriptionsbilling_fetch_refundsbilling_fetch_products
Environment Flag: QM_BILLING=true (in index.js)
๐ Complete Processing Flowโ
sequenceDiagram
participant CRON as Cron Schedule<br/>(every 30s)
participant SERVICE as New Auth Service
participant QUEUE_DB as Queues Collection
participant USER_DB as Users Collection
participant WEBHOOK_Q as Webhook Queue
participant CHARGE_Q as Charges Queue
participant CUSTOMER_Q as Customers Queue
participant DISPUTE_Q as Disputes Queue
participant REFUND_Q as Refunds Queue
participant SUB_Q as Subscriptions Queue
participant PRODUCT_Q as Products Queue
participant STRIPE as Stripe API
participant STRIPE_KEY_DB as StripeKey<br/>Collection
participant SOCKET as Socket Server
CRON->>SERVICE: newAuth()
SERVICE->>QUEUE_DB: Find pending auth requests:<br/>source='billing'<br/>status='pending'<br/>in_progress=false
QUEUE_DB-->>SERVICE: Auth requests
SERVICE->>QUEUE_DB: Set in_progress=true<br/>for all requests
loop Each auth request
SERVICE->>USER_DB: Get account owner
SERVICE->>SERVICE: Prepare token data
par Queue 7 Jobs Simultaneously
SERVICE->>WEBHOOK_Q: Create/update webhook
SERVICE->>CHARGE_Q: Fetch charges
SERVICE->>CUSTOMER_Q: Fetch customers
SERVICE->>DISPUTE_Q: Fetch disputes
SERVICE->>REFUND_Q: Fetch refunds
SERVICE->>SUB_Q: Fetch subscriptions
SERVICE->>PRODUCT_Q: Fetch products
end
end
loop Each Queue Job Completion
WEBHOOK_Q->>STRIPE: Create/update webhook endpoint
WEBHOOK_Q->>STRIPE_KEY_DB: Increment fetch_count
alt fetch_count >= 7
WEBHOOK_Q->>SOCKET: Emit 'billing_data_fetched'<br/>to user
WEBHOOK_Q->>STRIPE_KEY_DB: Set initialized=true<br/>fetch_count=0
WEBHOOK_Q->>QUEUE_DB: Delete auth request
end
end
๐ Source Filesโ
1. Cron Initializationโ
File: queue-manager/crons/billing/index.js
Purpose: Schedule billing authorization check every 30 seconds
Cron Pattern: */30 * * * * * (every 30 seconds)
Initialization:
const newAuth = require('../../services/billing/new-auth');
const cron = require('node-cron');
const logger = require('../../utilities/logger');
let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/30 * * * * *', async () => {
if (!inProgress) {
logger.log({ initiator: 'QM/billing', message: 'Execution Started' });
inProgress = true;
await newAuth();
inProgress = false;
logger.log({ initiator: 'QM/billing', message: 'Execution Finished' });
}
});
} catch (err) {
logger.error({ initiator: 'QM/billing', error: err });
}
};
In-Progress Lock: Prevents overlapping executions.
Execution Times: Every 30 seconds (twice per minute)
Logging: Start and finish logs for visibility.
2. Service Processing (Main Orchestrator)โ
File: queue-manager/services/billing/new-auth.js
Purpose: Find pending authorization requests, prepare token data, and queue 7 initialization jobs
Key Features:
- Query pending billing authorizations from
queuescollection - In-progress flag to prevent duplicate processing
- Fetch account owner for notification targeting
- Queue 7 separate jobs for parallel execution
Main Service Function:
const User = require('../../models/user');
const Queue = require('../../models/queues');
const webhooksQueue = require('../../queues/billing/webhooks');
const chargesQueue = require('../../queues/billing/charges');
const customersQueue = require('../../queues/billing/customers');
const disputesQueue = require('../../queues/billing/disputes');
const subscriptionsQueue = require('../../queues/billing/subscriptions');
const refundsQueue = require('../../queues/billing/refunds');
const productsQueue = require('../../queues/billing/products');
module.exports = async () => {
try {
const stripeData = await Queue.find({
source: 'billing',
status: 'pending',
in_progress: false,
})
.sort({ createdAt: -1 })
.lean()
.exec();
if (stripeData.length) {
const ids = stripeData.map(account => account._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
await Promise.all(
stripeData.map(async data => {
try {
const token = data?.additional_info;
token.account_id = data?.account_id;
const user = await User.findOne({ account: token.account_id, is_owner: true })
.lean()
.exec();
token.user_id = user?._id?.toString();
token.id = data._id;
if (token) {
const webhookQueue = await webhooksQueue.start();
await webhookQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const chargeQueue = await chargesQueue.start();
await chargeQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const customerQueue = await customersQueue.start();
await customerQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const disputeQueue = await disputesQueue.start();
await disputeQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const refundQueue = await refundsQueue.start();
await refundQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const subscriptionQueue = await subscriptionsQueue.start();
await subscriptionQueue.add(token, {
attempts: 4,
backoff: 4000,
});
const ProductQueue = await productsQueue.start();
await ProductQueue.add(token, {
attempts: 4,
backoff: 4000,
});
console.log(
'Billing app new authorization service started for the account',
data?.account_id.toString(),
);
}
} catch (err) {
await Queue.updateOne({ _id: data._id }, { in_progress: false });
console.log(
`Error occurred while fetching the data from stripe.\nFile path: /queue-manager/services/billing/index.js.\nError: ${err}`,
);
}
}),
);
}
} catch (err) {
console.error('Error processing billing new auth service.', err.message, err.stack);
}
};
Query: Matches requests with:
source: 'billing'status: 'pending'in_progress: false
Token Data Structure:
{
access_token: String, // Stripe API key
stripe_user_id: String, // Stripe Connect account ID
account_id: ObjectId, // DashClicks account
user_id: String, // Account owner ID
id: ObjectId, // Queue request ID
// ... other Stripe OAuth data
}
Parallel Job Queuing: All 7 jobs queued simultaneously for fast initialization.
3. Queue Processing - Webhook Setupโ
File: queue-manager/queues/billing/webhooks.js
Purpose: Create or update Stripe webhook endpoint for real-time event notifications
Key Functions:
- Check if webhook already exists
- Create new webhook if not found
- Update existing webhook if found
- Configure 19 event types for monitoring
Complete Processor (excerpt):
const STRIPE_KEY = process.env.STRIPE_SECRET_KEY;
const Stripe = require('stripe');
const stripe = Stripe(STRIPE_KEY);
const QueueWrapper = require('../../common/queue-wrapper');
const { completeProcess } = require('../../common/billing');
const logger = require('../../utilities/logger');
const processCb = async (job, done) => {
try {
const token = job.data;
const accountId = token?.account_id;
let alreadyExist = false;
let webhookEndpoint;
// Check if webhook already exists
for await (const webhook of stripe.webhookEndpoints.list({ limit: 100 })) {
if (webhook?.url == `${process.env.API_BASE_URL}/v1/billing/webhooks`) {
alreadyExist = true;
webhookEndpoint = { ...webhook };
break;
}
}
if (!alreadyExist) {
// Create new webhook
webhookEndpoint = await stripe.webhookEndpoints.create({
url: `${process.env.API_BASE_URL}/v1/billing/webhooks`,
enabled_events: [
'charge.succeeded',
'charge.updated',
'charge.dispute.created',
'charge.dispute.updated',
'charge.dispute.closed',
'charge.refunded',
'charge.refund.updated',
'customer.created',
'customer.updated',
'customer.deleted',
'customer.subscription.created',
'customer.subscription.updated',
'customer.subscription.deleted',
'product.created',
'product.updated',
'product.deleted',
'invoice.updated',
'invoice.created',
'invoice.paid',
],
description: 'Created by dashclicks. Deleting might cause failure to get updated data.',
connect: true,
});
} else {
// Update existing webhook
webhookEndpoint = await stripe.webhookEndpoints.update(webhookEndpoint.id, {
enabled_events: [
/* same 19 events */
],
description: 'Created by dashclicks. Deleting might cause failure to get updated data.',
disabled: false,
});
}
return done();
} catch (err) {
done(err);
}
};
const completedCb = async job => {
completeProcess(job.data);
};
let queue;
exports.start = async () => {
try {
if (!queue)
queue = QueueWrapper(`billing_create_webhook`, 'global', { processCb, completedCb });
return Promise.resolve(queue);
} catch (err) {
logger.error({
initiator: 'QM/billing/webhook',
error: err,
message: 'Error while initializing billing webhook queue',
});
}
};
Webhook Events (19 total):
- Charges:
charge.succeeded,charge.updated,charge.refunded - Disputes:
charge.dispute.created,charge.dispute.updated,charge.dispute.closed - Refunds:
charge.refund.updated - Customers:
customer.created,customer.updated,customer.deleted - Subscriptions:
customer.subscription.created,customer.subscription.updated,customer.subscription.deleted - Products:
product.created,product.updated,product.deleted - Invoices:
invoice.created,invoice.updated,invoice.paid
Connect Mode: connect: true - enables Stripe Connect webhook forwarding.
4. Queue Processing - Data Fetching (6 queues)โ
Files:
queues/billing/charges.js- Fetch all chargesqueues/billing/customers.js- Fetch all customersqueues/billing/disputes.js- Fetch all disputesqueues/billing/subscriptions.js- Fetch all subscriptionsqueues/billing/refunds.js- Fetch all refundsqueues/billing/products.js- Fetch all products
Pattern (same for all 6):
- Fetch data from Stripe API (paginated)
- Store in DashClicks database
- Call
completeProcesson completion
Note: Implementation details for each queue not shown (would require reading all 6 files).
5. Completion Handlerโ
File: queue-manager/common/billing.js (completeProcess)
Purpose: Track job completion, emit notification when all 7 jobs finish, mark integration as initialized
Key Functions:
- Increment
fetch_countin StripeKey - When
fetch_count >= 7(all jobs complete):- Emit socket notification to user
- Reset
fetch_countto 0 - Set
initialized: true - Delete auth request from queue
Complete Handler:
const StripeKey = require('../models/stripe-key');
const { socketEmit } = require('../utilities');
const Queue = require('../models/queues');
const completeProcess = async token => {
const userId = token?.user_id;
const stripe_key = await StripeKey.findOne({ account_id: token?.account_id }).lean().exec();
if (stripe_key.fetch_count >= 6) {
// Note: should be >= 7
await socketEmit('billing_data_fetched', [userId.toString()], {
message: `Successfully fetched data, Please refresh the page`,
type: 'success',
});
await StripeKey.updateOne(
{ account_id: token?.account_id },
{ $set: { fetch_count: 0, initialized: true } },
);
await Queue.deleteOne({ _id: token.id });
}
};
module.exports = {
completeProcess,
};
Completion Threshold: fetch_count >= 6 (should be 7 - possible off-by-one error)
Socket Event: billing_data_fetched - sent to account owner.
Note: Each queue job is expected to increment fetch_count (implementation in queue files).
๐๏ธ Collections Usedโ
queuesโ
- Operations: Find, Update, Delete
- Model:
shared/models/queues.js - Usage Context: Store pending authorization requests
Key Fields:
source: 'billing' (identifies billing authorization)status: 'pending' | 'completed' | 'failed'in_progress: Boolean - prevents duplicate processingaccount_id: ObjectId - target accountadditional_info: Object - Stripe OAuth token dataaccess_token: Stripe API keystripe_user_id: Stripe Connect account ID- Other OAuth response fields
usersโ
- Operations: Find
- Model:
shared/models/user.js - Usage Context: Retrieve account owner for notifications
Query: { account: accountId, is_owner: true }
stripe_keyโ
- Operations: Find, Update
- Model:
shared/models/stripe-key.js - Usage Context: Track initialization progress and store credentials
Key Fields:
account_id: ObjectId - account referencefetch_count: Number - tracks completed jobs (0-7)initialized: Boolean - marks complete initializationtoken: Object - Stripe credentials (from additional_info)
Note: fetch_count incremented by each queue job on completion.
Other Collections (Updated by Queue Jobs)โ
billing_charges- Charges databilling_customers- Customers databilling_disputes- Disputes databilling_subscriptions- Subscriptions databilling_refunds- Refunds databilling_products- Products data
Note: Exact collection names may vary (not confirmed without reading all queue files).
๐ง Job Configurationโ
Cron Scheduleโ
'*/30 * * * * *'; // Every 30 seconds
Frequency: 120 times per hour (twice per minute)
Rationale: Fast authorization processing for immediate user feedback; 30-second interval balances responsiveness with system load.
Queue Settingsโ
All 7 Queues:
QueueWrapper(`billing_{queue_name}`, 'global', {
processCb,
completedCb,
});
Queue Names:
billing_create_webhookbilling_fetch_chargesbilling_fetch_customersbilling_fetch_disputesbilling_fetch_subscriptionsbilling_fetch_refundsbilling_fetch_products
Concurrency: Default (1 per queue)
Job Options (all queues):
{
attempts: 4,
backoff: 4000 // 4-second fixed delay between retries
}
Retry Strategy: 4 attempts with 4-second delays (16 seconds total retry window)
๐ Processing Logic - Detailed Flowโ
1. Authorization Request Queryโ
Service Query:
await Queue.find({
source: 'billing',
status: 'pending',
in_progress: false,
}).sort({ createdAt: -1 });
Matches: Pending billing authorizations not currently processing.
Sort Order: Newest first (LIFO) - prioritizes recent connections.
2. In-Progress Lockโ
const ids = stripeData.map(account => account._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
Purpose: Prevents duplicate processing by multiple cron runs.
Scope: All pending requests locked at once.
3. Account Owner Lookupโ
const user = await User.findOne({ account: token.account_id, is_owner: true }).lean().exec();
token.user_id = user?._id?.toString();
Purpose: Identify account owner for socket notification targeting.
Requirement: Account must have an owner user.
4. Token Data Preparationโ
const token = data?.additional_info;
token.account_id = data?.account_id;
token.user_id = user?._id?.toString();
token.id = data._id;
Token Fields:
access_token: Stripe API key (from OAuth)stripe_user_id: Stripe Connect account ID (from OAuth)account_id: DashClicks account ObjectIduser_id: Account owner user ID (for notifications)id: Queue request ID (for cleanup)
5. Parallel Job Queuingโ
All 7 Jobs Queued Simultaneously:
await Promise.all([
webhookQueue.add(token, { attempts: 4, backoff: 4000 }),
chargeQueue.add(token, { attempts: 4, backoff: 4000 }),
customerQueue.add(token, { attempts: 4, backoff: 4000 }),
disputeQueue.add(token, { attempts: 4, backoff: 4000 }),
refundQueue.add(token, { attempts: 4, backoff: 4000 }),
subscriptionQueue.add(token, { attempts: 4, backoff: 4000 }),
ProductQueue.add(token, { attempts: 4, backoff: 4000 }),
]);
Parallel Execution: All 7 jobs run concurrently for fast initialization.
Job Data: Same token object passed to all jobs.
6. Webhook Setup Logicโ
Check for Existing Webhook:
for await (const webhook of stripe.webhookEndpoints.list({ limit: 100 })) {
if (webhook?.url == `${process.env.API_BASE_URL}/v1/billing/webhooks`) {
alreadyExist = true;
webhookEndpoint = { ...webhook };
break;
}
}
URL Match: Checks if webhook with same URL already exists.
Create or Update:
- If not exists: Create new webhook with 19 event types
- If exists: Update webhook to ensure correct configuration
Webhook URL: {API_BASE_URL}/v1/billing/webhooks (e.g., https://api.dashclicks.com/v1/billing/webhooks)
7. Completion Trackingโ
Each Queue Job Completion (calls completeProcess):
const completedCb = async job => {
completeProcess(job.data);
};
Completion Handler Logic:
- Fetch current
fetch_countfrom StripeKey - Increment
fetch_count(assumed - implementation in queue files) - If
fetch_count >= 6(all jobs complete):- Emit
billing_data_fetchedsocket event to user - Reset
fetch_countto 0 - Set
initialized: true - Delete auth request from queue
- Emit
Note: Off-by-one error - checks >= 6 instead of >= 7 (7 jobs total).
8. Socket Notificationโ
await socketEmit('billing_data_fetched', [userId.toString()], {
message: `Successfully fetched data, Please refresh the page`,
type: 'success',
});
Event: billing_data_fetched
Target: Account owner user (by user ID)
Message: Instructs user to refresh page to see imported data.
๐จ Error Handlingโ
Common Error Scenariosโ
Missing Account Ownerโ
Scenario: Account doesn't have an owner user
Handling: user_id set to undefined, may cause issues in notification
Impact: Socket notification may fail (no target user)
Suggestion: Add validation and skip account if no owner
Stripe API Errorsโ
Scenario: Rate limiting, invalid credentials, network issues
Handling: Job retries 4 times with 4-second delays
Impact: Authorization delayed up to 16 seconds, then fails if persistent
Note: No exponential backoff (fixed 4-second delay)
Webhook Creation Failureโ
Scenario: Stripe API error, permissions issue
Handling: Job fails, retries, eventually gives up
Impact: Webhook not created, real-time events not received (manual setup needed)
Individual Job Failureโ
Scenario: One of 7 jobs fails (e.g., charges fetch)
Handling: Job retries, other jobs continue
Impact: fetch_count never reaches 7, initialization never completes
Issue: No timeout or fallback - account stuck in "initializing" state
Database Update Failureโ
Scenario: MongoDB error when updating StripeKey
Handling: Error logged, may retry
Impact: fetch_count or initialized flag not updated correctly
Socket Emission Failureโ
Scenario: Socket server unavailable, user disconnected
Handling: Error logged, initialization still marked complete
Impact: User not notified, but data still imported
Failed Job Callbackโ
Note: No explicit failedCb defined in most queues - relies on default Bull error handling.
Individual Job Error Handling (in service):
try {
// ... queue jobs
} catch (err) {
await Queue.updateOne({ _id: data._id }, { in_progress: false });
console.log(`Error occurred...`);
}
Action: Resets in_progress flag for retry on next cron run.
Completed Job Callbackโ
All Queues:
const completedCb = async job => {
completeProcess(job.data);
};
Action: Call completion handler to track progress.
๐ Monitoring & Loggingโ
Success Loggingโ
Cron Level:
- "Execution Started"
- "Execution Finished"
Service Level:
- "Billing app new authorization service started for the account {accountId}" (console.log)
Queue Level:
- No explicit success logging
Error Loggingโ
Cron Level:
- Error in cron initialization
Service Level:
- "Error occurred while fetching the data from stripe..." (console.log)
- "Error processing billing new auth service..." (console.error)
Queue Level:
- "Error while initializing billing webhook queue"
Performance Metricsโ
- Queue Lookup: less than 1 second (indexed query)
- Account Owner Lookup: less than 100ms per account
- Webhook Check: 1-2 seconds (Stripe API list call)
- Webhook Create/Update: 1-2 seconds (Stripe API call)
- Data Fetch Jobs: 5-30 seconds each (depends on data volume)
- Total Initialization Time: 10-60 seconds (depends on data volume, parallel execution)
- Small account (less than 100 records each): ~10-15 seconds
- Medium account (100-1000 records): ~20-40 seconds
- Large account (1000+ records): ~40-60+ seconds
๐ Integration Pointsโ
Triggers This Jobโ
- Internal API: Creates auth request in
queuescollection (source: 'billing') - User Action: Connects Stripe account via OAuth from dashboard
- Cron Schedule: Every 30 seconds (no external triggers)
External Dependenciesโ
- Stripe API: Webhook endpoints, data fetching APIs
- Stripe OAuth: Token from authorization flow
- Socket Server (4000): Real-time notifications
Jobs That Depend On Thisโ
- Webhook Processing: Requires webhook setup for real-time events
- Billing Sync Jobs: Rely on initial data import
- Contact Import: Uses customer data imported by this job
Related Featuresโ
- Billing Dashboard: Displays imported Stripe data
- Stripe Integration: Requires initialization before use
- Webhook Events: Real-time sync after initialization
โ ๏ธ Important Notesโ
Side Effectsโ
- โ ๏ธ Stripe Webhook Creation: Creates permanent webhook in Stripe account
- โ ๏ธ Database Writes: Imports all Stripe data (charges, customers, etc.)
- โ ๏ธ StripeKey Updates: Modifies
fetch_countandinitializedflags - โ ๏ธ Queue Deletion: Auth request deleted after completion (cannot retry)
Performance Considerationsโ
- High Frequency: 30-second interval ensures fast initialization
- Parallel Job Execution: 7 jobs run concurrently (fast but resource-intensive)
- Stripe API Load: 7+ API calls per account (may hit rate limits for many simultaneous connections)
- Database Load: High write volume during initial import
- No Pagination Limits: Fetches ALL data (large accounts may take minutes)
Business Logicโ
Why Every 30 Seconds?
- Fast turnaround for user waiting on initialization
- Balances responsiveness with system load
- 120 checks per hour is reasonable for background job
Why 7 Separate Jobs?
- Parallel execution for faster initialization
- Isolation of concerns (each job can fail independently)
- Allows per-resource retry logic
- Easier to debug and maintain
Why Webhook Setup First?
- Ensures real-time events captured during initial import
- Prevents missing events between import and webhook setup
- Webhook setup is fast (< 2 seconds)
Why Track with fetch_count?
- Simple counter-based completion tracking
- No need for complex state management
- Each job increments on completion
Why Initialize Flag?
- Marks account as fully set up
- Prevents re-initialization on reconnect
- Allows UI to show "ready" state
Why Socket Notification?
- Immediate user feedback on completion
- Prompts user to refresh and see imported data
- Better UX than polling for completion
Why Delete Queue Request?
- Prevents reprocessing of completed authorizations
- Cleans up queue collection
- Permanent record not needed after completion
Maintenance Notesโ
- Off-by-One Error:
fetch_count >= 6should be>= 7(7 jobs total) - Console.log Usage: Service uses console.log instead of logger
- No Timeout: No maximum time for initialization (account can be stuck if job fails)
- No Retry Limit: Failing accounts retried forever (every 30 seconds)
- No Batch Operations: Processes accounts one-by-one (could parallelize)
- Hardcoded Retry Strategy: Fixed 4-second backoff (consider exponential)
- Missing Owner Validation: No check if owner user exists
- No Completion Status: Queue request doesn't update to 'completed' (just deleted)
Code Quality Issuesโ
Issue 1: Off-by-One Error (CRITICAL)
if (stripe_key.fetch_count >= 6) {
// Should be >= 7
// Mark as complete
}
Issue: Checks for 6 jobs instead of 7 (webhook + 6 data fetches).
Impact: Initialization completes prematurely, one job still running.
Suggestion: Fix to >= 7 or use constant:
const TOTAL_JOBS = 7;
if (stripe_key.fetch_count >= TOTAL_JOBS) {
// Mark as complete
}
Issue 2: Console.log Instead of Logger
console.log('Billing app new authorization service started...');
console.log(`Error occurred while fetching...`);
console.error('Error processing billing new auth service...');
Suggestion: Use logger for consistency:
logger.log({ initiator: 'QM/billing/new-auth', message: 'Started for account', account_id });
logger.error({ initiator: 'QM/billing/new-auth', error: err });
Issue 3: Missing Owner Validation
const user = await User.findOne({ account: token.account_id, is_owner: true }).lean().exec();
token.user_id = user?._id?.toString(); // May be undefined
Issue: No validation if owner exists.
Suggestion: Skip account if no owner:
if (!user) {
logger.warn({
initiator: 'QM/billing/new-auth',
message: 'No owner found for account',
account_id,
});
await Queue.updateOne({ _id: data._id }, { status: 'failed', in_progress: false });
return;
}
Issue 4: No Timeout for Initialization
Issue: If one job fails permanently, account stuck in "initializing" state.
Suggestion: Add timeout logic:
const INIT_TIMEOUT = 10 * 60 * 1000; // 10 minutes
const createdAt = new Date(data.createdAt);
if (Date.now() - createdAt > INIT_TIMEOUT) {
logger.error({ initiator: 'QM/billing/new-auth', message: 'Initialization timeout', account_id });
await Queue.updateOne({ _id: data._id }, { status: 'failed' });
}
Issue 5: No Job Failure Recovery
Issue: If webhook job fails 4 times, initialization never completes (stuck at fetch_count < 7).
Suggestion: Add fallback or manual retry:
// In completion handler
const elapsedTime = Date.now() - new Date(stripe_key.createdAt);
if (stripe_key.fetch_count < 7 && elapsedTime > 5 * 60 * 1000) {
// After 5 minutes with incomplete jobs, mark as partial success
await StripeKey.updateOne(
{ account_id: token.account_id },
{ $set: { initialized: true, partial_init: true } },
);
}
Issue 6: Parallel Queuing Without Coordination
await webhookQueue.add(token, { attempts: 4, backoff: 4000 });
await chargeQueue.add(token, { attempts: 4, backoff: 4000 });
// ... 5 more jobs
Issue: Not using Promise.all (jobs queued sequentially, not simultaneously).
Suggestion: Use Promise.all for true parallelism:
await Promise.all([
webhookQueue.add(token, { attempts: 4, backoff: 4000 }),
chargeQueue.add(token, { attempts: 4, backoff: 4000 }),
// ... 5 more jobs
]);
Note: Current code already queues sequentially (await each), but since queue.add() is fast (just adds to queue), impact is minimal.
Issue 7: No Fetch Count Increment Shown
Issue: Code shows completeProcess checking fetch_count >= 6, but doesn't show where fetch_count is incremented.
Assumption: Each queue job increments fetch_count in its processor (implementation in individual queue files).
Suggestion: Document expected behavior in completion handler.
๐งช Testingโ
Manual Triggerโ
# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/billing
Create Authorization Requestโ
const Queue = require('./models/queues');
// Simulate Stripe OAuth token response
const stripeToken = {
access_token: 'sk_test_...',
stripe_user_id: 'acct_...',
// ... other OAuth fields
};
// Create authorization request
await new Queue({
source: 'billing',
status: 'pending',
in_progress: false,
account_id: accountId,
additional_info: stripeToken,
}).save();
console.log('Authorization request created, waiting for cron run (up to 30 seconds)');
Verify Initialization Progressโ
const StripeKey = require('./models/stripe-key');
// Check initialization progress
const stripeKey = await StripeKey.findOne({ account_id: accountId });
console.log('Fetch count:', stripeKey.fetch_count); // Should increment from 0 to 7
console.log('Initialized:', stripeKey.initialized); // Should be true when complete
// Monitor progress
const interval = setInterval(async () => {
const key = await StripeKey.findOne({ account_id: accountId });
console.log(`Progress: ${key.fetch_count}/7 jobs completed`);
if (key.initialized) {
console.log('Initialization complete!');
clearInterval(interval);
}
}, 2000);
Verify Webhook Creationโ
const Stripe = require('stripe');
const stripe = Stripe(process.env.STRIPE_SECRET_KEY);
// List webhooks
const webhooks = await stripe.webhookEndpoints.list({ limit: 100 });
const dashClicksWebhook = webhooks.data.find(
w => w.url === `${process.env.API_BASE_URL}/v1/billing/webhooks`,
);
console.log('Webhook exists:', !!dashClicksWebhook);
console.log('Webhook events:', dashClicksWebhook?.enabled_events);
// Expected: 19 event types
Test Socket Notificationโ
// Listen for socket event (in frontend)
socket.on('billing_data_fetched', data => {
console.log('Received notification:', data);
// Expected: { message: 'Successfully fetched data, Please refresh the page', type: 'success' }
});
Monitor Queue Processingโ
# Watch logs during initialization
tail -f logs/queue-manager.log | grep "billing"
# Expected outputs:
# [INFO] Execution Started
# [INFO] Billing app new authorization service started for the account {accountId}
# [INFO] Execution Finished
Test Error Handlingโ
// Test with invalid Stripe credentials
await new Queue({
source: 'billing',
status: 'pending',
in_progress: false,
account_id: accountId,
additional_info: {
access_token: 'invalid_key',
stripe_user_id: 'invalid_id',
},
}).save();
// After cron run, verify:
// 1. Error logged
// 2. Jobs retried 4 times
// 3. in_progress reset to false (for retry)
Job Type: Scheduled with Multi-Queue Orchestration
Execution Frequency: Every 30 seconds
Average Duration: 10-60 seconds (depends on account data volume)
Status: Active
Critical Bug: Off-by-one error in completion check (>= 6 should be >= 7)