Skip to main content

๐Ÿ“ฅ Billing Contact Import

๐Ÿ“– Overviewโ€‹

The Billing Contact Import job automatically syncs Stripe customers into the DashClicks CRM as contacts. It runs every 30 seconds, processes pending import requests from the queue, fetches all Stripe customers (up to 100 per import), and creates or updates DashClicks contacts based on configurable field mappings. The job supports both person and business contact types, tracks import statistics (added, updated, errors), and generates import result logs for user visibility.

Complete Flow:

  1. Cron Initialization: queue-manager/crons/billing/import.js
  2. Service Processing: queue-manager/services/billing/contacts.js (importContacts)
  3. Queue Definition: queue-manager/queues/billing/contacts/person.js & business.js

Execution Pattern: Rapid polling (every 30 seconds) with dynamic queue creation per import request

Queue Name: billing_contact_{importId} (unique per import)

Environment Flag: QM_BILLING_IMPORT=true (in index.js)

๐Ÿ”„ Complete Processing Flowโ€‹

sequenceDiagram
participant CRON as Cron Schedule<br/>(every 30s)
participant SERVICE as Import Service
participant QUEUE_DB as Queues Collection
participant STRIPE_DB as StripeKey<br/>Collection
participant ACCOUNT_DB as Accounts DB
participant PERSON_Q as Person Queue
participant BUSINESS_Q as Business Queue
participant STRIPE as Stripe API
participant CONTACT_DB as Contacts<br/>Collection
participant RESULTS_DB as Import Results<br/>Collection

CRON->>SERVICE: importContacts()
SERVICE->>QUEUE_DB: Find pending imports:<br/>source='billing-contacts'<br/>status='pending'<br/>in_progress=false
QUEUE_DB-->>SERVICE: Import requests

SERVICE->>QUEUE_DB: Set in_progress=true<br/>for all requests

loop Each import request
SERVICE->>STRIPE_DB: Get Stripe credentials<br/>for account
SERVICE->>ACCOUNT_DB: Get account details
SERVICE->>SERVICE: Generate JWT token<br/>for API access

alt Import type = 'contacts' (person)
SERVICE->>PERSON_Q: Create queue & add job
else Import type = 'companies' (business)
SERVICE->>BUSINESS_Q: Create queue & add job
end
end

loop Queue Processing (Person/Business)
PERSON_Q->>STRIPE: List all customers<br/>limit=100, paginate

loop Each Stripe customer
STRIPE-->>PERSON_Q: Customer data
PERSON_Q->>PERSON_Q: Apply field mappings<br/>Stripe โ†’ DashClicks
PERSON_Q->>CONTACT_DB: Check if contact exists<br/>(email + type + account)

alt Contact exists
PERSON_Q->>CONTACT_DB: Update contact<br/>with mapped data
else Contact doesn't exist AND newContact=true
PERSON_Q->>CONTACT_DB: Create new contact<br/>source='stripe'
end
end

PERSON_Q->>RESULTS_DB: Save import results:<br/>records_added, records_updated,<br/>import_errors_count
PERSON_Q->>QUEUE_DB: Delete import request
end

๐Ÿ“ Source Filesโ€‹

1. Cron Initializationโ€‹

File: queue-manager/crons/billing/import.js

Purpose: Schedule billing contact import check every 30 seconds

Cron Pattern: */30 * * * * * (every 30 seconds)

Initialization:

const { importContacts } = require('../../services/billing/contacts');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/30 * * * * *', async () => {
if (!inProgress) {
logger.log({
initiator: 'QM/billing/import',
message: 'Billing Contact Execution Started',
});
inProgress = true;
await importContacts();
inProgress = false;
logger.log({
initiator: 'QM/billing/import',
message: 'Billing Contact Execution Finished',
});
}
});
} catch (err) {
logger.error({ initiator: 'QM/billing/import', error: err });
}
};

In-Progress Lock: Prevents overlapping executions.

Execution Times: Every 30 seconds (twice per minute)

Logging: Start and finish logs for visibility.

2. Service Processingโ€‹

File: queue-manager/services/billing/contacts.js (importContacts export)

