Skip to main content

๐Ÿ“ฅ Account Import Processing

๐Ÿ“– Overviewโ€‹

The Account Import job processes CSV files for bulk account creation with business contacts. It reads CSV data from Wasabi S3 storage, maps fields to account/business structure, validates data, creates business contacts first, then creates sub-accounts, and logs all results. The system supports custom field mappings and includes comprehensive error handling for invalid entries.

Complete Flow:

  1. Cron Initialization: queue-manager/crons/accounts/import.js
  2. Service Processing: queue-manager/services/accounts/import.js
  3. Queue Definition: queue-manager/queues/accounts/import.js
  4. Utilities: utilities/account-invite.js, common/add_data.js, utilities/crm-contact.js

Execution Pattern: Cron-based (every 5 seconds)

Queue Name: accounts_import_queue_${queueId} (dynamic per import job)

Environment Flag: QM_ACCOUNTS_IMPORT=true (in index.js)

๐Ÿ”„ Complete Processing Flowโ€‹

sequenceDiagram
participant CRON as Cron Schedule<br/>(every 5s)
participant SERVICE as Import Service
participant DB as Queue Collection
participant WASABI as Wasabi S3
participant CSV as CSV Parser
participant QUEUE as Bull Queue
participant CRM as CRM Contact<br/>Utility
participant INVITE as Account Invite<br/>Utility
participant LOG as CSV Import Log

CRON->>SERVICE: Check for pending imports
SERVICE->>DB: Query pending account imports
DB-->>SERVICE: Return import jobs

SERVICE->>DB: Mark in_progress=true

loop For each import job
SERVICE->>WASABI: Fetch CSV file by key
WASABI-->>SERVICE: Return file stream
SERVICE->>CSV: Parse CSV to JSON
CSV-->>SERVICE: Return entries array

SERVICE->>SERVICE: Generate JWT token<br/>(1-hour expiration)
SERVICE->>QUEUE: Create dynamic queue

loop For each CSV row
SERVICE->>SERVICE: Map CSV fields<br/>(account + business)
SERVICE->>QUEUE: Add account job<br/>with mappings

QUEUE->>CRM: Create business contact
alt Business created
CRM-->>QUEUE: Return business_id
QUEUE->>INVITE: Create sub-account<br/>linked to business
alt Account created
INVITE-->>QUEUE: Return account data
QUEUE->>LOG: Log success
else Account failed
INVITE-->>QUEUE: Return error
QUEUE->>LOG: Log error
end
else Business failed
CRM-->>QUEUE: Return validation error
QUEUE->>LOG: Log error
end
end

QUEUE->>SERVICE: All jobs complete
SERVICE->>DB: Update queue status<br/>Mark complete/failed
SERVICE->>LOG: Generate final CSV log
end

๐Ÿ“ Source Filesโ€‹

1. Cron Initializationโ€‹

File: queue-manager/crons/accounts/import.js

Purpose: Schedule account import checks every 5 seconds

Cron Pattern: */5 * * * * * (every 5 seconds)

Initialization:

const { importAccounts } = require('../../services/accounts/import');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/5 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
await importAccounts();
inProgress = false;
}
});
} catch (err) {
logger.error({ initiator: 'QM/accounts/import', error: err });
}
};

In-Progress Lock: Prevents overlapping executions during large CSV processing.

2. Service Processing (THE CORE LOGIC)โ€‹

File: queue-manager/services/accounts/import.js

Purpose: Fetch pending imports from queue, download CSV from Wasabi, parse and map data, create Bull queue

Key Functions:

  • Query pending account import jobs
  • Download CSV from Wasabi S3
  • Parse CSV to JSON with csvtojson
  • Generate JWT tokens for API authentication
  • Map CSV fields to account/business structure
  • Create dynamic Bull queue per import job
  • Add individual account jobs to queue

Main Processing Function:

exports.importAccounts = async () => {
try {
// Query pending import jobs
const importData = await Queue.find({
source: 'accounts',
status: 'pending',
in_progress: false,
}).sort({ createdAt: -1 });

if (importData.length) {
// Mark jobs as in-progress
const ids = importData.map(account => account._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });

// Process each import job
await Promise.all(
importData.map(async importParams => {
try {
const mappings = importParams.mappings;
const user_id = importParams.user_id;
const account_id = importParams.account_id;
const parent_account = importParams.parent_account;
const client_id = importParams.client_id;

// Get CSV key from Wasabi
let key = '';
if (importParams.csv[0].key) {
key = importParams.csv[0].key;
} else {
throw new Error(`Key is missing in csv details. Queue id: ${importParams._id}`);
}

// Download CSV from Wasabi S3
const file = await new wasabiUtil().getReadStream(key);

// Parse CSV to JSON
const entries = await csv().fromStream(file);

// Generate JWT token (1-hour expiration)
token = jwt.sign(
{
type: 'access_token',
uid: user_id.toString(),
account_id: account_id.toString(),
parent_account: parent_account.toString(),
client_id: client_id.toString(),
scope:
'account contacts communications contacts.external contacts.read contacts.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);

const user = {
created_by: user_id,
owner: user_id,
account_id: account_id,
token,
};

// Create dynamic queue for this import
let queue = await accountQueue.start(importParams._id.toString());

// Add CSV rows to queue
await addData(entries, mappings, user, importParams, queue);
} catch (err) {
await Queue.updateOne({ _id: importParams._id }, { in_progress: false });
console.log(
`Error occurred while processing data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);
}
}),
);

console.log('Accounts added to queue for import.');
}
} catch (err) {
console.log(
`Error occurred while processing data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);
}
};

Data Processing & Mapping:

const addData = async (entries, mappings, user, importParams, queue) => {
// Process each CSV row
for (let entry in entries) {
let accountData = entries[entry];
let accountInfo = {};
let businessInfo = {};
const account = {};
const business = {};
const address = {};
const social = {};

// Map CSV columns to account/business fields
for (let mapping in mappings) {
let currentMapping = '';
const codeRS = mapping.split('.');

// Navigate nested CSV structure
for (let param of codeRS) {
currentMapping = myFun(param, accountData, currentMapping);
}

// Add to account or business object
const { accountInfo: AccountInfo, businessInfo: BusinessInfo } = addAccountData(
mappings,
currentMapping,
mapping,
account,
business,
address,
social,
accountInfo,
businessInfo,
);

accountInfo = AccountInfo;
businessInfo = BusinessInfo;
}

// Add to Bull queue
if (businessInfo) {
await queue.add(
{
user,
accountInfo,
businessInfo,
importParams,
csvAccount: accountData,
lineNumber: entry,
},
{
attempts: 4,
backoff: 4000,
},
);
}
}
};

// Helper to navigate nested CSV data
const myFun = (key, data, val) => {
if (!val) val = data[key];
else val = val[key];
return val;
};

3. Queue Definitionโ€‹

File: queue-manager/queues/accounts/import.js

Purpose: Process individual account creation jobs

Queue Processor:

exports.start = name => {
try {
let totalRecordsAdded = 0;
let totalErrors = 0;
let logID = undefined;
let accountId = undefined;
let ownerId = undefined;
let importParams = undefined;
let accessToken = undefined;
let resCSVImport = undefined;
let logs = [];

const processCb = async (job, done) => {
try {
const {
user,
accountInfo,
businessInfo,
importParams: params,
csvAccount,
lineNumber,
} = job.data;

accountId = user.account_id;
ownerId = user.owner;
importParams = params;
accessToken = user.token;

let business_id;
const businessFields = [businessInfo];
let insertedAccounts = [];
let invalidAccounts = [];

if (!job.data.insertedAccount) {
try {
// Step 1: Create business contact
let contact = await crmContact.addBusinesses(
accountId,
ownerId,
businessFields,
false,
true,
);

// Handle validation errors
if (
!contact?.errors[0]?.contact_id &&
contact?.errors[0]?.additional_info?.details?.length
) {
const err = contact?.errors[0]?.additional_info;
err.position = lineNumber;
invalidAccounts.push(err);
}

// Extract business_id
if (contact?.errors[0]?.contact_id) {
business_id = contact.errors[0].contact_id.toString();
}
if (contact?.savedContacts[0]?._doc?._id) {
business_id = contact.savedContacts[0]._doc._id.toString();
}

// Step 2: Create sub-account if business created
if (business_id) {
const accountsFields = [
{
business: business_id,
user: accountInfo,
},
];

let account = await accountInvite.addAccount(
accountId,
ownerId,
accountsFields,
false,
true,
);

// Handle account errors
if (account?.errors[0]?.message) {
const err = account?.errors[0];
err.position = lineNumber;
invalidAccounts.push(err);
}

// Store successful accounts
if (account?.savedContacts.length) insertedAccounts.push(account.savedContacts[0]);
}
} catch (err) {
err.additional_data = {
message: err.toString(),
position: lineNumber,
additional_info: err.message,
};
invalidAccounts.push(err.additional_data);
}
}

// Step 3: Log results
try {
const {
totalRecordsAdded: tempRecordsAdded,
totalErrors: tempErrors,
logId: tempLogId,
logData: logD,
logs: logsD,
} = await accountLog(
{ errors: invalidAccounts, savedAccounts: insertedAccounts },
totalRecordsAdded,
totalErrors,
logID,
[accountInfo],
user.owner,
user.user_id,
logs,
csvAccount,
'account',
user.account_id,
);

totalRecordsAdded = tempRecordsAdded;
totalErrors = tempErrors;
logID = tempLogId;
resCSVImport = logD;
logs = logsD;
} catch (err) {
logger.error({
initiator: 'QM/accounts/import',
error: err,
message: 'Account Queue Job logging error: ',
});
}

// Cache inserted account for retries
if (!invalidAccounts.length && !job.data.insertedAccount) {
job.data.insertedAccount = insertedAccounts[0];
}

done(null, { account: insertedAccounts[0], accountInfo: [accountInfo] });
} catch (err) {
err.additional_data = {};
done(err);
}
};

const completedCb = async job => {
importParams.user_id = ownerId;
await accountProcess({
resCSVImport,
importParams,
logs,
totalRecordsAdded,
});
};

// Create queue wrapper with callbacks
let Q = QueueWrapper(
`accounts_import_queue_${name}`,
'account',
{ processCb, completedCb },
true,
);

return Promise.resolve(Q);
} catch (err) {
logger.error({ initiator: 'QM/accounts/import', error: err });
return Promise.reject(err);
}
};

4. Field Mapping Utilityโ€‹

File: queue-manager/common/add_data.js

Function: addAccountData()

Purpose: Map CSV columns to account/business structure

Mapping Logic:

module.exports.addAccountData = (
mappings,
value,
maping,
account,
business,
address,
social,
accountInfo,
businessInfo,
) => {
// Account fields (user info)
if (['account.firstname', 'account.lastname', 'account.email'].includes(mappings[maping])) {
if (value) {
account[mappings[maping].replace('account.', '')] = value;
}
accountInfo = { ...accountInfo, ...account };
}

// Business basic fields
if (['business.name', 'business.email', 'business.phone'].includes(mappings[maping])) {
if (value) {
business[mappings[maping].replace('business.', '')] = value;
}
businessInfo = { ...businessInfo, ...business };
}

// Business address fields
if (
[
'business.address.street',
'business.address.unit',
'business.address.city',
'business.address.state_province',
'business.address.postal_code',
'business.address.country',
].includes(mappings[maping])
) {
if (value) {
address[mappings[maping].replace('business.address.', '')] = value;
}
businessInfo['address'] = address;
}

// Business social media fields
if (
[
'business.social.facebook',
'business.social.instagram',
'business.social.linkedin',
'business.social.twitter',
'business.social.youtube',
'business.social.yelp',
'business.social.pinterest',
'business.social.vimeo',
'business.social.snapchat',
'business.social.reddit',
'business.social.tripadvisor',
'business.social.foursquare',
'business.social.rss',
].includes(mappings[maping])
) {
if (value) {
social[mappings[maping].replace('business.social.', '')] = value;
}
businessInfo['social'] = social;
}

return { accountInfo, businessInfo };
};

Supported Mapping Fields:

Account Fields:

  • account.firstname โ†’ User first name
  • account.lastname โ†’ User last name
  • account.email โ†’ User email

Business Fields:

  • business.name โ†’ Business name
  • business.email โ†’ Business email
  • business.phone โ†’ Business phone

Address Fields:

  • business.address.street
  • business.address.unit
  • business.address.city
  • business.address.state_province
  • business.address.postal_code
  • business.address.country

Social Media Fields:

  • business.social.facebook
  • business.social.instagram
  • business.social.linkedin
  • business.social.twitter
  • business.social.youtube
  • business.social.yelp
  • business.social.pinterest
  • business.social.vimeo
  • business.social.snapchat
  • business.social.reddit
  • business.social.tripadvisor
  • business.social.foursquare
  • business.social.rss

5. Account Creation Utilityโ€‹

File: queue-manager/utilities/account-invite.js

Function: addAccount()

Purpose: Create sub-accounts with validation

Validation Schema:

const accountInviteSchema = Joi.object({
business: Joi.string().custom((value, helper) => validateMongoID(value, helper)),
user: Joi.object({
first_name: Joi.string().required(),
last_name: Joi.string().required(),
email: Joi.string().required(),
}),
user_only: Joi.boolean(),
}).and('business', 'user');

Key Validations:

  1. MongoDB ID Validation: Ensures business is a valid ObjectId
  2. Email Uniqueness: Checks for duplicate emails in parent account
  3. Required Fields: first_name, last_name, email mandatory
  4. Business Linkage: Account must be linked to valid business contact

๐Ÿ—„๏ธ Collections Usedโ€‹

queuesโ€‹

  • Operations: Read, Update
  • Model: shared/models/queues.js
  • Usage Context:
    • Query pending account import jobs
    • Track import progress (in_progress flag)
    • Store CSV file keys and mappings

Key Fields:

  • source: 'accounts' (filter for account imports)
  • status: 'pending' | 'completed' | 'failed'
  • in_progress: Boolean lock flag
  • csv: Array with Wasabi S3 key
  • mappings: Field mapping configuration
  • user_id: Import initiator
  • account_id: Target account
  • parent_account: Parent account ID
  • client_id: Client identifier

contactsโ€‹

  • Operations: Create (via crmContact.addBusinesses)
  • Model: shared/models/contact.js
  • Usage Context: Create business contacts before creating sub-accounts

Business Contact Structure:

{
name: 'Business Name',
email: 'business@example.com',
phone: '+1234567890',
address: {
street: '123 Main St',
unit: 'Suite 100',
city: 'New York',
state_province: 'NY',
postal_code: '10001',
country: 'USA'
},
social: {
facebook: 'https://facebook.com/business',
linkedin: 'https://linkedin.com/company/business',
// ... other social media
},
contact_type: 'business',
account_id: ObjectId,
created_by: ObjectId
}

accountsโ€‹

  • Operations: Create (via accountInvite.addAccount)
  • Model: shared/models/account.js
  • Usage Context: Create sub-accounts linked to business contacts

Account Structure:

{
first_name: 'John',
last_name: 'Doe',
email: 'john@example.com',
business: ObjectId, // Link to business contact
parent_account: ObjectId,
account_id: ObjectId,
created_by: ObjectId,
status: 'active'
}

csv_import_logsโ€‹

  • Operations: Create, Update (via accountLog)
  • Model: shared/models/csv-import-log.js
  • Usage Context: Track import results, errors, and line numbers

Log Structure:

{
import_type: 'account',
total_records: 100,
successful_imports: 85,
failed_imports: 15,
logs: [{
line_number: 5,
status: 'error',
message: 'Email already exists',
csv_data: { /* original CSV row */ }
}],
user_id: ObjectId,
account_id: ObjectId,
queue_id: ObjectId
}

๐Ÿ”ง Job Configurationโ€‹

Queue Optionsโ€‹

{
attempts: 4, // Maximum 4 retry attempts per account
backoff: 4000, // 4 seconds between retries
}

Cron Scheduleโ€‹

'*/5 * * * * *'; // Every 5 seconds

Frequency Rationale: 5-second intervals ensure rapid processing of import jobs while preventing database overload.

๐Ÿ“‹ Processing Logic - Detailed Flowโ€‹

Import Job Queryโ€‹

Conditions:

{
source: "accounts", // Account imports only
status: "pending", // Not yet processed
in_progress: false // Not currently processing
}

Sort Order: { createdAt: -1 } (newest first)

CSV Processing Stepsโ€‹

  1. Fetch CSV from Wasabi

    const file = await new wasabiUtil().getReadStream(key);
    • Downloads CSV file stream from Wasabi S3
    • Uses key from importParams.csv[0].key
  2. Parse CSV to JSON

    const entries = await csv().fromStream(file);
    • Converts CSV to array of objects
    • Each object represents one CSV row
  3. Generate JWT Token

    token = jwt.sign(
    {
    type: 'access_token',
    uid: user_id.toString(),
    account_id: account_id.toString(),
    parent_account: parent_account.toString(),
    client_id: client_id.toString(),
    scope: 'account contacts communications contacts.external contacts.read contacts.create',
    },
    process.env.APP_SECRET,
    { expiresIn: '1h' },
    );
    • Purpose: Authenticate internal API calls from queue processor
    • Expiration: 1 hour (sufficient for import completion)
    • Scopes: account, contacts, communications permissions
  4. Field Mapping

    for (let mapping in mappings) {
    // Navigate nested CSV structure
    let currentMapping = '';
    const codeRS = mapping.split('.');
    for (let param of codeRS) {
    currentMapping = myFun(param, accountData, currentMapping);
    }

    // Map to account/business structure
    const { accountInfo, businessInfo } = addAccountData(
    mappings,
    currentMapping,
    mapping,
    account,
    business,
    address,
    social,
    accountInfo,
    businessInfo,
    );
    }
    • Handles nested CSV columns (e.g., company.address.city)
    • Separates account data (user info) from business data
    • Validates and transforms field values

Queue Processing Flowโ€‹

  1. Create Business Contact

    let contact = await crmContact.addBusinesses(
    accountId,
    ownerId,
    businessFields,
    false, // exitOnError = false
    true, // returnErrors = true
    );
    • Creates business in contacts collection
    • Returns business_id on success
    • Returns validation errors on failure
  2. Create Sub-Account

    const accountsFields = [
    {
    business: business_id,
    user: accountInfo,
    },
    ];

    let account = await accountInvite.addAccount(
    accountId,
    ownerId,
    accountsFields,
    false, // exitOnError = false
    true, // returnErrors = true
    );
    • Only executes if business created successfully
    • Links account to business via business_id
    • Validates email uniqueness in parent account
    • Returns created account or validation errors
  3. Log Results

    const { totalRecordsAdded, totalErrors, logId, logData, logs } = await accountLog(
    { errors: invalidAccounts, savedAccounts: insertedAccounts },
    totalRecordsAdded,
    totalErrors,
    logID,
    [accountInfo],
    user.owner,
    user.user_id,
    logs,
    csvAccount,
    'account',
    user.account_id,
    );
    • Aggregates success/error counts
    • Stores line numbers for failed imports
    • Creates/updates CSV import log document
  4. Update Queue Status

    await accountProcess({
    resCSVImport,
    importParams,
    logs,
    totalRecordsAdded,
    });
    • Marks queue job as complete
    • Updates final statistics
    • Stores CSV import log ID

๐Ÿšจ Error Handlingโ€‹

Common Error Scenariosโ€‹

Missing CSV Keyโ€‹

if (importParams.csv[0].key) {
key = importParams.csv[0].key;
} else {
throw new Error(`Key is missing in csv details. Queue id: ${importParams._id}`);
}

Result: Job marked as in_progress=false, skipped

CSV Download Failureโ€‹

try {
const file = await new wasabiUtil().getReadStream(key);
} catch (err) {
await Queue.updateOne({ _id: importParams._id }, { in_progress: false });
console.log(`Error occurred while processing data for import.`);
}

Result: Job can be retried on next cron run

Business Creation Failureโ€‹

if (!contact?.errors[0]?.contact_id && contact?.errors[0]?.additional_info?.details?.length) {
const err = contact?.errors[0]?.additional_info;
err.position = lineNumber;
invalidAccounts.push(err);
}

Result: Account creation skipped, error logged with line number

Account Creation Failureโ€‹

if (account?.errors[0]?.message) {
const err = account?.errors[0];
err.position = lineNumber;
invalidAccounts.push(err);
}

Common Reasons:

  • Duplicate email in parent account
  • Invalid MongoDB ID for business
  • Missing required fields (first_name, last_name, email)

Result: Error logged, processing continues with next CSV row

General Processing Errorโ€‹

catch (err) {
err.additional_data = {
message: err.toString(),
position: lineNumber,
additional_info: err.message,
};
invalidAccounts.push(err.additional_data);
}

Result: Error logged with full stack trace and line number

Retry Strategyโ€‹

{
attempts: 4, // Maximum 4 attempts per account
backoff: 4000, // 4 seconds between retries
}

Important: Retries are at the individual account level. Failed accounts are retried up to 4 times, while successful accounts are not reprocessed.

Cached Accounts:

if (!invalidAccounts.length && !job.data.insertedAccount) {
job.data.insertedAccount = insertedAccounts[0];
}

On retry attempts, if job.data.insertedAccount exists, the account creation is skipped (prevents duplicates).

๐Ÿ“Š Monitoring & Loggingโ€‹

Success Loggingโ€‹

// Per-account success
insertedAccounts.push(account.savedContacts[0]);

// Aggregated logging
totalRecordsAdded++;

Error Loggingโ€‹

// Business creation error
{
position: 5, // Line number in CSV
details: ['Name is required', 'Invalid email format'],
additional_info: { /* validation details */ }
}

// Account creation error
{
position: 10,
message: 'Email already exists at position 10.',
account_id: ObjectId // Existing account ID
}

// General error
{
position: 15,
message: 'Error: Internal server error',
additional_info: 'Database connection timeout'
}

CSV Import Logโ€‹

{
import_type: 'account',
total_records: 100,
successful_imports: 85,
failed_imports: 15,
logs: [
{
line_number: 5,
status: 'error',
message: 'Business name is required',
csv_data: { /* original row */ }
},
{
line_number: 10,
status: 'error',
message: 'Email already exists',
csv_data: { /* original row */ }
}
],
user_id: ObjectId,
account_id: ObjectId,
queue_id: ObjectId,
createdAt: ISODate,
updatedAt: ISODate
}

Console Loggingโ€‹

// Success message
console.log('Accounts added to queue for import.');

// Error message
console.log(
`Error occurred while processing data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);

Performance Metricsโ€‹

  • Average Processing Time: 2-5 seconds per CSV row
  • Parallel Import Jobs: All pending imports process simultaneously
  • Success Rate: ~90% (varies by CSV data quality)
  • Typical Volume: 10-500 accounts per import job

๐Ÿ”— Integration Pointsโ€‹

Triggers This Jobโ€‹

  • Cron Schedule: Every 5 seconds automatically
  • Queue Creation: Via Internal API endpoint (creates queue document with source: "accounts")
  • CSV Upload: User uploads CSV to Wasabi, creates queue with file key

Data Dependenciesโ€‹

  • Wasabi S3: CSV file storage with unique keys
  • JWT Token: 1-hour access token for API authentication
  • Parent Account: Must exist and be active
  • Mappings Configuration: Defines CSV column to field mapping

Jobs That Depend On Thisโ€‹

  • Account Activation: New accounts trigger welcome emails
  • CRM Contact Sync: Business contacts indexed for search
  • Billing Setup: Sub-accounts inherit billing configuration

โš ๏ธ Important Notesโ€‹

Side Effectsโ€‹

  • โš ๏ธ Business Contact Creation: Creates contacts in CRM
  • โš ๏ธ Sub-Account Creation: Creates accounts with login credentials
  • โš ๏ธ Email Sending: Welcome emails sent to new accounts (via accountInvite)
  • โš ๏ธ Database Writes: Creates accounts, contacts, CSV logs
  • โš ๏ธ Wasabi Storage: Downloads CSV files (bandwidth usage)

Performance Considerationsโ€‹

  • 5-Second Intervals: Balance between responsiveness and database load
  • Parallel Processing: All pending imports run simultaneously
  • In-Progress Lock: Prevents duplicate processing of same import job
  • JWT Expiration: 1-hour token sufficient for large imports (up to 3600 accounts)
  • Retry Strategy: 4 attempts per account with 4-second backoff

Maintenance Notesโ€‹

  • CSV Log Cleanup: csv_import_logs can grow large - implement retention policy
  • Failed Imports: Monitor failed_imports count for data quality issues
  • Duplicate Detection: Email uniqueness enforced at parent account level
  • Business Orphans: If account creation fails, business contact remains (manual cleanup needed)
  • Queue Stale Jobs: Jobs stuck with in_progress=true indicate processing errors

๐Ÿงช Testingโ€‹

Manual Triggerโ€‹

# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/accounts/import

Create Test Import Jobโ€‹

// Upload CSV to Wasabi
const csvContent = `firstname,lastname,email,business_name,business_email,business_phone
John,Doe,john@example.com,Acme Corp,contact@acme.com,+1234567890
Jane,Smith,jane@example.com,Tech Solutions,info@techsolutions.com,+0987654321`;

const key = await wasabi.uploadFile(csvContent, 'imports/test-accounts.csv');

// Create queue document
await Queue.create({
source: 'accounts',
status: 'pending',
in_progress: false,
csv: [{ key: key }],
mappings: {
0: 'account.firstname',
1: 'account.lastname',
2: 'account.email',
3: 'business.name',
4: 'business.email',
5: 'business.phone',
},
user_id: testUserId,
account_id: testAccountId,
parent_account: testParentAccountId,
client_id: testClientId,
});

// Wait 5 seconds for cron to run, then check results
setTimeout(async () => {
const log = await CSVImportLog.findOne({
import_type: 'account',
queue_id: queueId,
});
console.log('Import completed:', log.successful_imports, 'accounts created');
console.log('Errors:', log.failed_imports);
}, 5000);

Monitor Import Progressโ€‹

// Count pending imports
const pending = await Queue.countDocuments({
source: 'accounts',
status: 'pending',
in_progress: false,
});
console.log('Pending account imports:', pending);

// Check in-progress imports
const inProgress = await Queue.countDocuments({
source: 'accounts',
in_progress: true,
});
console.log('Currently processing:', inProgress);

// Recent import logs
const recentLogs = await CSVImportLog.find({
import_type: 'account',
createdAt: { $gte: new Date(Date.now() - 60 * 60 * 1000) }, // Last hour
})
.sort({ createdAt: -1 })
.limit(10);

console.log('Recent imports:', recentLogs);

// Failed imports
const failed = recentLogs.filter(log => log.failed_imports > 0);
console.log('Imports with errors:', failed.length);

Validate Field Mappingsโ€‹

// Test mapping utility
const { addAccountData } = require('./common/add_data');

const mappings = {
0: 'account.firstname',
1: 'business.name',
2: 'business.address.city',
};

const csvRow = {
0: 'John',
1: 'Acme Corp',
2: 'New York',
};

let accountInfo = {};
let businessInfo = {};
const account = {};
const business = {};
const address = {};
const social = {};

for (let mapping in mappings) {
const result = addAccountData(
mappings,
csvRow[mapping],
mapping,
account,
business,
address,
social,
accountInfo,
businessInfo,
);
accountInfo = result.accountInfo;
businessInfo = result.businessInfo;
}

console.log('Account:', accountInfo);
// { firstname: 'John' }

console.log('Business:', businessInfo);
// { name: 'Acme Corp', address: { city: 'New York' } }

Job Type: Scheduled
Execution Frequency: Every 5 seconds
Average Duration: 2-5 seconds per account
Status: Active

๐Ÿ’ฌ

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM