Account Import Processing
Overview
The Account Import job processes CSV files for bulk account creation with business contacts. It reads CSV data from Wasabi S3 storage, maps fields to account/business structure, validates data, creates business contacts first, then creates sub-accounts, and logs all results. The system supports custom field mappings and includes comprehensive error handling for invalid entries.
Complete Flow:
- Cron Initialization: queue-manager/crons/accounts/import.js
- Service Processing: queue-manager/services/accounts/import.js
- Queue Definition: queue-manager/queues/accounts/import.js
- Utilities: utilities/account-invite.js, common/add_data.js, utilities/crm-contact.js
Execution Pattern: Cron-based (every 5 seconds)
Queue Name: accounts_import_queue_${queueId} (dynamic per import job)
Environment Flag: QM_ACCOUNTS_IMPORT=true (in index.js)
Complete Processing Flow
sequenceDiagram
participant CRON as Cron Schedule<br/>(every 5s)
participant SERVICE as Import Service
participant DB as Queue Collection
participant WASABI as Wasabi S3
participant CSV as CSV Parser
participant QUEUE as Bull Queue
participant CRM as CRM Contact<br/>Utility
participant INVITE as Account Invite<br/>Utility
participant LOG as CSV Import Log
CRON->>SERVICE: Check for pending imports
SERVICE->>DB: Query pending account imports
DB-->>SERVICE: Return import jobs
SERVICE->>DB: Mark in_progress=true
loop For each import job
SERVICE->>WASABI: Fetch CSV file by key
WASABI-->>SERVICE: Return file stream
SERVICE->>CSV: Parse CSV to JSON
CSV-->>SERVICE: Return entries array
SERVICE->>SERVICE: Generate JWT token<br/>(1-hour expiration)
SERVICE->>QUEUE: Create dynamic queue
loop For each CSV row
SERVICE->>SERVICE: Map CSV fields<br/>(account + business)
SERVICE->>QUEUE: Add account job<br/>with mappings
QUEUE->>CRM: Create business contact
alt Business created
CRM-->>QUEUE: Return business_id
QUEUE->>INVITE: Create sub-account<br/>linked to business
alt Account created
INVITE-->>QUEUE: Return account data
QUEUE->>LOG: Log success
else Account failed
INVITE-->>QUEUE: Return error
QUEUE->>LOG: Log error
end
else Business failed
CRM-->>QUEUE: Return validation error
QUEUE->>LOG: Log error
end
end
QUEUE->>SERVICE: All jobs complete
SERVICE->>DB: Update queue status<br/>Mark complete/failed
SERVICE->>LOG: Generate final CSV log
end
Source Files
1. Cron Initialization
File: queue-manager/crons/accounts/import.js
Purpose: Schedule account import checks every 5 seconds
Cron Pattern: */5 * * * * * (every 5 seconds)
Initialization:
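const { importAccounts } = require('../../services/accounts/import');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

// Module-level flag: true while a previous tick is still running
let inProgress = false;

exports.start = async () => {
  try {
    cron.schedule('*/5 * * * * *', async () => {
      if (inProgress) return; // previous run not finished, skip this tick
      inProgress = true;
      try {
        await importAccounts();
      } catch (err) {
        logger.error({ initiator: 'QM/accounts/import', error: err });
      } finally {
        // Always release the lock, even if importAccounts() rejects
        inProgress = false;
      }
    });
  } catch (err) {
    logger.error({ initiator: 'QM/accounts/import', error: err });
  }
};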
In-Progress Lock: Prevents overlapping executions during large CSV processing.
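The lock is only useful if it is released on every path, including failures. The following is a minimal sketch of the same skip-if-busy pattern in isolation. `createGuard` is a hypothetical helper, not part of the queue manager, and the task can be any promise-returning function.

```javascript
// Hypothetical helper: wraps an async task so overlapping calls are skipped.
function createGuard(task) {
  let inProgress = false;
  // Returns true if the task was started, false if a run is still active.
  return function tryRun() {
    if (inProgress) return false;
    inProgress = true;
    Promise.resolve()
      .then(task)
      .catch(err => console.error('guarded task failed:', err.message))
      .finally(() => {
        inProgress = false; // released on success and on failure
      });
    return true;
  };
}

// Usage: a task that takes longer than the interval between two ticks.
const slowTask = () => new Promise(resolve => setTimeout(resolve, 50));
const tick = createGuard(slowTask);
const first = tick(); // starts the task
const second = tick(); // skipped, the first run is still active
```

Releasing the flag in `finally` means a failed run cannot leave the scheduler permanently locked.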
2. Service Processing (Core Logic)
File: queue-manager/services/accounts/import.js
Purpose: Fetch pending imports from queue, download CSV from Wasabi, parse and map data, create Bull queue
Key Functions:
- Query pending account import jobs
- Download CSV from Wasabi S3
- Parse CSV to JSON with csvtojson
- Generate JWT tokens for API authentication
- Map CSV fields to account/business structure
- Create dynamic Bull queue per import job
- Add individual account jobs to queue
Main Processing Function:
exports.importAccounts = async () => {
try {
// Query pending import jobs
const importData = await Queue.find({
source: 'accounts',
status: 'pending',
in_progress: false,
}).sort({ createdAt: -1 });
if (importData.length) {
// Mark jobs as in-progress
const ids = importData.map(account => account._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
// Process each import job
await Promise.all(
importData.map(async importParams => {
try {
const mappings = importParams.mappings;
const user_id = importParams.user_id;
const account_id = importParams.account_id;
const parent_account = importParams.parent_account;
const client_id = importParams.client_id;
// Get CSV key from Wasabi
let key = '';
if (importParams.csv[0].key) {
key = importParams.csv[0].key;
} else {
throw new Error(`Key is missing in csv details. Queue id: ${importParams._id}`);
}
// Download CSV from Wasabi S3
const file = await new wasabiUtil().getReadStream(key);
// Parse CSV to JSON
const entries = await csv().fromStream(file);
// Generate JWT token (1-hour expiration)
const token = jwt.sign(
{
type: 'access_token',
uid: user_id.toString(),
account_id: account_id.toString(),
parent_account: parent_account.toString(),
client_id: client_id.toString(),
scope:
'account contacts communications contacts.external contacts.read contacts.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);
const user = {
created_by: user_id,
owner: user_id,
account_id: account_id,
token,
};
// Create dynamic queue for this import
let queue = await accountQueue.start(importParams._id.toString());
// Add CSV rows to queue
await addData(entries, mappings, user, importParams, queue);
} catch (err) {
await Queue.updateOne({ _id: importParams._id }, { in_progress: false });
console.log(
`Error occurred while processing data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);
}
}),
);
console.log('Accounts added to queue for import.');
}
} catch (err) {
console.log(
`Error occurred while processing data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);
}
};
Data Processing & Mapping:
const addData = async (entries, mappings, user, importParams, queue) => {
// Process each CSV row
for (let entry in entries) {
let accountData = entries[entry];
let accountInfo = {};
let businessInfo = {};
const account = {};
const business = {};
const address = {};
const social = {};
// Map CSV columns to account/business fields
for (let mapping in mappings) {
let currentMapping = '';
const codeRS = mapping.split('.');
// Navigate nested CSV structure
for (let param of codeRS) {
currentMapping = myFun(param, accountData, currentMapping);
}
// Add to account or business object
const { accountInfo: AccountInfo, businessInfo: BusinessInfo } = addAccountData(
mappings,
currentMapping,
mapping,
account,
business,
address,
social,
accountInfo,
businessInfo,
);
accountInfo = AccountInfo;
businessInfo = BusinessInfo;
}
// Add to Bull queue
if (businessInfo) {
await queue.add(
{
user,
accountInfo,
businessInfo,
importParams,
csvAccount: accountData,
lineNumber: entry,
},
{
attempts: 4,
backoff: 4000,
},
);
}
}
};
// Helper to navigate nested CSV data
const myFun = (key, data, val) => {
if (!val) val = data[key];
else val = val[key];
return val;
};
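The helper walks a dotted mapping key one segment at a time. One consequence of its `if (!val)` check is that when an intermediate value is missing, the next segment is looked up on the top-level row again. The sketch below shows an equivalent walk written as a single function, assuming (as the mapping loop implies) that parsed rows may be nested objects. `resolvePath` is a hypothetical name, and unlike `myFun` it returns `undefined` as soon as a segment is missing.

```javascript
// Hypothetical alternative to myFun: resolve a dotted path against a parsed row.
function resolvePath(row, path) {
  return path.split('.').reduce(
    (current, segment) => (current == null ? undefined : current[segment]),
    row,
  );
}

const row = {
  email: 'john@example.com',
  company: { address: { city: 'New York' } },
};

const city = resolvePath(row, 'company.address.city'); // 'New York'
const email = resolvePath(row, 'email'); // 'john@example.com'
const missing = resolvePath(row, 'company.phone.mobile'); // undefined
```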
3. Queue Definition
File: queue-manager/queues/accounts/import.js
Purpose: Process individual account creation jobs
Queue Processor:
exports.start = name => {
try {
let totalRecordsAdded = 0;
let totalErrors = 0;
let logID = undefined;
let accountId = undefined;
let ownerId = undefined;
let importParams = undefined;
let accessToken = undefined;
let resCSVImport = undefined;
let logs = [];
const processCb = async (job, done) => {
try {
const {
user,
accountInfo,
businessInfo,
importParams: params,
csvAccount,
lineNumber,
} = job.data;
accountId = user.account_id;
ownerId = user.owner;
importParams = params;
accessToken = user.token;
let business_id;
const businessFields = [businessInfo];
let insertedAccounts = [];
let invalidAccounts = [];
if (!job.data.insertedAccount) {
try {
// Step 1: Create business contact
let contact = await crmContact.addBusinesses(
accountId,
ownerId,
businessFields,
false,
true,
);
// Handle validation errors
if (
!contact?.errors[0]?.contact_id &&
contact?.errors[0]?.additional_info?.details?.length
) {
const err = contact?.errors[0]?.additional_info;
err.position = lineNumber;
invalidAccounts.push(err);
}
// Extract business_id
if (contact?.errors[0]?.contact_id) {
business_id = contact.errors[0].contact_id.toString();
}
if (contact?.savedContacts[0]?._doc?._id) {
business_id = contact.savedContacts[0]._doc._id.toString();
}
// Step 2: Create sub-account if business created
if (business_id) {
const accountsFields = [
{
business: business_id,
user: accountInfo,
},
];
let account = await accountInvite.addAccount(
accountId,
ownerId,
accountsFields,
false,
true,
);
// Handle account errors
if (account?.errors[0]?.message) {
const err = account?.errors[0];
err.position = lineNumber;
invalidAccounts.push(err);
}
// Store successful accounts
if (account?.savedContacts.length) insertedAccounts.push(account.savedContacts[0]);
}
} catch (err) {
err.additional_data = {
message: err.toString(),
position: lineNumber,
additional_info: err.message,
};
invalidAccounts.push(err.additional_data);
}
}
// Step 3: Log results
try {
const {
totalRecordsAdded: tempRecordsAdded,
totalErrors: tempErrors,
logId: tempLogId,
logData: logD,
logs: logsD,
} = await accountLog(
{ errors: invalidAccounts, savedAccounts: insertedAccounts },
totalRecordsAdded,
totalErrors,
logID,
[accountInfo],
user.owner,
user.user_id,
logs,
csvAccount,
'account',
user.account_id,
);
totalRecordsAdded = tempRecordsAdded;
totalErrors = tempErrors;
logID = tempLogId;
resCSVImport = logD;
logs = logsD;
} catch (err) {
logger.error({
initiator: 'QM/accounts/import',
error: err,
message: 'Account Queue Job logging error: ',
});
}
// Cache inserted account for retries
if (!invalidAccounts.length && !job.data.insertedAccount) {
job.data.insertedAccount = insertedAccounts[0];
}
done(null, { account: insertedAccounts[0], accountInfo: [accountInfo] });
} catch (err) {
err.additional_data = {};
done(err);
}
};
const completedCb = async job => {
importParams.user_id = ownerId;
await accountProcess({
resCSVImport,
importParams,
logs,
totalRecordsAdded,
});
};
// Create queue wrapper with callbacks
let Q = QueueWrapper(
`accounts_import_queue_${name}`,
'account',
{ processCb, completedCb },
true,
);
return Promise.resolve(Q);
} catch (err) {
logger.error({ initiator: 'QM/accounts/import', error: err });
return Promise.reject(err);
}
};
4. Field Mapping Utility
File: queue-manager/common/add_data.js
Function: addAccountData()
Purpose: Map CSV columns to account/business structure
Mapping Logic:
module.exports.addAccountData = (
mappings,
value,
maping,
account,
business,
address,
social,
accountInfo,
businessInfo,
) => {
// Account fields (user info)
if (['account.firstname', 'account.lastname', 'account.email'].includes(mappings[maping])) {
if (value) {
account[mappings[maping].replace('account.', '')] = value;
}
accountInfo = { ...accountInfo, ...account };
}
// Business basic fields
if (['business.name', 'business.email', 'business.phone'].includes(mappings[maping])) {
if (value) {
business[mappings[maping].replace('business.', '')] = value;
}
businessInfo = { ...businessInfo, ...business };
}
// Business address fields
if (
[
'business.address.street',
'business.address.unit',
'business.address.city',
'business.address.state_province',
'business.address.postal_code',
'business.address.country',
].includes(mappings[maping])
) {
if (value) {
address[mappings[maping].replace('business.address.', '')] = value;
}
businessInfo['address'] = address;
}
// Business social media fields
if (
[
'business.social.facebook',
'business.social.instagram',
'business.social.linkedin',
'business.social.twitter',
'business.social.youtube',
'business.social.yelp',
'business.social.pinterest',
'business.social.vimeo',
'business.social.snapchat',
'business.social.reddit',
'business.social.tripadvisor',
'business.social.foursquare',
'business.social.rss',
].includes(mappings[maping])
) {
if (value) {
social[mappings[maping].replace('business.social.', '')] = value;
}
businessInfo['social'] = social;
}
return { accountInfo, businessInfo };
};
Supported Mapping Fields:
Account Fields:
- account.firstname: User first name
- account.lastname: User last name
- account.email: User email
Business Fields:
- business.name: Business name
- business.email: Business email
- business.phone: Business phone
Address Fields:
- business.address.street
- business.address.unit
- business.address.city
- business.address.state_province
- business.address.postal_code
- business.address.country
Social Media Fields:
- business.social.facebook
- business.social.instagram
- business.social.linkedin
- business.social.twitter
- business.social.youtube
- business.social.yelp
- business.social.pinterest
- business.social.vimeo
- business.social.snapchat
- business.social.reddit
- business.social.tripadvisor
- business.social.foursquare
- business.social.rss
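Because `addAccountData` ignores any target that is not in these lists, it can help to check a mappings object before an import is queued. The following is a minimal sketch, assuming a hypothetical `findUnsupportedTargets` helper; the target names are taken from the supported fields listed above.

```javascript
// Supported mapping targets, built from the documented field lists.
const SUPPORTED_TARGETS = new Set([
  'account.firstname', 'account.lastname', 'account.email',
  'business.name', 'business.email', 'business.phone',
  ...['street', 'unit', 'city', 'state_province', 'postal_code', 'country']
    .map(field => `business.address.${field}`),
  ...['facebook', 'instagram', 'linkedin', 'twitter', 'youtube', 'yelp', 'pinterest',
    'vimeo', 'snapchat', 'reddit', 'tripadvisor', 'foursquare', 'rss']
    .map(field => `business.social.${field}`),
]);

// Hypothetical helper: returns the mapping entries whose target is not supported.
function findUnsupportedTargets(mappings) {
  return Object.entries(mappings)
    .filter(([, target]) => !SUPPORTED_TARGETS.has(target))
    .map(([column, target]) => ({ column, target }));
}

const problems = findUnsupportedTargets({
  firstname: 'account.firstname',
  city: 'business.address.city',
  fax: 'business.fax', // not in the supported list
});
// problems contains one entry: { column: 'fax', target: 'business.fax' }
```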
5. Account Creation Utility
File: queue-manager/utilities/account-invite.js
Function: addAccount()
Purpose: Create sub-accounts with validation
Validation Schema:
const accountInviteSchema = Joi.object({
business: Joi.string().custom((value, helper) => validateMongoID(value, helper)),
user: Joi.object({
first_name: Joi.string().required(),
last_name: Joi.string().required(),
email: Joi.string().required(),
}),
user_only: Joi.boolean(),
}).and('business', 'user');
Key Validations:
- MongoDB ID Validation: Ensures business is a valid ObjectId
- Email Uniqueness: Checks for duplicate emails in parent account
- Required Fields: first_name, last_name, email mandatory
- Business Linkage: Account must be linked to valid business contact
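`validateMongoID` is referenced by the schema but not shown here. As an illustration of what such a check usually involves, the sketch below tests for the 24-character hexadecimal string form of an ObjectId. `looksLikeObjectId` is a hypothetical stand-in, not the project's helper.

```javascript
// Hypothetical check: an ObjectId's string form is 24 hexadecimal characters.
function looksLikeObjectId(value) {
  return typeof value === 'string' && /^[0-9a-fA-F]{24}$/.test(value);
}

const valid = looksLikeObjectId('507f1f77bcf86cd799439011'); // true
const tooShort = looksLikeObjectId('507f1f77'); // false
const notAString = looksLikeObjectId(12345); // false
```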
Collections Used
queues
- Operations: Read, Update
- Model: shared/models/queues.js
- Usage Context:
  - Query pending account import jobs
  - Track import progress (in_progress flag)
  - Store CSV file keys and mappings
Key Fields:
- source: 'accounts' (filter for account imports)
- status: 'pending' | 'completed' | 'failed'
- in_progress: Boolean lock flag
- csv: Array with Wasabi S3 key
- mappings: Field mapping configuration
- user_id: Import initiator
- account_id: Target account
- parent_account: Parent account ID
- client_id: Client identifier
contacts
- Operations: Create (via crmContact.addBusinesses)
- Model: shared/models/contact.js
- Usage Context: Create business contacts before creating sub-accounts
Business Contact Structure:
{
name: 'Business Name',
email: 'business@example.com',
phone: '+1234567890',
address: {
street: '123 Main St',
unit: 'Suite 100',
city: 'New York',
state_province: 'NY',
postal_code: '10001',
country: 'USA'
},
social: {
facebook: 'https://facebook.com/business',
linkedin: 'https://linkedin.com/company/business',
// ... other social media
},
contact_type: 'business',
account_id: ObjectId,
created_by: ObjectId
}
accounts
- Operations: Create (via accountInvite.addAccount)
- Model: shared/models/account.js
- Usage Context: Create sub-accounts linked to business contacts
Account Structure:
{
first_name: 'John',
last_name: 'Doe',
email: 'john@example.com',
business: ObjectId, // Link to business contact
parent_account: ObjectId,
account_id: ObjectId,
created_by: ObjectId,
status: 'active'
}
csv_import_logs
- Operations: Create, Update (via accountLog)
- Model: shared/models/csv-import-log.js
- Usage Context: Track import results, errors, and line numbers
Log Structure:
{
import_type: 'account',
total_records: 100,
successful_imports: 85,
failed_imports: 15,
logs: [{
line_number: 5,
status: 'error',
message: 'Email already exists',
csv_data: { /* original CSV row */ }
}],
user_id: ObjectId,
account_id: ObjectId,
queue_id: ObjectId
}
Job Configuration
Queue Options
{
  attempts: 4, // Up to 4 attempts in total (first run plus 3 retries)
  backoff: 4000, // Fixed 4-second delay between attempts
}
Cron Schedule
'*/5 * * * * *'; // Every 5 seconds
Frequency Rationale: 5-second intervals ensure rapid processing of import jobs while preventing database overload.
Processing Logic - Detailed Flow
Import Job Query
Conditions:
{
source: "accounts", // Account imports only
status: "pending", // Not yet processed
in_progress: false // Not currently processing
}
Sort Order: { createdAt: -1 } (newest first)
CSV Processing Steps
1. Fetch CSV from Wasabi
const file = await new wasabiUtil().getReadStream(key);
- Downloads CSV file stream from Wasabi S3
- Uses key from importParams.csv[0].key
2. Parse CSV to JSON
const entries = await csv().fromStream(file);
- Converts CSV to array of objects
- Each object represents one CSV row
3. Generate JWT Token
const token = jwt.sign(
  {
    type: 'access_token',
    uid: user_id.toString(),
    account_id: account_id.toString(),
    parent_account: parent_account.toString(),
    client_id: client_id.toString(),
    scope: 'account contacts communications contacts.external contacts.read contacts.create',
  },
  process.env.APP_SECRET,
  { expiresIn: '1h' },
);
- Purpose: Authenticate internal API calls from queue processor
- Expiration: 1 hour (sufficient for import completion)
- Scopes: account, contacts, communications permissions
4. Field Mapping
for (let mapping in mappings) {
  // Navigate nested CSV structure
  let currentMapping = '';
  const codeRS = mapping.split('.');
  for (let param of codeRS) {
    currentMapping = myFun(param, accountData, currentMapping);
  }
  // Map to account/business structure; accountInfo and businessInfo are
  // declared before the loop and reassigned on every iteration
  const result = addAccountData(
    mappings,
    currentMapping,
    mapping,
    account,
    business,
    address,
    social,
    accountInfo,
    businessInfo,
  );
  accountInfo = result.accountInfo;
  businessInfo = result.businessInfo;
}
- Handles nested CSV columns (e.g., company.address.city)
- Separates account data (user info) from business data
- Copies only non-empty values into the mapped fields
Queue Processing Flow
1. Create Business Contact
let contact = await crmContact.addBusinesses(
  accountId,
  ownerId,
  businessFields,
  false, // exitOnError = false
  true, // returnErrors = true
);
- Creates business in contacts collection
- Returns business_id on success
- Returns validation errors on failure
2. Create Sub-Account
const accountsFields = [
  {
    business: business_id,
    user: accountInfo,
  },
];
let account = await accountInvite.addAccount(
  accountId,
  ownerId,
  accountsFields,
  false, // exitOnError = false
  true, // returnErrors = true
);
- Only executes if business created successfully
- Links account to business via business_id
- Validates email uniqueness in parent account
- Returns created account or validation errors
3. Log Results
// Aliases avoid shadowing the running totals that are passed in as arguments
const {
  totalRecordsAdded: tempRecordsAdded,
  totalErrors: tempErrors,
  logId: tempLogId,
  logData: logD,
  logs: logsD,
} = await accountLog(
  { errors: invalidAccounts, savedAccounts: insertedAccounts },
  totalRecordsAdded,
  totalErrors,
  logID,
  [accountInfo],
  user.owner,
  user.user_id,
  logs,
  csvAccount,
  'account',
  user.account_id,
);
- Aggregates success/error counts
- Stores line numbers for failed imports
- Creates/updates CSV import log document
4. Update Queue Status
await accountProcess({
  resCSVImport,
  importParams,
  logs,
  totalRecordsAdded,
});
- Marks queue job as complete
- Updates final statistics
- Stores CSV import log ID
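The ordering matters: a sub-account is only attempted once a business id is available, and every failure is recorded against its CSV line. The sketch below mirrors that control flow with stubbed dependencies. `processRow`, `createBusiness`, and `createAccount` are hypothetical stand-ins for the processor, `crmContact.addBusinesses`, and `accountInvite.addAccount`, and their return shapes are simplified for illustration.

```javascript
// Simplified control flow for one CSV row; dependencies are injected stubs.
async function processRow({ businessInfo, accountInfo, lineNumber }, deps) {
  const errors = [];
  const saved = [];
  try {
    const business = await deps.createBusiness(businessInfo);
    if (!business.id) {
      errors.push({ position: lineNumber, message: business.error });
      return { errors, saved }; // account creation is skipped
    }
    const account = await deps.createAccount({ business: business.id, user: accountInfo });
    if (account.error) errors.push({ position: lineNumber, message: account.error });
    else saved.push(account.doc);
  } catch (err) {
    // Unexpected failures are recorded instead of aborting the whole import
    errors.push({ position: lineNumber, message: err.toString() });
  }
  return { errors, saved };
}

// Stubs: business creation fails when the name is missing.
const deps = {
  createBusiness: async info => (info.name ? { id: 'b1' } : { error: 'Name is required' }),
  createAccount: async fields => ({
    doc: { email: fields.user.email, business: fields.business },
  }),
};

const okRun = processRow(
  { businessInfo: { name: 'Acme Corp' }, accountInfo: { email: 'john@example.com' }, lineNumber: 0 },
  deps,
);
const failedRun = processRow(
  { businessInfo: {}, accountInfo: { email: 'jane@example.com' }, lineNumber: 1 },
  deps,
);
```

Both calls return promises; the second resolves with one error entry and no saved account, because the business step failed first.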
Error Handling
Common Error Scenarios
Missing CSV Key
if (importParams.csv[0].key) {
key = importParams.csv[0].key;
} else {
throw new Error(`Key is missing in csv details. Queue id: ${importParams._id}`);
}
Result: The job's in_progress flag is reset to false. Its status stays pending, so the next cron run picks the job up again.
CSV Download Failure
try {
const file = await new wasabiUtil().getReadStream(key);
} catch (err) {
await Queue.updateOne({ _id: importParams._id }, { in_progress: false });
console.log(`Error occurred while processing data for import.`);
}
Result: Job can be retried on next cron run
Business Creation Failure
if (!contact?.errors[0]?.contact_id && contact?.errors[0]?.additional_info?.details?.length) {
const err = contact?.errors[0]?.additional_info;
err.position = lineNumber;
invalidAccounts.push(err);
}
Result: Account creation skipped, error logged with line number
Account Creation Failure
if (account?.errors[0]?.message) {
const err = account?.errors[0];
err.position = lineNumber;
invalidAccounts.push(err);
}
Common Reasons:
- Duplicate email in parent account
- Invalid MongoDB ID for business
- Missing required fields (first_name, last_name, email)
Result: Error logged, processing continues with next CSV row
General Processing Error
catch (err) {
err.additional_data = {
message: err.toString(),
position: lineNumber,
additional_info: err.message,
};
invalidAccounts.push(err.additional_data);
}
Result: Error logged with its string form (err.toString()), its message, and the line number. The stack trace is not stored.
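The difference between the two stored strings is small but worth seeing: `err.toString()` yields the error name plus the message, while `err.message` is the message alone. The sketch below builds the same entry shape; the `buildErrorEntry` name is hypothetical.

```javascript
// Hypothetical helper reproducing the entry shape pushed to invalidAccounts.
function buildErrorEntry(err, lineNumber) {
  return {
    message: err.toString(), // e.g. 'Error: <message>'
    position: lineNumber,
    additional_info: err.message, // the bare message
  };
}

const entry = buildErrorEntry(new Error('Database connection timeout'), 15);
// entry.message is 'Error: Database connection timeout'
// entry.additional_info is 'Database connection timeout'
```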
Retry Strategy
{
attempts: 4, // Maximum 4 attempts per account
backoff: 4000, // 4 seconds between retries
}
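Important: Retries apply per job, that is, per CSV row. With attempts: 4, a failing job runs at most 4 times in total (the first attempt plus up to 3 retries), and completed jobs are not reprocessed. In the processor shown earlier, validation errors are logged and the job still finishes through done(null, ...), so only an error passed to done(err) triggers a retry.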
Cached Accounts:
if (!invalidAccounts.length && !job.data.insertedAccount) {
job.data.insertedAccount = insertedAccounts[0];
}
On retry attempts, if job.data.insertedAccount exists, the account creation is skipped (prevents duplicates).
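With a fixed backoff, the delay that retries can add to a single row is easy to bound. The sketch below works through that arithmetic, assuming Bull's semantics in which `attempts` counts total tries and a numeric `backoff` is a fixed delay in milliseconds. `retrySchedule` is a hypothetical helper.

```javascript
// Hypothetical helper: derive the retry count and total backoff from job options.
function retrySchedule({ attempts, backoff }) {
  const retries = Math.max(attempts - 1, 0); // the first attempt is not a retry
  return {
    retries,
    totalBackoffMs: retries * backoff,
  };
}

const schedule = retrySchedule({ attempts: 4, backoff: 4000 });
// schedule is { retries: 3, totalBackoffMs: 12000 }
```

Under those assumptions, a row that keeps failing spends at most 12 seconds waiting between attempts, on top of its processing time.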
Monitoring & Logging
Success Logging
// Per-account success
insertedAccounts.push(account.savedContacts[0]);
// Aggregated logging
totalRecordsAdded++;
Error Logging
// Business creation error
{
position: 5, // Line number in CSV
details: ['Name is required', 'Invalid email format'],
additional_info: { /* validation details */ }
}
// Account creation error
{
position: 10,
message: 'Email already exists at position 10.',
account_id: ObjectId // Existing account ID
}
// General error
{
position: 15,
message: 'Error: Internal server error',
additional_info: 'Database connection timeout'
}
CSV Import Log
{
import_type: 'account',
total_records: 100,
successful_imports: 85,
failed_imports: 15,
logs: [
{
line_number: 5,
status: 'error',
message: 'Business name is required',
csv_data: { /* original row */ }
},
{
line_number: 10,
status: 'error',
message: 'Email already exists',
csv_data: { /* original row */ }
}
],
user_id: ObjectId,
account_id: ObjectId,
queue_id: ObjectId,
createdAt: ISODate,
updatedAt: ISODate
}
Console Logging
// Success message
console.log('Accounts added to queue for import.');
// Error message
console.log(
`Error occurred while processing data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);
Performance Metrics
- Average Processing Time: 2-5 seconds per CSV row
- Parallel Import Jobs: All pending imports process simultaneously
- Success Rate: ~90% (varies by CSV data quality)
- Typical Volume: 10-500 accounts per import job
Integration Points
Triggers This Job
- Cron Schedule: Every 5 seconds automatically
- Queue Creation: Via Internal API endpoint (creates queue document with source: "accounts")
- CSV Upload: User uploads CSV to Wasabi, creates queue with file key
Data Dependencies
- Wasabi S3: CSV file storage with unique keys
- JWT Token: 1-hour access token for API authentication
- Parent Account: Must exist and be active
- Mappings Configuration: Defines CSV column to field mapping
Jobs That Depend On This
- Account Activation: New accounts trigger welcome emails
- CRM Contact Sync: Business contacts indexed for search
- Billing Setup: Sub-accounts inherit billing configuration
Important Notes
Side Effects
- ⚠️ Business Contact Creation: Creates contacts in CRM
- ⚠️ Sub-Account Creation: Creates accounts with login credentials
- ⚠️ Email Sending: Welcome emails sent to new accounts (via accountInvite)
- ⚠️ Database Writes: Creates accounts, contacts, CSV logs
- ⚠️ Wasabi Storage: Downloads CSV files (bandwidth usage)
Performance Considerations
- 5-Second Intervals: Balance between responsiveness and database load
- Parallel Processing: All pending imports run simultaneously
- In-Progress Lock: Prevents duplicate processing of same import job
- JWT Expiration: 1-hour token; at 2-5 seconds per row this covers roughly 720 to 1,800 rows processed one after another, well above the typical 10-500 accounts per import
- Retry Strategy: 4 attempts per account with 4-second backoff
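Whether a one-hour token outlasts an import depends on the per-row processing time. The sketch below gives a rough estimate, assuming rows are processed one after another and using the 2-5 second range quoted under Performance Metrics. `rowsWithinTokenLifetime` is a hypothetical helper.

```javascript
const TOKEN_LIFETIME_SECONDS = 60 * 60; // matches expiresIn: '1h'

// Hypothetical helper: how many sequential rows fit inside the token lifetime.
function rowsWithinTokenLifetime(secondsPerRow, lifetimeSeconds = TOKEN_LIFETIME_SECONDS) {
  return Math.floor(lifetimeSeconds / secondsPerRow);
}

const bestCase = rowsWithinTokenLifetime(2); // 1800
const worstCase = rowsWithinTokenLifetime(5); // 720
```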
Maintenance Notes
- CSV Log Cleanup: csv_import_logs can grow large - implement retention policy
- Failed Imports: Monitor failed_imports count for data quality issues
- Duplicate Detection: Email uniqueness enforced at parent account level
- Business Orphans: If account creation fails, business contact remains (manual cleanup needed)
- Queue Stale Jobs: Jobs stuck with in_progress=true indicate processing errors
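One way to surface stuck jobs is to look for documents that have been flagged as in progress for longer than any import should take. The sketch below only builds such a filter. It assumes the queue documents carry an `updatedAt` timestamp (the source only confirms `createdAt`), and both `buildStaleJobFilter` and the two-hour threshold are illustrative choices.

```javascript
// Hypothetical helper: filter for account imports stuck in progress.
function buildStaleJobFilter(now = new Date(), maxAgeMs = 2 * 60 * 60 * 1000) {
  return {
    source: 'accounts',
    in_progress: true,
    updatedAt: { $lt: new Date(now.getTime() - maxAgeMs) }, // assumed field
  };
}

const filter = buildStaleJobFilter(new Date('2024-01-01T12:00:00Z'));
// filter.updatedAt.$lt is 2024-01-01T10:00:00.000Z
// Possible usage with the Queue model: await Queue.find(filter)
```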
Testing
Manual Trigger
# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/accounts/import
Create Test Import Job
// Upload CSV to Wasabi
const csvContent = `firstname,lastname,email,business_name,business_email,business_phone
John,Doe,john@example.com,Acme Corp,contact@acme.com,+1234567890
Jane,Smith,jane@example.com,Tech Solutions,info@techsolutions.com,+0987654321`;
const key = await wasabi.uploadFile(csvContent, 'imports/test-accounts.csv');
// Create queue document (mapping keys are the CSV header names)
const queueDoc = await Queue.create({
  source: 'accounts',
  status: 'pending',
  in_progress: false,
  csv: [{ key: key }],
  mappings: {
    firstname: 'account.firstname',
    lastname: 'account.lastname',
    email: 'account.email',
    business_name: 'business.name',
    business_email: 'business.email',
    business_phone: 'business.phone',
  },
  user_id: testUserId,
  account_id: testAccountId,
  parent_account: testParentAccountId,
  client_id: testClientId,
});
// Wait for the cron to pick up the job, then check results
// (5 seconds may be too short for larger files)
setTimeout(async () => {
  const log = await CSVImportLog.findOne({
    import_type: 'account',
    queue_id: queueDoc._id,
  });
  if (!log) {
    console.log('Import log not available yet');
    return;
  }
  console.log('Import completed:', log.successful_imports, 'accounts created');
  console.log('Errors:', log.failed_imports);
}, 5000);
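A fixed wait can fire before the import has finished, since the cron may take up to 5 seconds to notice the job and each row takes a few seconds more. A polling helper is one alternative. The sketch below is a generic version with a stand-in check; `waitFor` is a hypothetical helper, and in the test above the check would be a function that runs the `CSVImportLog.findOne(...)` query.

```javascript
// Hypothetical polling helper: resolves with the first truthy result of check().
async function waitFor(check, { timeoutMs = 60000, intervalMs = 1000 } = {}) {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const result = await check();
    if (result) return result;
    if (Date.now() >= deadline) throw new Error('Timed out waiting for condition');
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
}

// Demo with a stand-in check that succeeds on the third call.
let calls = 0;
const demo = waitFor(() => (++calls >= 3 ? { successful_imports: 2 } : null), {
  timeoutMs: 1000,
  intervalMs: 10,
});
```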
Monitor Import Progress
// Count pending imports
const pending = await Queue.countDocuments({
source: 'accounts',
status: 'pending',
in_progress: false,
});
console.log('Pending account imports:', pending);
// Check in-progress imports
const inProgress = await Queue.countDocuments({
source: 'accounts',
in_progress: true,
});
console.log('Currently processing:', inProgress);
// Recent import logs
const recentLogs = await CSVImportLog.find({
import_type: 'account',
createdAt: { $gte: new Date(Date.now() - 60 * 60 * 1000) }, // Last hour
})
.sort({ createdAt: -1 })
.limit(10);
console.log('Recent imports:', recentLogs);
// Failed imports
const failed = recentLogs.filter(log => log.failed_imports > 0);
console.log('Imports with errors:', failed.length);
Validate Field Mappings
// Test mapping utility
const { addAccountData } = require('./common/add_data');
const mappings = {
0: 'account.firstname',
1: 'business.name',
2: 'business.address.city',
};
const csvRow = {
0: 'John',
1: 'Acme Corp',
2: 'New York',
};
let accountInfo = {};
let businessInfo = {};
const account = {};
const business = {};
const address = {};
const social = {};
for (let mapping in mappings) {
const result = addAccountData(
mappings,
csvRow[mapping],
mapping,
account,
business,
address,
social,
accountInfo,
businessInfo,
);
accountInfo = result.accountInfo;
businessInfo = result.businessInfo;
}
console.log('Account:', accountInfo);
// { firstname: 'John' }
console.log('Business:', businessInfo);
// { name: 'Acme Corp', address: { city: 'New York' } }
Job Type: Scheduled
Execution Frequency: Every 5 seconds
Average Duration: 2-5 seconds per account
Status: Active