Purpose: Find pending import requests, validate Stripe credentials, generate auth tokens, and queue imports

Key Features:

  • Query pending imports from queues collection
  • In-progress flag to prevent duplicate processing
  • JWT token generation for API access
  • Dynamic queue creation based on import type (person vs business)

Main Service Function:

const jwt = require('jsonwebtoken');
const Account = require('../../models/account');
const Queue = require('../../models/queues');
const personQueue = require('../../queues/billing/contacts/person');
const businessQueue = require('../../queues/billing/contacts/business');
const StripeKey = require('../../models/stripe-key');

exports.importContacts = async () => {
try {
let importData = await Queue.find({
status: 'pending',
source: 'billing-contacts',
in_progress: false,
})
.sort({ createdAt: -1 })
.lean()
.exec();
const ids = importData.map(d => d._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
if (importData.length) {
await Promise.all(
importData.map(async importParams => {
importParams.accountId = importParams.account_id.toString();
const stripeData = await StripeKey.findOne({
account_id: importParams.accountId,
connected_apps: 'billing',
})
.lean()
.exec();
if (!stripeData) {
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: false });
}
const stripeToken = stripeData.token;
let type = importParams.type;
let acc = await Account.findById(importParams.accountId);
importParams.ownerId = importParams.user_id.toString();
importParams.authToken = jwt.sign(
{
type: 'access_token',
uid: importParams.user_id.toString(),
account_id: importParams.account_id.toString(),
parent_account: importParams.parent_account.toString(),
client_id: importParams.client_id.toString(),
scope:
'contacts communications contacts.external contacts.read contacts.create files files.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);

let accessToken = importParams.authToken;
let accountId = importParams.accountId;
let ownerId = importParams.ownerId;
let mappings = importParams.mappings;
let newContact = importParams.new_contact;

if (type === 'contacts') {
let Q = await personQueue.start(importParams._id.toString());
await Q.add(
{
id: importParams._id.toString(),
account_id: accountId,
owner: ownerId,
importParams,
mappings,
newContact,
accessToken,
stripeToken,
account: acc.toJSON(),
},
{
attempts: 3,
backoff: 4000,
},
);
}
if (type === 'companies') {
let Q = await businessQueue.start(importParams._id.toString());
await Q.add(
{
id: importParams._id.toString(),
account_id: accountId,
owner: ownerId,
importParams,
mappings,
newContact,
accessToken,
stripeToken,
account: acc.toJSON(),
},
{
attempts: 3,
backoff: 4000,
},
);
}
}),
);
console.log('Import Started');
} else {
}
} catch (error) {
console.log(error);
}
};

Query: Matches imports with:

  • status: 'pending'
  • source: 'billing-contacts'
  • in_progress: false

Import Types:

  • contacts - Person contacts (individuals)
  • companies - Business contacts (organizations)

JWT Token Scope: Includes contacts, communications, files permissions (1-hour expiry)

3. Queue Processing - Person Contactsโ€‹

File: queue-manager/queues/billing/contacts/person.js

Purpose: Import Stripe customers as person contacts with field mapping and duplicate detection

Key Functions:

  • Fetch all Stripe customers (paginated, 100 per request)
  • Apply custom field mappings (Stripe โ†’ DashClicks)
  • Check for existing contacts by email
  • Create new contacts or update existing ones
  • Track statistics (added, updated, errors)
  • Save import results log

Complete Processor:

const Stripe = require('stripe');
const mongoose = require('mongoose');
const ContactImportResults = require('../../../models/contact-import-results');
const Contact = require('../../../models/contact');
const Queue = require('../../../models/queues');
const QueueWrapper = require('../../../common/queue-wrapper');
const { getMapping } = require('../../../common/billing_add_mapping');
const logger = require('../../../utilities/logger');

exports.start = async name => {
let totalRecordsAdded = 0;
let totalRecordsUpdated = 0;
let totalErrors = 0;
let invalidContacts = [];
let contact;
try {
const processCb = async (job, done) => {
try {
const { account_id, owner, mappings, newContact, stripeToken, account } = job.data;
const accountId = account_id;
const stripe = Stripe(stripeToken.access_token);
const stripe_user_id = stripeToken.stripe_user_id;
try {
let dataToInsert = {
parent_account: new mongoose.Types.ObjectId(accountId),
owner,
created_by: owner,
type: 'person',
};
for await (const customer of stripe.customers.list(
{ limit: 100 },
{ stripeAccount: stripe_user_id },
)) {
const customerData =
mappings && Object.keys(mappings).length && getMapping({ mappings, customer });
dataToInsert = { ...dataToInsert, ...customerData };
contact = customer;
dataToInsert.stripe_customer_id = customer.id;
const isContactExist = await Contact.findOne({
email: customer?.email,
type: 'person',
parent_account: new mongoose.Types.ObjectId(account_id),
});
if (isContactExist) {
if (mappings && Object.keys(mappings).length) {
const contactData = getMapping({
mappings,
customer,
contact: isContactExist,
});
await Contact.updateOne({ _id: isContactExist._id }, { ...contactData });
} else {
await Contact.updateOne(
{ _id: isContactExist._id },
{ stripe_customer_id: customer.id },
);
}
totalRecordsUpdated += 1;
} else if (newContact) {
dataToInsert.source = 'stripe';
dataToInsert.email = customer?.email;
await new Contact(dataToInsert).save();
totalRecordsAdded += 1;
}
}
} catch (err) {
invalidContacts.push(
err.additional_data || {
message: err.toString(),
email: contact?.email,
},
);
totalErrors += 1;
logger.error({
initiator: 'QM/billing/contacts/person',
error: err,
message: `Error while adding/updating contact`,
});
}
try {
let contactImportLog = {
status: 'in_progress',
name: 'stripe',
type: 'person',
records_added: totalRecordsAdded,
records_updated: totalRecordsUpdated,
import_errors_count: totalErrors,
user_id: new mongoose.Types.ObjectId(owner),
account_id: account_id,
importerrors: invalidContacts || [],
};
await new ContactImportResults(contactImportLog).save();
} catch (err) {
logger.error({
initiator: 'QM/billing/contacts/person',
error: err,
});
}
return done();
} catch (err) {
done(err);
}
};

const completedCb = async job => {
await Queue.deleteOne({ _id: job.data.id });
};

const queue = QueueWrapper(`billing_contact_${name}`, 'global', { processCb, completedCb });
return Promise.resolve(queue);
} catch (err) {
logger.error({
initiator: 'QM/billing/contacts/person',
error: err,
message: `Error while initializing contact`,
});
}
};

4. Queue Processing - Business Contactsโ€‹

File: queue-manager/queues/billing/contacts/business.js

Purpose: Import Stripe customers as business contacts (identical logic to person, different type)

Key Difference: Sets type: 'business' instead of type: 'person'

All Other Logic: Identical to person queue processor (same field mapping, duplicate detection, etc.)

5. Field Mapping Utilityโ€‹

File: queue-manager/common/billing_add_mapping.js (getMapping)

Purpose: Transform Stripe customer fields to DashClicks contact fields based on user-defined mappings

Key Functions:

  • getValueBydotKey - Extract nested values using dot notation (e.g., 'address.city')
  • mapCustomerData - Map Stripe fields to DashClicks fields
  • mapContactData - Preserve existing contact values for unmapped fields
  • getMapping - Main mapping orchestrator

Mapping Logic:

const getMapping = ({ mappings, customer, contact }) => {
let updObj = {
stripe_customer_id: customer.id,
email: customer.email,
}
let customerData = {};
let contactData = {};
let customerAddress = {};
let customerShipping = {};
// ... address and shipping mapping logic ...

Object.entries(mappings).forEach(([k, v]) => {
k = k.replace(/\-/g, '.'); // Convert hyphens to dots
const value = getValueBydotKey(k, customer); // Get Stripe value
const oldValue = contact && getValueBydotKey(v, contact?._doc); // Get existing contact value
// Map Stripe value or preserve old value
mapCustomerData({...});
contact && mapContactData({...});
})

updObj = { ...updObj, ...customerData };
if (contact) {
updObj.old_field_values = { email: contact?._doc?.email, ...contactData };
}
return updObj;
}

Special Field Handling:

  • Address fields: address.street, address.city, address.state_province, etc.
  • Shipping fields: shipping.address.street, shipping.address.city, etc.
  • Standard fields: email, name, phone, custom fields, etc.

Old Field Values: Preserved in old_field_values for audit/rollback purposes.

๐Ÿ—„๏ธ Collections Usedโ€‹

queuesโ€‹

  • Operations: Find, Update, Delete
  • Model: shared/models/queues.js
  • Usage Context: Store pending import requests

Key Fields:

  • status: 'pending' | 'completed' | 'failed'
  • source: 'billing-contacts' (identifies billing imports)
  • in_progress: Boolean - prevents duplicate processing
  • type: 'contacts' | 'companies' (person vs business)
  • account_id: ObjectId - target account
  • user_id: ObjectId - user who initiated import
  • parent_account: ObjectId - parent account reference
  • client_id: ObjectId - OAuth client reference
  • mappings: Object - field mapping configuration
  • new_contact: Boolean - whether to create new contacts or update only

stripe_keyโ€‹

  • Operations: Find
  • Model: shared/models/stripe-key.js
  • Usage Context: Retrieve Stripe API credentials

Key Fields:

  • account_id: ObjectId - account reference
  • connected_apps: 'billing' (identifies billing Stripe connection)
  • token: Object - Stripe credentials
    • access_token: Stripe API key
    • stripe_user_id: Stripe Connect account ID

_accountsโ€‹

  • Operations: Find
  • Model: shared/models/account.js
  • Usage Context: Account details for import context

contactsโ€‹

  • Operations: Find, Create, Update
  • Model: shared/models/contact.js
  • Usage Context: Store imported contacts

Key Fields:

  • email: String - unique identifier for duplicate detection
  • type: 'person' | 'business'
  • parent_account: ObjectId - account reference
  • owner: ObjectId - contact owner
  • created_by: ObjectId - user who created contact
  • source: 'stripe' (identifies import source)
  • stripe_customer_id: String - Stripe customer ID
  • address: Object - contact address
  • shipping: Object - shipping address
  • Custom mapped fields (dynamic based on user mappings)

contact_import_resultsโ€‹

  • Operations: Create
  • Model: shared/models/contact-import-results.js
  • Usage Context: Track import statistics and errors

Record Structure:

{
status: 'in_progress',
name: 'stripe',
type: 'person' | 'business',
records_added: Number,
records_updated: Number,
import_errors_count: Number,
user_id: ObjectId,
account_id: String,
importerrors: Array // [{message, email}, ...]
}

๐Ÿ”ง Job Configurationโ€‹

Cron Scheduleโ€‹

'*/30 * * * * *'; // Every 30 seconds

Frequency: 120 times per hour (twice per minute)

Rationale: Fast import processing for immediate user feedback; 30-second interval balances responsiveness with system load.

Queue Settingsโ€‹

QueueWrapper(`billing_contact_${name}`, 'global', {
processCb,
completedCb,
});

Queue Name: billing_contact_{importId} (unique per import request)

Concurrency: Default (1 per queue)

Job Options:

{
attempts: 3,
backoff: 4000 // 4-second fixed delay between retries
}

Retry Strategy: 3 attempts with 4-second delays (12 seconds total retry window)

๐Ÿ“‹ Processing Logic - Detailed Flowโ€‹

1. Import Request Queryโ€‹

Service Query:

await Queue.find({
status: 'pending',
source: 'billing-contacts',
in_progress: false,
}).sort({ createdAt: -1 });

Matches: Pending billing contact imports not currently processing.

Sort Order: Newest first (LIFO) - prioritizes recent user requests.

2. In-Progress Lockโ€‹

const ids = importData.map(d => d._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });

Purpose: Prevents duplicate processing by multiple cron runs.

Note: If Stripe credentials missing, resets in_progress: false for retry.

3. Stripe Credentials Validationโ€‹

const stripeData = await StripeKey.findOne({
account_id: importParams.accountId,
connected_apps: 'billing',
});
if (!stripeData) {
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: false });
}

Validation: Account must have Stripe connected for billing app.

Failure Handling: Resets in-progress flag for retry on next cron run.

Issue: Sets in_progress=false for ALL imports if ONE is missing credentials (bug).

4. JWT Token Generationโ€‹

importParams.authToken = jwt.sign(
{
type: 'access_token',
uid: importParams.user_id.toString(),
account_id: importParams.account_id.toString(),
parent_account: importParams.parent_account.toString(),
client_id: importParams.client_id.toString(),
scope:
'contacts communications contacts.external contacts.read contacts.create files files.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);

Purpose: Generate auth token for internal API calls (if needed for future features).

Scope: Full contacts and communications permissions.

Expiry: 1 hour (sufficient for import processing).

5. Stripe Customer Fetchingโ€‹

for await (const customer of stripe.customers.list(
{ limit: 100 },
{ stripeAccount: stripe_user_id },
)) {
// Process customer
}

Pagination: Stripe SDK automatically handles pagination (async iterator).

Limit: 100 customers per page (Stripe API maximum).

Stripe Connect: Uses connected account ID for multi-tenant support.

6. Field Mapping Applicationโ€‹

const customerData = mappings && Object.keys(mappings).length && getMapping({ mappings, customer });
dataToInsert = { ...dataToInsert, ...customerData };

Mapping Logic: Transforms Stripe customer fields to DashClicks contact fields.

Default Fields: Always includes stripe_customer_id and email.

Custom Fields: Applied based on user-defined mappings.

7. Duplicate Detectionโ€‹

const isContactExist = await Contact.findOne({
email: customer?.email,
type: 'person',
parent_account: new mongoose.Types.ObjectId(account_id),
});

Uniqueness: Combination of email + type + account (allows same email for person and business).

Case Sensitivity: MongoDB default (case-sensitive email matching).

8. Create or Update Logicโ€‹

Update Existing Contact:

if (isContactExist) {
if (mappings && Object.keys(mappings).length) {
const contactData = getMapping({ mappings, customer, contact: isContactExist });
await Contact.updateOne({ _id: isContactExist._id }, { ...contactData });
} else {
await Contact.updateOne({ _id: isContactExist._id }, { stripe_customer_id: customer.id });
}
totalRecordsUpdated += 1;
}

With Mappings: Apply full field mapping.

Without Mappings: Only update stripe_customer_id.

Create New Contact:

else if (newContact) {
dataToInsert.source = 'stripe';
dataToInsert.email = customer?.email;
await new Contact(dataToInsert).save();
totalRecordsAdded += 1;
}

Condition: Only create if newContact flag is true.

Source Tag: Marks contact as imported from Stripe.

9. Import Results Trackingโ€‹

let contactImportLog = {
status: 'in_progress',
name: 'stripe',
type: 'person',
records_added: totalRecordsAdded,
records_updated: totalRecordsUpdated,
import_errors_count: totalErrors,
user_id: new mongoose.Types.ObjectId(owner),
account_id: account_id,
importerrors: invalidContacts || [],
};
await new ContactImportResults(contactImportLog).save();

Statistics: Tracks added, updated, and error counts.

Error Details: Stores error messages and affected contact emails.

Status: Marked as 'in_progress' (should be 'completed' - possible bug).

10. Cleanupโ€‹

const completedCb = async job => {
await Queue.deleteOne({ _id: job.data.id });
};

Action: Deletes import request from queue collection on completion.

Effect: Prevents reprocessing of completed imports.

๐Ÿšจ Error Handlingโ€‹

Common Error Scenariosโ€‹

Missing Stripe Credentialsโ€‹

Scenario: Account doesn't have Stripe connected for billing

Handling: Resets in_progress: false, retries on next cron run

Impact: Import delayed until credentials added

Issue: Resets ALL imports if ONE is missing credentials (affects unrelated imports)

Stripe API Errorsโ€‹

Scenario: Rate limiting, invalid credentials, network issues

Handling: Job retries 3 times with 4-second delays

Impact: Import delayed up to 12 seconds, then fails if persistent

Note: No exponential backoff (fixed 4-second delay)

Invalid Stripe Customer Dataโ€‹

Scenario: Missing email, malformed data

Handling: Error logged, customer skipped, import continues

Impact: Customer not imported, tracked in invalidContacts array

Statistics: Increments totalErrors counter

Duplicate Email (Race Condition)โ€‹

Scenario: Two imports try to create same contact simultaneously

Handling: MongoDB duplicate key error, caught and logged

Impact: One import succeeds, other logs error (contact still imported)

Note: No unique index on email prevents this at DB level

Contact Creation Failureโ€‹

Scenario: Validation error, required field missing

Handling: Error caught, logged, customer skipped

Impact: Customer not imported, tracked in error list

Import Results Save Failureโ€‹

Scenario: Database error when saving import results

Handling: Error logged, import processing continues

Impact: Statistics lost, but contacts still imported

Note: Results save wrapped in separate try/catch (doesn't fail job)

Failed Job Callbackโ€‹

Note: No failedCb defined - relies on default Bull error handling.

Completed Job Callbackโ€‹

const completedCb = async job => {
await Queue.deleteOne({ _id: job.data.id });
};

Action: Delete import request from queue.

๐Ÿ“Š Monitoring & Loggingโ€‹

Success Loggingโ€‹

Cron Level:

  • "Billing Contact Execution Started"
  • "Billing Contact Execution Finished"
  • "Import Started" (console.log after queuing)

Queue Level:

  • No explicit success logging for individual customers

Error Loggingโ€‹

Cron Level:

  • Error in cron initialization

Service Level:

  • Error in service function (console.log, not logger)

Queue Level:

  • Error adding/updating contact (per customer)
  • Error initializing queue

Performance Metricsโ€‹

  • Queue Lookup: less than 1 second (indexed query)
  • Stripe API Call: 1-2 seconds per page (100 customers)
  • Field Mapping: less than 100ms per customer
  • Contact Lookup: less than 100ms per customer (indexed by email)
  • Contact Create/Update: less than 100ms per customer
  • Import Results Save: less than 1 second
  • Total Job Time: 5-30 seconds (depends on customer count)
    • 10 customers: ~5 seconds
    • 100 customers: ~15 seconds
    • 500+ customers: ~30+ seconds (multiple pages)

๐Ÿ”— Integration Pointsโ€‹

Triggers This Jobโ€‹

  • Internal API: Creates import request in queues collection (source: 'billing-contacts')
  • User Action: Initiates Stripe contact import from dashboard
  • Cron Schedule: Every 30 seconds (no external triggers)

External Dependenciesโ€‹

  • Stripe API: Customer list endpoint
  • Stripe Connect: Connected account credentials
  • JWT Secret: For token generation

Jobs That Depend On Thisโ€‹

  • Contact Sync Jobs: May rely on Stripe customer ID linkage
  • Billing Jobs: Use imported contacts for invoicing
  • CRM Dashboard: Displays imported contacts
  • Stripe Billing: Links contacts to Stripe customers
  • Contact Management: Allows editing imported contacts

โš ๏ธ Important Notesโ€‹

Side Effectsโ€‹

  • โš ๏ธ Contact Creation: Creates new contacts in CRM (permanent unless deleted)
  • โš ๏ธ Contact Updates: Overwrites existing contact data with Stripe values
  • โš ๏ธ Database Writes: Multiple writes per customer (contact + results)
  • โš ๏ธ Queue Deletion: Import request deleted after processing (cannot retry)

Performance Considerationsโ€‹

  • High Frequency: 30-second interval ensures fast import processing
  • Pagination: Stripe SDK handles automatic pagination (100 per page)
  • Parallel Processing: Multiple imports processed concurrently (separate queues)
  • Database Load: High write volume for large customer lists
  • API Rate Limits: Stripe rate limits may throttle large imports
  • No Batch Insert: Creates contacts one-by-one (could be optimized with bulkWrite)

Business Logicโ€‹

Why Every 30 Seconds?

  • Fast turnaround for user-initiated imports
  • Balances responsiveness with system load
  • 120 checks per hour is reasonable for background job

Why Dynamic Queue Names?

  • Allows multiple imports to run concurrently
  • Isolates imports from different accounts/users
  • Prevents queue name conflicts

Why Update Without Mappings?

  • Even without custom mappings, linking Stripe customer ID is valuable
  • Allows future syncing and reference lookups
  • Preserves manual edits to other fields

Why Person vs Business Queues?

  • Different contact types have different field requirements
  • Allows type-specific logic in future
  • Matches CRM contact type structure

Why Check Email + Type + Account?

  • Same person can exist as both person and business contact
  • Different accounts can have contacts with same email
  • Type separation prevents accidental merges

Why newContact Flag?

  • User control over whether to create new contacts or update only
  • Prevents polluting CRM with unwanted contacts
  • Allows "sync only" mode for existing contacts

Why Store Old Field Values?

  • Audit trail for data changes
  • Allows rollback if mapping configured incorrectly
  • Helps debug mapping issues

Maintenance Notesโ€‹

  • Import Type Values: 'contacts' and 'companies' hardcoded (consider enum)
  • Retry Strategy: Fixed 4-second backoff (consider exponential)
  • No Batch Operations: One-by-one inserts/updates (performance issue for large imports)
  • Console.log Usage: Service uses console.log instead of logger
  • Missing Status Update: Import results saved with 'in_progress' status (should be 'completed')
  • Credentials Reset Bug: Resets all imports if one is missing credentials
  • No Failed Callback: Default Bull error handling only
  • JWT Token Unused: Generated but not currently used (future feature)
  • No Import Cancellation: Cannot cancel in-progress imports

Code Quality Issuesโ€‹

Issue 1: Credentials Reset Bug

if (!stripeData) {
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: false });
}

Issue: Resets in_progress for ALL imports if ONE is missing credentials.

Impact: Unrelated imports reset and reprocessed.

Suggestion: Reset only the specific import:

if (!stripeData) {
await Queue.updateOne({ _id: importParams._id }, { in_progress: false });
return; // Skip this import
}

Issue 2: Missing Status Update

let contactImportLog = {
status: 'in_progress', // Should be 'completed'
// ...
};

Issue: Import results marked as 'in_progress' even after completion.

Suggestion: Set status to 'completed':

status: 'completed',

Issue 3: Console.log Instead of Logger

console.log('Import Started');
// ...
console.log(error);

Suggestion: Use logger for consistency:

logger.log({ initiator: 'QM/billing/contacts', message: 'Import Started' });
logger.error({ initiator: 'QM/billing/contacts', error });

Issue 4: No Batch Operations

for await (const customer of stripe.customers.list(...)) {
// One-by-one insert/update
await new Contact(dataToInsert).save();
}

Issue: Creates contacts one-by-one (slow for large imports).

Suggestion: Use bulkWrite:

const operations = [];
for await (const customer of stripe.customers.list(...)) {
// Build operations array
operations.push({
updateOne: {
filter: { email: customer.email, type: 'person', parent_account: accountId },
update: { $set: dataToInsert },
upsert: true,
},
});
}
await Contact.bulkWrite(operations);

Issue 5: Statistics Outside Processor

let totalRecordsAdded = 0; // Closure variable
const processCb = async (job, done) => {
// ... modifies closure variables
totalRecordsAdded += 1;
};

Issue: Statistics tracked in closure (not reset between jobs if queue reused).

Suggestion: Track statistics in job data or local variables:

const processCb = async (job, done) => {
let totalRecordsAdded = 0; // Local to processor
// ...
};

Issue 6: Error Handling for Stripe Iteration

try {
for await (const customer of stripe.customers.list(...)) {
// All processing
}
} catch (err) {
invalidContacts.push(...);
totalErrors += 1;
}

Issue: Single try/catch for entire loop - first error stops all processing.

Suggestion: Wrap each customer in try/catch:

for await (const customer of stripe.customers.list(...)) {
try {
// Process customer
} catch (err) {
invalidContacts.push({...});
totalErrors += 1;
logger.error({...});
}
}

Issue 7: JWT Token Not Used

importParams.authToken = jwt.sign({...}, process.env.APP_SECRET, { expiresIn: '1h' });

Issue: Token generated but never used in current implementation.

Suggestion: Remove if not needed, or document intended future use.

Issue 8: No Import Limit

for await (const customer of stripe.customers.list({ limit: 100 }, ...)) {
// Processes ALL customers
}

Issue: No maximum customer count - large accounts may import thousands.

Suggestion: Add configurable limit or progress tracking:

const MAX_CUSTOMERS = parseInt(process.env.STRIPE_IMPORT_LIMIT || '1000');
let processed = 0;
for await (const customer of stripe.customers.list(...)) {
if (processed >= MAX_CUSTOMERS) break;
// Process customer
processed++;
}

๐Ÿงช Testingโ€‹

Manual Triggerโ€‹

# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/billing/import

Create Import Requestโ€‹

const Queue = require('./models/queues');

// Create person contact import request
await new Queue({
status: 'pending',
source: 'billing-contacts',
in_progress: false,
type: 'contacts', // 'contacts' for person, 'companies' for business
account_id: accountId,
user_id: userId,
parent_account: parentAccountId,
client_id: clientId,
mappings: {
name: 'first_name', // Stripe field โ†’ DashClicks field
email: 'email',
phone: 'phone',
'address-city': 'address.city', // Hyphen converted to dot
},
new_contact: true, // Create new contacts if not exist
}).save();

console.log('Import request created, waiting for cron run (up to 30 seconds)');

Verify Import Processingโ€‹

const Contact = require('./models/contact');
const ContactImportResults = require('./models/contact-import-results');

// Check created contacts
const contacts = await Contact.find({
parent_account: accountId,
source: 'stripe',
});
console.log('Imported contacts:', contacts.length);

// Check import results
const results = await ContactImportResults.find({
account_id: accountId.toString(),
name: 'stripe',
});
console.log('Import results:', results);
console.log('- Added:', results[0].records_added);
console.log('- Updated:', results[0].records_updated);
console.log('- Errors:', results[0].import_errors_count);
console.log('- Error details:', results[0].importerrors);

Test Field Mappingโ€‹

// Test address mapping
const mappings = {
'address-street': 'address.street',
'address-city': 'address.city',
'address-state': 'address.state_province',
};

const mockCustomer = {
id: 'cus_123',
email: 'test@example.com',
name: 'John Doe',
address: {
line1: '123 Main St',
city: 'New York',
state: 'NY',
},
};

const { getMapping } = require('./common/billing_add_mapping');
const result = getMapping({ mappings, customer: mockCustomer });
console.log('Mapped data:', result);
// Expected:
// {
// stripe_customer_id: 'cus_123',
// email: 'test@example.com',
// address: {
// street: '123 Main St',
// city: 'New York',
// state_province: 'NY'
// }
// }

Test Duplicate Detectionโ€‹

// Create existing contact
const existingContact = await new Contact({
email: 'test@example.com',
type: 'person',
parent_account: accountId,
first_name: 'John',
}).save();

// Trigger import with same email
// ... create import request ...

// After import, verify contact updated (not duplicated)
const contacts = await Contact.find({
email: 'test@example.com',
type: 'person',
parent_account: accountId,
});
console.log('Contacts with email:', contacts.length); // Should be 1
console.log('Updated name:', contacts[0].first_name); // Should be updated from Stripe

Monitor Queue Processingโ€‹

# Watch logs during import
tail -f logs/queue-manager.log | grep "billing"

# Expected outputs:
# [INFO] Billing Contact Execution Started
# [INFO] Import Started
# [INFO] Billing Contact Execution Finished
# [ERROR] Error while adding/updating contact (if errors)

Test Error Handlingโ€‹

// Test with invalid Stripe credentials
await StripeKey.findOneAndUpdate(
{ account_id: accountId },
{ $set: { 'token.access_token': 'invalid_key' } },
);

// Trigger import
// ... create import request ...

// After cron run, verify:
// 1. Error logged
// 2. Job retried 3 times
// 3. Import results show errors

Job Type: Scheduled with Dynamic Queue Creation
Execution Frequency: Every 30 seconds
Average Duration: 5-30 seconds per import (depends on customer count)
Status: Active

๐Ÿ’ฌ

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM