Billing Contact Import
Overview
The Billing Contact Import job automatically syncs Stripe customers into the DashClicks CRM as contacts. It runs every 30 seconds, processes pending import requests from the queue, fetches all Stripe customers for the account (paginated, 100 per page), and creates or updates DashClicks contacts based on configurable field mappings. The job supports both person and business contact types, tracks import statistics (added, updated, errors), and generates import result logs for user visibility.
Complete Flow:
- Cron Initialization: queue-manager/crons/billing/import.js
- Service Processing: queue-manager/services/billing/contacts.js (importContacts)
- Queue Definition: queue-manager/queues/billing/contacts/person.js & business.js
Execution Pattern: Rapid polling (every 30 seconds) with dynamic queue creation per import request
Queue Name: billing_contact_{importId} (unique per import)
Environment Flag: QM_BILLING_IMPORT=true (in index.js)
Complete Processing Flow
sequenceDiagram
participant CRON as Cron Schedule<br/>(every 30s)
participant SERVICE as Import Service
participant QUEUE_DB as Queues Collection
participant STRIPE_DB as StripeKey<br/>Collection
participant ACCOUNT_DB as Accounts DB
participant PERSON_Q as Person Queue
participant BUSINESS_Q as Business Queue
participant STRIPE as Stripe API
participant CONTACT_DB as Contacts<br/>Collection
participant RESULTS_DB as Import Results<br/>Collection
CRON->>SERVICE: importContacts()
SERVICE->>QUEUE_DB: Find pending imports:<br/>source='billing-contacts'<br/>status='pending'<br/>in_progress=false
QUEUE_DB-->>SERVICE: Import requests
SERVICE->>QUEUE_DB: Set in_progress=true<br/>for all requests
loop Each import request
SERVICE->>STRIPE_DB: Get Stripe credentials<br/>for account
SERVICE->>ACCOUNT_DB: Get account details
SERVICE->>SERVICE: Generate JWT token<br/>for API access
alt Import type = 'contacts' (person)
SERVICE->>PERSON_Q: Create queue & add job
else Import type = 'companies' (business)
SERVICE->>BUSINESS_Q: Create queue & add job
end
end
loop Queue Processing (Person/Business)
PERSON_Q->>STRIPE: List all customers<br/>limit=100, paginate
loop Each Stripe customer
STRIPE-->>PERSON_Q: Customer data
PERSON_Q->>PERSON_Q: Apply field mappings<br/>Stripe → DashClicks
PERSON_Q->>CONTACT_DB: Check if contact exists<br/>(email + type + account)
alt Contact exists
PERSON_Q->>CONTACT_DB: Update contact<br/>with mapped data
else Contact doesn't exist AND newContact=true
PERSON_Q->>CONTACT_DB: Create new contact<br/>source='stripe'
end
end
PERSON_Q->>RESULTS_DB: Save import results:<br/>records_added, records_updated,<br/>import_errors_count
PERSON_Q->>QUEUE_DB: Delete import request
end
Source Files
1. Cron Initialization
File: queue-manager/crons/billing/import.js
Purpose: Schedule billing contact import check every 30 seconds
Cron Pattern: */30 * * * * * (every 30 seconds)
Initialization:
const { importContacts } = require('../../services/billing/contacts');
const cron = require('node-cron');
const logger = require('../../utilities/logger');
let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/30 * * * * *', async () => {
if (!inProgress) {
logger.log({
initiator: 'QM/billing/import',
message: 'Billing Contact Execution Started',
});
inProgress = true;
await importContacts();
inProgress = false;
logger.log({
initiator: 'QM/billing/import',
message: 'Billing Contact Execution Finished',
});
}
});
} catch (err) {
logger.error({ initiator: 'QM/billing/import', error: err });
}
};
In-Progress Lock: Prevents overlapping executions.
Execution Times: Every 30 seconds (twice per minute)
Logging: Start and finish logs for visibility.
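The in-progress lock can be looked at in isolation. The sketch below is a hypothetical variant, not the code in import.js: `createGuardedRunner` is an illustrative name, and the try/finally is an assumption added here so that the flag is released even if the wrapped task rejects.

```javascript
// Hypothetical helper (not part of queue-manager): wraps an async task so
// that overlapping invocations are skipped while one is still running.
function createGuardedRunner(task) {
  let inProgress = false;
  return async function run() {
    if (inProgress) return 'skipped'; // a previous tick is still running
    inProgress = true;
    try {
      await task();
      return 'ran';
    } finally {
      inProgress = false; // released even when task() rejects
    }
  };
}
```

Under these assumptions the cron callback could be written as `createGuardedRunner(importContacts)`; the start and finish log calls from the listing would go inside the wrapped task.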
2. Service Processing
File: queue-manager/services/billing/contacts.js (importContacts export)
Purpose: Find pending import requests, validate Stripe credentials, generate auth tokens, and queue imports
Key Features:
- Query pending imports from the queues collection
- In-progress flag to prevent duplicate processing
- JWT token generation for API access
- Dynamic queue creation based on import type (person vs business)
Main Service Function:
const jwt = require('jsonwebtoken');
const Account = require('../../models/account');
const Queue = require('../../models/queues');
const personQueue = require('../../queues/billing/contacts/person');
const businessQueue = require('../../queues/billing/contacts/business');
const StripeKey = require('../../models/stripe-key');
exports.importContacts = async () => {
try {
let importData = await Queue.find({
status: 'pending',
source: 'billing-contacts',
in_progress: false,
})
.sort({ createdAt: -1 })
.lean()
.exec();
const ids = importData.map(d => d._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
if (importData.length) {
await Promise.all(
importData.map(async importParams => {
importParams.accountId = importParams.account_id.toString();
const stripeData = await StripeKey.findOne({
account_id: importParams.accountId,
connected_apps: 'billing',
})
.lean()
.exec();
if (!stripeData) {
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: false });
}
const stripeToken = stripeData.token;
let type = importParams.type;
let acc = await Account.findById(importParams.accountId);
importParams.ownerId = importParams.user_id.toString();
importParams.authToken = jwt.sign(
{
type: 'access_token',
uid: importParams.user_id.toString(),
account_id: importParams.account_id.toString(),
parent_account: importParams.parent_account.toString(),
client_id: importParams.client_id.toString(),
scope:
'contacts communications contacts.external contacts.read contacts.create files files.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);
let accessToken = importParams.authToken;
let accountId = importParams.accountId;
let ownerId = importParams.ownerId;
let mappings = importParams.mappings;
let newContact = importParams.new_contact;
if (type === 'contacts') {
let Q = await personQueue.start(importParams._id.toString());
await Q.add(
{
id: importParams._id.toString(),
account_id: accountId,
owner: ownerId,
importParams,
mappings,
newContact,
accessToken,
stripeToken,
account: acc.toJSON(),
},
{
attempts: 3,
backoff: 4000,
},
);
}
if (type === 'companies') {
let Q = await businessQueue.start(importParams._id.toString());
await Q.add(
{
id: importParams._id.toString(),
account_id: accountId,
owner: ownerId,
importParams,
mappings,
newContact,
accessToken,
stripeToken,
account: acc.toJSON(),
},
{
attempts: 3,
backoff: 4000,
},
);
}
}),
);
console.log('Import Started');
} else {
}
} catch (error) {
console.log(error);
}
};
Query: Matches imports with:
- status: 'pending'
- source: 'billing-contacts'
- in_progress: false
Import Types:
- contacts - Person contacts (individuals)
- companies - Business contacts (organizations)
JWT Token Scope: Includes contacts, communications, files permissions (1-hour expiry)
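The two type branches in importContacts differ only in which queue module they start. As a sketch of that dispatch step, the lookup table below selects the module by import type. `createDispatcher` is a hypothetical name, and the queue modules are passed in so that stubs exposing the same start(name) shape can stand in for person.js and business.js. Unlike the listing, where a request with any other type appears to be left unqueued with in_progress: true, this sketch throws on an unknown type.

```javascript
// Hypothetical lookup-table dispatch. `queuesByType` maps an import type
// ('contacts', 'companies') to a module that exposes start(name).
function createDispatcher(queuesByType) {
  return async function dispatch(importParams, payload) {
    const known = Object.prototype.hasOwnProperty.call(queuesByType, importParams.type);
    if (!known) {
      throw new Error(`Unsupported import type: ${importParams.type}`);
    }
    const queue = await queuesByType[importParams.type].start(String(importParams._id));
    // Same job options as the listing: 3 attempts, fixed 4000 ms backoff.
    return queue.add(payload, { attempts: 3, backoff: 4000 });
  };
}
```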
3. Queue Processing - Person Contacts
File: queue-manager/queues/billing/contacts/person.js
Purpose: Import Stripe customers as person contacts with field mapping and duplicate detection
Key Functions:
- Fetch all Stripe customers (paginated, 100 per request)
- Apply custom field mappings (Stripe → DashClicks)
- Check for existing contacts by email
- Create new contacts or update existing ones
- Track statistics (added, updated, errors)
- Save import results log
Complete Processor:
const Stripe = require('stripe');
const mongoose = require('mongoose');
const ContactImportResults = require('../../../models/contact-import-results');
const Contact = require('../../../models/contact');
const Queue = require('../../../models/queues');
const QueueWrapper = require('../../../common/queue-wrapper');
const { getMapping } = require('../../../common/billing_add_mapping');
const logger = require('../../../utilities/logger');
exports.start = async name => {
let totalRecordsAdded = 0;
let totalRecordsUpdated = 0;
let totalErrors = 0;
let invalidContacts = [];
let contact;
try {
const processCb = async (job, done) => {
try {
const { account_id, owner, mappings, newContact, stripeToken, account } = job.data;
const accountId = account_id;
const stripe = Stripe(stripeToken.access_token);
const stripe_user_id = stripeToken.stripe_user_id;
try {
let dataToInsert = {
parent_account: new mongoose.Types.ObjectId(accountId),
owner,
created_by: owner,
type: 'person',
};
for await (const customer of stripe.customers.list(
{ limit: 100 },
{ stripeAccount: stripe_user_id },
)) {
const customerData =
mappings && Object.keys(mappings).length && getMapping({ mappings, customer });
dataToInsert = { ...dataToInsert, ...customerData };
contact = customer;
dataToInsert.stripe_customer_id = customer.id;
const isContactExist = await Contact.findOne({
email: customer?.email,
type: 'person',
parent_account: new mongoose.Types.ObjectId(account_id),
});
if (isContactExist) {
if (mappings && Object.keys(mappings).length) {
const contactData = getMapping({
mappings,
customer,
contact: isContactExist,
});
await Contact.updateOne({ _id: isContactExist._id }, { ...contactData });
} else {
await Contact.updateOne(
{ _id: isContactExist._id },
{ stripe_customer_id: customer.id },
);
}
totalRecordsUpdated += 1;
} else if (newContact) {
dataToInsert.source = 'stripe';
dataToInsert.email = customer?.email;
await new Contact(dataToInsert).save();
totalRecordsAdded += 1;
}
}
} catch (err) {
invalidContacts.push(
err.additional_data || {
message: err.toString(),
email: contact?.email,
},
);
totalErrors += 1;
logger.error({
initiator: 'QM/billing/contacts/person',
error: err,
message: `Error while adding/updating contact`,
});
}
try {
let contactImportLog = {
status: 'in_progress',
name: 'stripe',
type: 'person',
records_added: totalRecordsAdded,
records_updated: totalRecordsUpdated,
import_errors_count: totalErrors,
user_id: new mongoose.Types.ObjectId(owner),
account_id: account_id,
importerrors: invalidContacts || [],
};
await new ContactImportResults(contactImportLog).save();
} catch (err) {
logger.error({
initiator: 'QM/billing/contacts/person',
error: err,
});
}
return done();
} catch (err) {
done(err);
}
};
const completedCb = async job => {
await Queue.deleteOne({ _id: job.data.id });
};
const queue = QueueWrapper(`billing_contact_${name}`, 'global', { processCb, completedCb });
return Promise.resolve(queue);
} catch (err) {
logger.error({
initiator: 'QM/billing/contacts/person',
error: err,
message: `Error while initializing contact`,
});
}
};
4. Queue Processing - Business Contacts
File: queue-manager/queues/billing/contacts/business.js
Purpose: Import Stripe customers as business contacts (identical logic to person, different type)
Key Difference: Sets type: 'business' instead of type: 'person'
All Other Logic: Identical to person queue processor (same field mapping, duplicate detection, etc.)
5. Field Mapping Utility
File: queue-manager/common/billing_add_mapping.js (getMapping)
Purpose: Transform Stripe customer fields to DashClicks contact fields based on user-defined mappings
Key Functions:
- getValueBydotKey - Extract nested values using dot notation (e.g., 'address.city')
- mapCustomerData - Map Stripe fields to DashClicks fields
- mapContactData - Preserve existing contact values for unmapped fields
- getMapping - Main mapping orchestrator
Mapping Logic:
const getMapping = ({ mappings, customer, contact }) => {
let updObj = {
stripe_customer_id: customer.id,
email: customer.email,
}
let customerData = {};
let contactData = {};
let customerAddress = {};
let customerShipping = {};
// ... address and shipping mapping logic ...
Object.entries(mappings).forEach(([k, v]) => {
k = k.replace(/\-/g, '.'); // Convert hyphens to dots
const value = getValueBydotKey(k, customer); // Get Stripe value
const oldValue = contact && getValueBydotKey(v, contact?._doc); // Get existing contact value
// Map Stripe value or preserve old value
mapCustomerData({...});
contact && mapContactData({...});
})
updObj = { ...updObj, ...customerData };
if (contact) {
updObj.old_field_values = { email: contact?._doc?.email, ...contactData };
}
return updObj;
}
Special Field Handling:
- Address fields: address.street, address.city, address.state_province, etc.
- Shipping fields: shipping.address.street, shipping.address.city, etc.
- Standard fields: email, name, phone, custom fields, etc.
Old Field Values: Preserved in old_field_values for audit/rollback purposes.
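The dot-notation lookup that the mapping relies on can be illustrated with a small stand-alone sketch. The body of getValueBydotKey is not shown in this document, so the function below is an assumed re-implementation (named `getValueByDotKey` to keep it distinct) and may differ from the real helper; `toDotKey` mirrors the hyphen-to-dot replacement from the mapping listing.

```javascript
// Illustrative re-implementation; the real getValueBydotKey in
// billing_add_mapping.js is not shown here and may behave differently.
function getValueByDotKey(path, source) {
  return path.split('.').reduce(
    (current, key) => (current == null ? undefined : current[key]),
    source,
  );
}

// Mapping keys use hyphens in place of dots ('address-city' -> 'address.city').
function toDotKey(mappingKey) {
  return mappingKey.replace(/-/g, '.');
}

const customer = { id: 'cus_123', address: { city: 'New York' } };
console.log(getValueByDotKey(toDotKey('address-city'), customer)); // 'New York'
console.log(getValueByDotKey('shipping.address.city', customer)); // undefined
```

Returning undefined for a missing intermediate object, rather than throwing, is a design choice of this sketch; it lets a mapping refer to optional Stripe fields such as shipping without extra guards.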
Collections Used
queues
- Operations: Find, Update, Delete
- Model: shared/models/queues.js
- Usage Context: Store pending import requests
Key Fields:
- status: 'pending' | 'completed' | 'failed'
- source: 'billing-contacts' (identifies billing imports)
- in_progress: Boolean - prevents duplicate processing
- type: 'contacts' | 'companies' (person vs business)
- account_id: ObjectId - target account
- user_id: ObjectId - user who initiated import
- parent_account: ObjectId - parent account reference
- client_id: ObjectId - OAuth client reference
- mappings: Object - field mapping configuration
- new_contact: Boolean - whether to create new contacts or update only
stripe_key
- Operations: Find
- Model: shared/models/stripe-key.js
- Usage Context: Retrieve Stripe API credentials
Key Fields:
- account_id: ObjectId - account reference
- connected_apps: 'billing' (identifies billing Stripe connection)
- token: Object - Stripe credentials
  - access_token: Stripe API key
  - stripe_user_id: Stripe Connect account ID
_accounts
- Operations: Find
- Model: shared/models/account.js
- Usage Context: Account details for import context
contacts
- Operations: Find, Create, Update
- Model: shared/models/contact.js
- Usage Context: Store imported contacts
Key Fields:
- email: String - unique identifier for duplicate detection
- type: 'person' | 'business'
- parent_account: ObjectId - account reference
- owner: ObjectId - contact owner
- created_by: ObjectId - user who created contact
- source: 'stripe' (identifies import source)
- stripe_customer_id: String - Stripe customer ID
- address: Object - contact address
- shipping: Object - shipping address
- Custom mapped fields (dynamic based on user mappings)
contact_import_results
- Operations: Create
- Model: shared/models/contact-import-results.js
- Usage Context: Track import statistics and errors
Record Structure:
{
status: 'in_progress',
name: 'stripe',
type: 'person' | 'business',
records_added: Number,
records_updated: Number,
import_errors_count: Number,
user_id: ObjectId,
account_id: String,
importerrors: Array // [{message, email}, ...]
}
Job Configuration
Cron Schedule
'*/30 * * * * *'; // Every 30 seconds
Frequency: 120 times per hour (twice per minute)
Rationale: Fast import processing for immediate user feedback; 30-second interval balances responsiveness with system load.
Queue Settings
QueueWrapper(`billing_contact_${name}`, 'global', {
processCb,
completedCb,
});
Queue Name: billing_contact_{importId} (unique per import request)
Concurrency: Default (1 per queue)
Job Options:
{
attempts: 3,
backoff: 4000 // 4-second fixed delay between retries
}
Retry Strategy: Up to 3 attempts in total, with a fixed 4-second delay before each retry (at most 2 retries, so about 8 seconds of backoff delay overall)
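The retry timing follows directly from the job options. The sketch below computes the delay before each retry, assuming Bull's convention that `attempts` is the total number of tries including the first one. `retryDelays` is a hypothetical helper, and its 'exponential' branch is a generic doubling schedule shown for comparison, not necessarily the formula a given queue library applies.

```javascript
// Delays (ms) that precede each retry, given Bull-style job options.
// Assumption: `attempts` counts the first try, so there are attempts - 1 retries.
function retryDelays({ attempts, backoff }, strategy = 'fixed') {
  const delays = [];
  for (let retry = 1; retry < attempts; retry++) {
    // 'exponential' is a generic doubling schedule used for illustration only.
    delays.push(strategy === 'exponential' ? backoff * 2 ** (retry - 1) : backoff);
  }
  return delays;
}

const options = { attempts: 3, backoff: 4000 };
console.log(retryDelays(options)); // [ 4000, 4000 ]
console.log(retryDelays(options, 'exponential')); // [ 4000, 8000 ]
```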
Processing Logic - Detailed Flow
1. Import Request Query
Service Query:
await Queue.find({
status: 'pending',
source: 'billing-contacts',
in_progress: false,
}).sort({ createdAt: -1 });
Matches: Pending billing contact imports not currently processing.
Sort Order: Newest first (LIFO) - prioritizes recent user requests.
2. In-Progress Lock
const ids = importData.map(d => d._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
Purpose: Prevents duplicate processing by multiple cron runs.
Note: If Stripe credentials missing, resets in_progress: false for retry.
3. Stripe Credentials Validation
const stripeData = await StripeKey.findOne({
account_id: importParams.accountId,
connected_apps: 'billing',
});
if (!stripeData) {
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: false });
}
Validation: Account must have Stripe connected for billing app.
Failure Handling: Resets in-progress flag for retry on next cron run.
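Issue: Sets in_progress=false for ALL imports if ONE is missing credentials (bug). The code also does not return after the reset, so the next statement reads stripeData.token on a null value and throws a TypeError, which the outer try/catch logs.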
4. JWT Token Generation
importParams.authToken = jwt.sign(
{
type: 'access_token',
uid: importParams.user_id.toString(),
account_id: importParams.account_id.toString(),
parent_account: importParams.parent_account.toString(),
client_id: importParams.client_id.toString(),
scope:
'contacts communications contacts.external contacts.read contacts.create files files.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);
Purpose: Generate auth token for internal API calls (if needed for future features).
Scope: Full contacts and communications permissions.
Expiry: 1 hour (sufficient for import processing).
5. Stripe Customer Fetching
for await (const customer of stripe.customers.list(
{ limit: 100 },
{ stripeAccount: stripe_user_id },
)) {
// Process customer
}
Pagination: Stripe SDK automatically handles pagination (async iterator).
Limit: 100 customers per page (Stripe API maximum).
Stripe Connect: Uses connected account ID for multi-tenant support.
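To make the automatic pagination concrete, the sketch below models it with a plain async generator. It is a conceptual model, not the Stripe SDK's implementation: `listAll`, `fetchPage`, and `makeFakeEndpoint` are hypothetical names, and the only Stripe detail assumed is the list response shape ({ data, has_more }) together with the starting_after cursor parameter.

```javascript
// Conceptual model of auto-pagination. `fetchPage` stands in for a list
// endpoint that accepts { limit, starting_after } and returns { data, has_more }.
async function* listAll(fetchPage, limit = 100) {
  let startingAfter;
  while (true) {
    const page = await fetchPage({ limit, starting_after: startingAfter });
    yield* page.data;
    if (!page.has_more || page.data.length === 0) return;
    startingAfter = page.data[page.data.length - 1].id; // cursor = last id seen
  }
}

// In-memory fake endpoint, used only to exercise the generator.
function makeFakeEndpoint(total) {
  const all = Array.from({ length: total }, (_, i) => ({ id: `cus_${i}` }));
  return async ({ limit, starting_after }) => {
    const start = starting_after ? all.findIndex(c => c.id === starting_after) + 1 : 0;
    const data = all.slice(start, start + limit);
    return { data, has_more: start + limit < all.length };
  };
}
```

Consuming it looks the same as the loop in the processor: `for await (const customer of listAll(makeFakeEndpoint(250))) { ... }` visits all 250 fake customers while requesting three pages of at most 100.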
6. Field Mapping Application
const customerData = mappings && Object.keys(mappings).length && getMapping({ mappings, customer });
dataToInsert = { ...dataToInsert, ...customerData };
Mapping Logic: Transforms Stripe customer fields to DashClicks contact fields.
Default Fields: Always includes stripe_customer_id and email.
Custom Fields: Applied based on user-defined mappings.
7. Duplicate Detection
const isContactExist = await Contact.findOne({
email: customer?.email,
type: 'person',
parent_account: new mongoose.Types.ObjectId(account_id),
});
Uniqueness: Combination of email + type + account (allows same email for person and business).
Case Sensitivity: MongoDB default (case-sensitive email matching).
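Because the lookup is case-sensitive, 'John@Example.com' in Stripe would not match 'john@example.com' in the CRM. One possible mitigation, sketched here with a hypothetical `normalizeEmail` helper, is to normalize the address before the lookup. This only helps if stored contact emails are normalized the same way, which this document does not establish.

```javascript
// Hypothetical helper: trims and lowercases an address so that
// ' John@Example.com ' and 'john@example.com' compare equal.
// Returns null for missing, empty, or non-string input.
function normalizeEmail(email) {
  if (typeof email !== 'string') return null;
  const trimmed = email.trim().toLowerCase();
  return trimmed.length ? trimmed : null;
}

console.log(normalizeEmail(' John@Example.com ')); // 'john@example.com'
console.log(normalizeEmail(null)); // null
```

The null return also gives the caller an explicit signal for Stripe customers that have no email, which could then be skipped or recorded as an error instead of being looked up.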
8. Create or Update Logic
Update Existing Contact:
if (isContactExist) {
if (mappings && Object.keys(mappings).length) {
const contactData = getMapping({ mappings, customer, contact: isContactExist });
await Contact.updateOne({ _id: isContactExist._id }, { ...contactData });
} else {
await Contact.updateOne({ _id: isContactExist._id }, { stripe_customer_id: customer.id });
}
totalRecordsUpdated += 1;
}
With Mappings: Apply full field mapping.
Without Mappings: Only update stripe_customer_id.
Create New Contact:
else if (newContact) {
dataToInsert.source = 'stripe';
dataToInsert.email = customer?.email;
await new Contact(dataToInsert).save();
totalRecordsAdded += 1;
}
Condition: Only create if newContact flag is true.
Source Tag: Marks contact as imported from Stripe.
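The create-or-update branches reduce to a three-way decision that depends only on whether a contact was found and on the newContact flag. The sketch below isolates that decision; `decideAction` is a hypothetical name used for illustration and does not exist in the processor.

```javascript
// Returns which branch the processor would take for one Stripe customer.
// existingContact: result of the duplicate lookup (document or null).
// newContact: the import's new_contact flag.
function decideAction(existingContact, newContact) {
  if (existingContact) return 'update'; // counted in totalRecordsUpdated
  return newContact ? 'create' : 'skip'; // 'skip' changes no counter
}

console.log(decideAction({ _id: 'c1' }, false)); // 'update'
console.log(decideAction(null, true)); // 'create'
console.log(decideAction(null, false)); // 'skip'
```

The 'skip' case is worth noting: an unmatched customer in update-only mode is neither added, updated, nor counted as an error, so it does not appear in the import results.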
9. Import Results Tracking
let contactImportLog = {
status: 'in_progress',
name: 'stripe',
type: 'person',
records_added: totalRecordsAdded,
records_updated: totalRecordsUpdated,
import_errors_count: totalErrors,
user_id: new mongoose.Types.ObjectId(owner),
account_id: account_id,
importerrors: invalidContacts || [],
};
await new ContactImportResults(contactImportLog).save();
Statistics: Tracks added, updated, and error counts.
Error Details: Stores error messages and affected contact emails.
Status: Marked as 'in_progress' (should be 'completed' - possible bug).
10. Cleanup
const completedCb = async job => {
await Queue.deleteOne({ _id: job.data.id });
};
Action: Deletes import request from queue collection on completion.
Effect: Prevents reprocessing of completed imports.
Error Handling
Common Error Scenarios
Missing Stripe Credentials
Scenario: Account doesn't have Stripe connected for billing
Handling: Resets in_progress: false, retries on next cron run
Impact: Import delayed until credentials added
Issue: Resets ALL imports if ONE is missing credentials (affects unrelated imports)
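Stripe API Errors
Scenario: Rate limiting, invalid credentials, network issues
Handling: Job is configured for up to 3 attempts with a fixed 4-second delay before each retry. In the processor shown above, errors thrown while listing or processing customers are caught by the inner try/catch and the job still calls done(), so retries only apply to errors that reach done(err)
Impact: When retries do occur, the import is delayed by about 8 seconds of backoff in total (two retries), then fails if the error persists
Note: No exponential backoff (fixed 4-second delay)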
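Invalid Stripe Customer Data
Scenario: Missing email, malformed data
Handling: Error logged and recorded in invalidContacts. Because one try/catch wraps the whole customer loop, the loop stops at the first error and the remaining customers are not processed (see Issue 6)
Impact: The failing customer and any customers after it are not imported; the error is tracked in the invalidContacts array
Statistics: Increments totalErrors counter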
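Duplicate Email (Race Condition)
Scenario: Two imports try to create same contact simultaneously
Handling: The existence check and the insert are not atomic, so both imports can pass the check before either one writes
Impact: With no unique index on email, both inserts succeed and a duplicate contact is created; no duplicate key error is raised
Note: A unique index on email + type + parent_account would prevent this at the DB level; the second insert would then fail with a duplicate key error that the existing catch block logs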
Contact Creation Failure
Scenario: Validation error, required field missing
Handling: Error caught and logged; the customer loop stops at that customer (see Issue 6)
Impact: That customer and the remaining customers are not imported; the error is tracked in the error list
Import Results Save Failure
Scenario: Database error when saving import results
Handling: Error logged, import processing continues
Impact: Statistics lost, but contacts still imported
Note: Results save wrapped in separate try/catch (doesn't fail job)
Failed Job Callback
Note: No failedCb defined - relies on default Bull error handling.
Completed Job Callback
const completedCb = async job => {
await Queue.deleteOne({ _id: job.data.id });
};
Action: Delete import request from queue.
Monitoring & Logging
Success Logging
Cron Level:
- "Billing Contact Execution Started"
- "Billing Contact Execution Finished"
- "Import Started" (console.log after queuing)
Queue Level:
- No explicit success logging for individual customers
Error Logging
Cron Level:
- Error in cron initialization
Service Level:
- Error in service function (console.log, not logger)
Queue Level:
- Error adding/updating contact (per customer)
- Error initializing queue
Performance Metrics
- Queue Lookup: less than 1 second (indexed query)
- Stripe API Call: 1-2 seconds per page (100 customers)
- Field Mapping: less than 100ms per customer
- Contact Lookup: less than 100ms per customer (indexed by email)
- Contact Create/Update: less than 100ms per customer
- Import Results Save: less than 1 second
- Total Job Time: 5-30 seconds (depends on customer count)
- 10 customers: ~5 seconds
- 100 customers: ~15 seconds
- 500+ customers: ~30+ seconds (multiple pages)
Integration Points
Triggers This Job
- Internal API: Creates import request in the queues collection (source: 'billing-contacts')
- User Action: Initiates Stripe contact import from dashboard
- Cron Schedule: Every 30 seconds (no external triggers)
External Dependencies
- Stripe API: Customer list endpoint
- Stripe Connect: Connected account credentials
- JWT Secret: For token generation
Jobs That Depend On This
- Contact Sync Jobs: May rely on Stripe customer ID linkage
- Billing Jobs: Use imported contacts for invoicing
Related Features
- CRM Dashboard: Displays imported contacts
- Stripe Billing: Links contacts to Stripe customers
- Contact Management: Allows editing imported contacts
Important Notes
Side Effects
- ⚠️ Contact Creation: Creates new contacts in CRM (permanent unless deleted)
- ⚠️ Contact Updates: Overwrites existing contact data with Stripe values
- ⚠️ Database Writes: One contact write per customer, plus one import results write per job
- ⚠️ Queue Deletion: Import request deleted after processing (cannot retry)
Performance Considerations
- High Frequency: 30-second interval ensures fast import processing
- Pagination: Stripe SDK handles automatic pagination (100 per page)
- Parallel Processing: Multiple imports processed concurrently (separate queues)
- Database Load: High write volume for large customer lists
- API Rate Limits: Stripe rate limits may throttle large imports
- No Batch Insert: Creates contacts one-by-one (could be optimized with bulkWrite)
Business Logic
Why Every 30 Seconds?
- Fast turnaround for user-initiated imports
- Balances responsiveness with system load
- 120 checks per hour is reasonable for background job
Why Dynamic Queue Names?
- Allows multiple imports to run concurrently
- Isolates imports from different accounts/users
- Prevents queue name conflicts
Why Update Without Mappings?
- Even without custom mappings, linking Stripe customer ID is valuable
- Allows future syncing and reference lookups
- Preserves manual edits to other fields
Why Person vs Business Queues?
- Different contact types have different field requirements
- Allows type-specific logic in future
- Matches CRM contact type structure
Why Check Email + Type + Account?
- Same person can exist as both person and business contact
- Different accounts can have contacts with same email
- Type separation prevents accidental merges
Why newContact Flag?
- User control over whether to create new contacts or update only
- Prevents polluting CRM with unwanted contacts
- Allows "sync only" mode for existing contacts
Why Store Old Field Values?
- Audit trail for data changes
- Allows rollback if mapping configured incorrectly
- Helps debug mapping issues
Maintenance Notes
- Import Type Values: 'contacts' and 'companies' hardcoded (consider enum)
- Retry Strategy: Fixed 4-second backoff (consider exponential)
- No Batch Operations: One-by-one inserts/updates (performance issue for large imports)
- Console.log Usage: Service uses console.log instead of logger
- Missing Status Update: Import results saved with 'in_progress' status (should be 'completed')
- Credentials Reset Bug: Resets all imports if one is missing credentials
- No Failed Callback: Default Bull error handling only
- JWT Token Unused: Generated but not currently used (future feature)
- No Import Cancellation: Cannot cancel in-progress imports
Code Quality Issues
Issue 1: Credentials Reset Bug
if (!stripeData) {
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: false });
}
Issue: Resets in_progress for ALL imports if ONE is missing credentials.
Impact: Unrelated imports reset and reprocessed.
Suggestion: Reset only the specific import:
if (!stripeData) {
await Queue.updateOne({ _id: importParams._id }, { in_progress: false });
return; // Skip this import
}
Issue 2: Missing Status Update
let contactImportLog = {
status: 'in_progress', // Should be 'completed'
// ...
};
Issue: Import results marked as 'in_progress' even after completion.
Suggestion: Set status to 'completed':
status: 'completed',
Issue 3: Console.log Instead of Logger
console.log('Import Started');
// ...
console.log(error);
Suggestion: Use logger for consistency:
logger.log({ initiator: 'QM/billing/contacts', message: 'Import Started' });
logger.error({ initiator: 'QM/billing/contacts', error });
Issue 4: No Batch Operations
for await (const customer of stripe.customers.list(...)) {
// One-by-one insert/update
await new Contact(dataToInsert).save();
}
Issue: Creates contacts one-by-one (slow for large imports).
Suggestion: Use bulkWrite:
const operations = [];
for await (const customer of stripe.customers.list(...)) {
// Build operations array
operations.push({
updateOne: {
filter: { email: customer.email, type: 'person', parent_account: accountId },
update: { $set: dataToInsert },
upsert: Boolean(newContact), // only create missing contacts when the import allows it
},
});
}
await Contact.bulkWrite(operations);
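Collecting every operation before a single bulkWrite keeps the whole customer list in memory, which matters for the large accounts this suggestion targets. A batched variant is sketched below. `chunk` and `writeInBatches` are hypothetical helpers, the batch size of 500 is an arbitrary assumption, and the write function is injected so the sketch runs without a database.

```javascript
// Splits an array into consecutive slices of at most `size` items.
function chunk(items, size) {
  const out = [];
  for (let i = 0; i < items.length; i += size) out.push(items.slice(i, i + size));
  return out;
}

// `bulkWrite` is injected; with Mongoose it could be
// ops => Contact.bulkWrite(ops). Returns the number of batches written.
async function writeInBatches(operations, bulkWrite, batchSize = 500) {
  let batches = 0;
  for (const batch of chunk(operations, batchSize)) {
    await bulkWrite(batch); // batches are written one after another
    batches += 1;
  }
  return batches;
}
```

A further refinement would flush each batch from inside the customer loop as soon as it fills, so that the full operations array is never built at all.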
Issue 5: Statistics Outside Processor
let totalRecordsAdded = 0; // Closure variable
const processCb = async (job, done) => {
// ... modifies closure variables
totalRecordsAdded += 1;
};
Issue: Statistics tracked in closure (not reset between jobs if queue reused).
Suggestion: Track statistics in job data or local variables:
const processCb = async (job, done) => {
let totalRecordsAdded = 0; // Local to processor
// ...
};
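The difference between the two placements can be shown without any queue machinery. In the sketch below, both factory functions are hypothetical stand-ins for exports.start, and each returned function plays the role of processCb being invoked once per job or retry.

```javascript
// Shared counter: lives in the enclosing scope, so it survives across calls.
function makeSharedProcessor() {
  let total = 0;
  return items => {
    total += items.length;
    return total;
  };
}

// Local counter: re-created on every call, so each run starts from zero.
function makeLocalProcessor() {
  return items => {
    let total = 0;
    total += items.length;
    return total;
  };
}

const shared = makeSharedProcessor();
console.log(shared(['a', 'b']), shared(['c'])); // 2 3
const local = makeLocalProcessor();
console.log(local(['a', 'b']), local(['c'])); // 2 1
```

With attempts: 3 configured, a retried job would run the same processor again, so closure-level counters could also carry totals over from the failed attempt.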
Issue 6: Error Handling for Stripe Iteration
try {
for await (const customer of stripe.customers.list(...)) {
// All processing
}
} catch (err) {
invalidContacts.push(...);
totalErrors += 1;
}
Issue: Single try/catch for entire loop - first error stops all processing.
Suggestion: Wrap each customer in try/catch:
for await (const customer of stripe.customers.list(...)) {
try {
// Process customer
} catch (err) {
invalidContacts.push({...});
totalErrors += 1;
logger.error({...});
}
}
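A runnable version of the per-customer pattern is sketched below, with the customer source and the per-customer work passed in so that it runs without Stripe or MongoDB. `processAll` and `handleCustomer` are hypothetical names introduced for this example.

```javascript
// Processes every customer; a failure in one is recorded and the loop
// continues with the next. `customers` may be an array or an async iterable.
async function processAll(customers, handleCustomer) {
  const stats = { processed: 0, errors: [] };
  for await (const customer of customers) {
    try {
      await handleCustomer(customer);
      stats.processed += 1;
    } catch (err) {
      stats.errors.push({ message: String(err), email: customer?.email });
    }
  }
  return stats;
}
```

Errors raised by the iterator itself, such as a failed page fetch, are thrown outside the inner try block and still end the loop, so an outer try/catch around the loop remains useful for those.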
Issue 7: JWT Token Not Used
importParams.authToken = jwt.sign({...}, process.env.APP_SECRET, { expiresIn: '1h' });
Issue: Token generated but never used in current implementation.
Suggestion: Remove if not needed, or document intended future use.
Issue 8: No Import Limit
for await (const customer of stripe.customers.list({ limit: 100 }, ...)) {
// Processes ALL customers
}
Issue: No maximum customer count - large accounts may import thousands.
Suggestion: Add configurable limit or progress tracking:
const MAX_CUSTOMERS = parseInt(process.env.STRIPE_IMPORT_LIMIT || '1000', 10); // explicit radix
let processed = 0;
for await (const customer of stripe.customers.list(...)) {
if (processed >= MAX_CUSTOMERS) break;
// Process customer
processed++;
}
Testing
Manual Trigger
# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/billing/import
Create Import Request
const Queue = require('./models/queues');
// Create person contact import request
await new Queue({
status: 'pending',
source: 'billing-contacts',
in_progress: false,
type: 'contacts', // 'contacts' for person, 'companies' for business
account_id: accountId,
user_id: userId,
parent_account: parentAccountId,
client_id: clientId,
mappings: {
name: 'first_name', // Stripe field → DashClicks field
email: 'email',
phone: 'phone',
'address-city': 'address.city', // Hyphen converted to dot
},
new_contact: true, // Create new contacts if not exist
}).save();
console.log('Import request created, waiting for cron run (up to 30 seconds)');
Verify Import Processing
const Contact = require('./models/contact');
const ContactImportResults = require('./models/contact-import-results');
// Check created contacts
const contacts = await Contact.find({
parent_account: accountId,
source: 'stripe',
});
console.log('Imported contacts:', contacts.length);
// Check import results
const results = await ContactImportResults.find({
account_id: accountId.toString(),
name: 'stripe',
});
console.log('Import results:', results);
console.log('- Added:', results[0].records_added);
console.log('- Updated:', results[0].records_updated);
console.log('- Errors:', results[0].import_errors_count);
console.log('- Error details:', results[0].importerrors);
Test Field Mapping
// Test address mapping
const mappings = {
'address-street': 'address.street',
'address-city': 'address.city',
'address-state': 'address.state_province',
};
const mockCustomer = {
id: 'cus_123',
email: 'test@example.com',
name: 'John Doe',
address: {
line1: '123 Main St',
city: 'New York',
state: 'NY',
},
};
const { getMapping } = require('./common/billing_add_mapping');
const result = getMapping({ mappings, customer: mockCustomer });
console.log('Mapped data:', result);
// Expected:
// {
// stripe_customer_id: 'cus_123',
// email: 'test@example.com',
// address: {
// street: '123 Main St',
// city: 'New York',
// state_province: 'NY'
// }
// }
Test Duplicate Detection
// Create existing contact
const existingContact = await new Contact({
email: 'test@example.com',
type: 'person',
parent_account: accountId,
first_name: 'John',
}).save();
// Trigger import with same email
// ... create import request ...
// After import, verify contact updated (not duplicated)
const contacts = await Contact.find({
email: 'test@example.com',
type: 'person',
parent_account: accountId,
});
console.log('Contacts with email:', contacts.length); // Should be 1
console.log('Updated name:', contacts[0].first_name); // Should be updated from Stripe
Monitor Queue Processing
# Watch logs during import
tail -f logs/queue-manager.log | grep "billing"
# Expected outputs:
# [INFO] Billing Contact Execution Started
# [INFO] Import Started
# [INFO] Billing Contact Execution Finished
# [ERROR] Error while adding/updating contact (if errors)
Test Error Handling
// Test with invalid Stripe credentials
const StripeKey = require('./models/stripe-key');
await StripeKey.findOneAndUpdate(
{ account_id: accountId },
{ $set: { 'token.access_token': 'invalid_key' } },
);
// Trigger import
// ... create import request ...
// After cron run, verify:
// 1. Error logged
// 2. Whether the job was retried (errors caught inside the processor do not fail the job, so a retry is not guaranteed)
// 3. Import results show errors
Job Type: Scheduled with Dynamic Queue Creation
Execution Frequency: Every 30 seconds
Average Duration: 5-30 seconds per import (depends on customer count)
Status: Active