# Contact Import Processing

## Overview

The Contact Import module handles CSV file imports for contacts, supporting three distinct import types: People (person), Companies (business), and Both (linked person-company records). This is one of the most complex Queue Manager modules, with 450+ lines of business logic.

**Cron Schedule:** `*/5 * * * * *` (every 5 seconds)
**Source Files:**

- Cron: `queue-manager/crons/contacts/import.js`
- Service: `queue-manager/services/contacts/import.js` (450+ lines)
- Queues:
  - `queue-manager/queues/contacts/person.js` (Person imports)
  - `queue-manager/queues/contacts/business.js` (Company imports)
  - `queue-manager/queues/contacts/contacts.js` (Both imports)
## Business Purpose

Enables bulk contact imports from CSV files uploaded by users, supporting:

- **CRM Data Migration:** Import existing contacts from other systems
- **Lead Lists:** Bulk upload of purchased or collected lead lists
- **Company Databases:** Import business directories
- **Linked Records:** Create person-company relationships in a single import
## Complete Processing Flow

```mermaid
sequenceDiagram
    participant USER as User/Internal API
    participant QUEUE_COLL as Queue Collection
    participant CRON as Import Cron (5s)
    participant SERVICE as Import Service
    participant WASABI as Wasabi S3
    participant CSV as CSV Parser
    participant BULL as Bull Queues
    participant DB as MongoDB

    USER->>QUEUE_COLL: Create import job<br/>(status: pending, CSV key)

    loop Every 5 seconds
        CRON->>SERVICE: Execute importContacts()
        SERVICE->>QUEUE_COLL: Query pending imports<br/>(in_progress: false)
        QUEUE_COLL-->>SERVICE: Import jobs
        SERVICE->>QUEUE_COLL: Mark in_progress: true
        SERVICE->>SERVICE: Generate JWT token<br/>(1hr expiration)
        SERVICE->>WASABI: Fetch CSV file<br/>(getReadStream)
        WASABI-->>SERVICE: CSV stream
        SERVICE->>CSV: Parse CSV to JSON<br/>(csvtojson library)
        CSV-->>SERVICE: JSON array

        alt Import Type: Person
            SERVICE->>BULL: Create Person Queue
            SERVICE->>SERVICE: insertCsvPeopleData()
            SERVICE->>BULL: Add jobs (1 per row)
            BULL->>DB: Insert person contacts
        else Import Type: Business
            SERVICE->>BULL: Create Business Queue
            SERVICE->>SERVICE: insertCsvBusinessData()
            SERVICE->>BULL: Add jobs (1 per row)
            BULL->>DB: Insert business contacts
        else Import Type: Both
            SERVICE->>BULL: Create Contacts Queue
            SERVICE->>SERVICE: insertCsvBothData()
            SERVICE->>BULL: Add jobs (1 per row)
            BULL->>DB: Insert person + business<br/>(with linking)
        end
    end
```
## Main Service Function

### importContacts()

**Purpose:** Orchestrates the entire import process, from queue polling to CSV processing.

#### Step 1: Query Pending Imports

```javascript
let importData = await Queue.find({
  status: 'pending',
  source: 'contacts',
  in_progress: false,
}).sort({ createdAt: -1 });

// Mark as in progress to prevent duplicate processing
const ids = importData.map(d => d._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
```
**Logic:**

- Queries the `Queue` collection for pending contact imports
- Sorts by `createdAt` descending (newest first)
- Immediately marks jobs as `in_progress` to prevent duplicate processing
- The `in_progress` flag lets multiple instances run concurrently; note the find-then-update pattern leaves a small race window between the query and the flag update
Step 2: Generate Authentication Tokenโ
importParams.authToken = jwt.sign(
{
type: 'access_token',
uid: importParams.user_id.toString(),
account_id: importParams.account_id.toString(),
parent_account: importParams.parent_account.toString(),
client_id: importParams.client_id.toString(),
scope:
'contacts communications contacts.external contacts.read contacts.create files files.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);
**Logic:**

- Creates a short-lived JWT (1 hour) for queue processors
- Scopes cover `contacts`, `communications`, and file operations
- Used by Bull queue processors to authenticate API calls
- Avoids the need to store long-lived credentials
#### Step 3: Fetch CSV from Wasabi S3

```javascript
let keyName = encodeURI(importParams.csv[0].key);
const fileUrl = await new wasabiUtil().getReadStream(keyName);
const jsonEntries = await csv().fromStream(fileUrl);
```
**Logic:**

- Retrieves the CSV file key from the import parameters
- Uses the Wasabi utility to get an S3 read stream
- Streams the CSV directly to the parser (memory efficient)
- Converts the CSV to a JSON array using the `csvtojson` library
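To make the conversion concrete, here is a minimal pure-JS sketch of the CSV-to-JSON shape that `csvtojson` produces. This is an illustration only, not the library itself: the real code streams the file and handles quoting, escaping, and encodings that this sketch ignores.

```javascript
// Illustrative sketch: header row becomes object keys, each data row
// becomes one JSON object (what csvtojson's fromStream resolves to).
function csvToJson(csvText) {
  const [headerLine, ...rows] = csvText.trim().split('\n');
  const headers = headerLine.split(',').map(h => h.trim());
  return rows.map(row => {
    const values = row.split(',');
    return Object.fromEntries(headers.map((h, i) => [h, (values[i] || '').trim()]));
  });
}

const entries = csvToJson('First Name,Email\nJohn,john@example.com');
console.log(entries); // [{ 'First Name': 'John', Email: 'john@example.com' }]
```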
#### Step 4: Route to the Appropriate Processor

```javascript
if (type === 'contacts') {
  let Q = await personQueue.start(importParams._id.toString());
  await insertCsvPeopleData({ ...payload, BullQueue: Q });
}
if (type === 'companies') {
  let Q = await businessQueue.start(importParams._id.toString());
  await insertCsvBusinessData({ ...payload, BullQueue: Q });
}
if (type === 'both') {
  let Q = await contactsQueue.start(importParams._id.toString());
  await insertCsvBothData({ ...payload, BullQueue: Q });
}
```
**Logic:**

- Three import types are supported
- Each type gets a dedicated Bull queue
- The queue name includes the import ID for tracking
- The payload includes all data the processor needs
## Import Type 1: Person/People (Contacts)

### insertCsvPeopleData()

**Purpose:** Processes CSV rows as individual person contacts.

#### Field Mapping Process

```javascript
// cc, address, social, and additionalInfo accumulate across iterations
let cc = {}, address = {}, social = {}, additionalInfo = {};

for (let maping in mappings) {
  const codeRS = maping.trim().split('.');
  let currentMapping;
  for (let param of codeRS) {
    currentMapping = myFun(param, contact, currentMapping);
  }
  ({ cc, address, social, additionalInfo } = addData(
    mappings,
    currentMapping,
    maping,
    cc,
    address,
    social,
    additionalInfo,
  ));
}
```
**Mapping Logic:**

- **Dot Notation Support:** `company.address.street` → nested field access
- **Field Accumulation:** Builds the contact object progressively
- **Category Separation:** Splits fields into main, address, and social groups
- **Additional Info:** Captures unmapped custom fields
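The mapping walk above can be sketched end to end as a small pure function. This is a hypothetical illustration (the function name `applyMappings` and the mapping direction are assumptions for clarity, not the service's actual helper):

```javascript
// Hypothetical sketch: resolve each mapped CSV column via dot notation
// and accumulate the resolved values onto a flat contact object.
function applyMappings(mappings, csvRow) {
  const result = {};
  for (const [field, columnPath] of Object.entries(mappings)) {
    // Walk nested keys ("company.name") one segment at a time
    const value = columnPath
      .split('.')
      .reduce((acc, key) => (acc == null ? acc : acc[key.trim()]), csvRow);
    if (value !== undefined) result[field] = value;
  }
  return result;
}

const mapped = applyMappings(
  { first_name: 'First Name', company_name: 'company.name' },
  { 'First Name': 'John', company: { name: 'Acme' } },
);
// mapped => { first_name: 'John', company_name: 'Acme' }
```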
#### Name Field Handling

```javascript
let nameFields = {
  name:
    (cc.first_name || cc.last_name
      ? `${cc.first_name || ''} ${cc.last_name || ''}`.trim()
      : cc.name?.trim()) || '',
  first_name: !cc.first_name ? `${cc.name?.split(' ')?.[0] || ''}` : cc.first_name,
  last_name: !cc.last_name ? `${cc.name?.slice(cc.name?.indexOf(' ') + 1) || ''}` : cc.last_name,
};
```
**Name Logic:**

- **Full Name Priority:** Concatenates first + last when either is available
- **Name Splitting:** If only a full name is provided, splits on the first space
- **Fallback Handling:** Ensures all name fields are populated
- **Trim Whitespace:** Removes leading/trailing spaces
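The rules above, extracted into a standalone function so the branches can be exercised directly (the function name `buildNameFields` is ours; the logic is copied from the snippet):

```javascript
// Name-field derivation as shown above, wrapped for testing.
function buildNameFields(cc) {
  return {
    name:
      (cc.first_name || cc.last_name
        ? `${cc.first_name || ''} ${cc.last_name || ''}`.trim()
        : cc.name?.trim()) || '',
    first_name: !cc.first_name ? cc.name?.split(' ')?.[0] || '' : cc.first_name,
    last_name: !cc.last_name
      ? cc.name?.slice(cc.name?.indexOf(' ') + 1) || ''
      : cc.last_name,
  };
}

buildNameFields({ first_name: 'John', last_name: 'Doe' });
// => { name: 'John Doe', first_name: 'John', last_name: 'Doe' }
buildNameFields({ name: 'Jane Smith' });
// => { name: 'Jane Smith', first_name: 'Jane', last_name: 'Smith' }
```

One quirk worth noting: for a single-word full name like `'Cher'`, `indexOf(' ')` is `-1`, so `slice(0)` copies the whole string and both `first_name` and `last_name` end up as `'Cher'`.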
#### Lead Status Normalization

```javascript
if (contactObject.lead_status) {
  contactObject.lead_status = contactObject.lead_status.replace(/ /g, '_');
  contactObject.lead_status = contactObject.lead_status.toLowerCase();
}
```
**Logic:**

- Converts "New Lead" → "new_lead"
- Standardizes values for database consistency
- Prevents duplicate statuses with different casing
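The same two-step transform as a one-liner, for reference (the wrapper name `normalizeLeadStatus` is ours):

```javascript
// Lead status normalization: spaces to underscores, then lowercase.
const normalizeLeadStatus = status => status.replace(/ /g, '_').toLowerCase();

normalizeLeadStatus('New Lead'); // 'new_lead'
normalizeLeadStatus('Contacted'); // 'contacted'
```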
#### Contact Object Creation

```javascript
let contactObject = {
  _id: new mongoose.Types.ObjectId(),
  ...cc,
  ...nameFields,
  import_id: importParams._id,
  type: 'person',
  parent_account: account_id,
  created_by: owner,
  owner,
  provider: 'csv',
  tags: importParams.additional_data?.tags,
};

// Remove empty address objects
if (!Object.keys(contactObject.address || {}).length) delete contactObject.address;
```
**Object Fields:**

- **Generated ID:** Pre-generates an ObjectId for tracking
- **Import Tracking:** `import_id` links to the source import job
- **Type:** Always `'person'` for this processor
- **Ownership:** Links to the account and user
- **Provider:** Always `'csv'` for imports
- **Tags:** Optional tags for categorization
- **Address Cleanup:** Removes empty address objects to save space
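The address cleanup generalizes to any nested object that mapped no fields. A small sketch (the helper name `stripEmptyObjects` is ours, not part of the service):

```javascript
// Drop keys whose value is a plain object with no own properties,
// e.g. an address or social block that mapped nothing from the CSV.
function stripEmptyObjects(obj) {
  for (const key of Object.keys(obj)) {
    const value = obj[key];
    if (value && typeof value === 'object' && !Array.isArray(value)
        && Object.keys(value).length === 0) {
      delete obj[key];
    }
  }
  return obj;
}

stripEmptyObjects({ name: 'John', address: {}, social: { x: '@j' } });
// => { name: 'John', social: { x: '@j' } }
```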
#### Queue Job Creation

```javascript
await BullQueue.add(
  {
    account_id,
    owner,
    crmContacts: [contactObject],
    importParams,
    accessToken,
    csvContact: contact, // Original CSV row
    lineNumber: line, // Row number for error reporting
    account,
  },
  {
    attempts: 3,
    backoff: 4000, // 4 second fixed delay between retries
  },
);
```
**Job Options:**

- **Retry Logic:** 3 attempts with backoff
- **Context Preservation:** Original CSV row kept for debugging
- **Line Tracking:** Row number included for error messages
- **Authentication:** JWT token for API calls
## Import Type 2: Business/Companies

### insertCsvBusinessData()

**Purpose:** Processes CSV rows as company/business contacts.

#### Business Object Creation

```javascript
let companyObject = {
  _id: new mongoose.Types.ObjectId(),
  ...cc,
  address,
  social,
  import_id: importParams._id,
  type: 'business',
  parent_account: account_id,
  created_by: owner,
  owner,
  provider: 'csv',
  tags: importParams.additional_data?.tags,
};
```
**Key Differences from Person:**

- **Type:** `'business'` instead of `'person'`
- **No Name Splitting:** Company names stay intact
- **No Lead Status:** Businesses don't have lead statuses
- **Industry Fields:** May include industry, company size, etc.
## Import Type 3: Both (Person + Business Linked)

### insertCsvBothData()

**Purpose:** Creates both person and business records with bi-directional linking.

#### Mapping Split Logic

```javascript
for (let maping in mappings) {
  let addType;

  // Determine if the field is for the person or the business
  if (mappings[maping].split('.')[0] === 'person') {
    addType = 'person';
  } else {
    addType = 'business';
  }

  // Remove the prefix from the mapping
  mappings[maping] = mappings[maping].split('.').slice(1).join('.');

  // Add to the appropriate object
  if (addType === 'person') {
    // Add to cc (contact)
  } else if (addType === 'business') {
    // Add to cb (company/business)
  }
}
```
**Mapping Prefixes:**

- `person.first_name` → Person's first name
- `business.company_name` → Company name
- Allows a single CSV to populate both record types
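The prefix routing above can be sketched as a pure function. This is a hypothetical illustration (the name `splitMappings` and the returned shape are ours, not the service's actual code, which mutates `mappings` in place):

```javascript
// Hypothetical sketch of the "both" prefix split: route each mapping
// to a person or business bucket by its first dot segment, prefix removed.
function splitMappings(mappings) {
  const person = {};
  const business = {};
  for (const [column, target] of Object.entries(mappings)) {
    const [prefix, ...rest] = target.split('.');
    const field = rest.join('.');
    if (prefix === 'person') person[column] = field;
    else business[column] = field;
  }
  return { person, business };
}

const split = splitMappings({
  'First Name': 'person.first_name',
  Company: 'business.company_name',
});
// split => { person: { 'First Name': 'first_name' },
//            business: { Company: 'company_name' } }
```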
#### Linked Object Creation

```javascript
const perId = new mongoose.Types.ObjectId();
const bizId = new mongoose.Types.ObjectId();

let contactObject = {
  _id: perId,
  ...cc,
  ...nameFields,
  ...commonPayload,
  type: 'person',
  businesses: [bizId], // Link to business
};

let businessObject = {
  _id: bizId,
  ...cb,
  ...commonPayload,
  type: 'business',
  people: [perId], // Link to person
};
```
**Bi-Directional Linking:**

- The person record contains a `businesses` array with the business ID
- The business record contains a `people` array with the person ID
- Allows navigation in both directions
- Maintains relationship integrity
#### Both Job Creation

```javascript
await BullQueue.add(
  {
    account_id,
    owner,
    crmPerson: [contactObject], // Person record
    crmBusiness: [businessObject], // Business record
    importParams,
    accessToken,
    csvContact: contact,
    lineNumber: line,
    account,
  },
  {
    attempts: 3,
    backoff: 4000,
  },
);
```
**Dual Insert:**

- A single job carries both records
- The queue processor is responsible for ensuring both succeed or both fail
- Maintains data consistency
## Helper Functions

### myFun() - Nested Field Access

```javascript
const myFun = (key, data, val) => {
  if (!val) val = data[key.trim()];
  else val = val[key.trim()];
  return val;
};
```
**Purpose:** Accesses nested object properties one dot-notation segment at a time. Note it is not fully safe: if an intermediate value is falsy, the next call falls back to a top-level lookup on `data` instead of returning `undefined`.

**Example:**

```javascript
// CSV has: company.address.street = "123 Main St"
let val;
val = myFun('company', csvRow, val); // Gets the company object
val = myFun('address', csvRow, val); // Gets the address object
val = myFun('street', csvRow, val); // Gets "123 Main St"
```
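An equivalent, null-safe way to express the same chain is a single `reduce` with optional chaining; unlike `myFun`, it returns `undefined` when any segment is missing. (The helper name `getPath` is ours, shown for comparison only.)

```javascript
// Null-safe dot-notation lookup: one reduce over the path segments.
const getPath = (obj, path) =>
  path.split('.').reduce((acc, key) => acc?.[key.trim()], obj);

const csvRow = { company: { address: { street: '123 Main St' } } };
getPath(csvRow, 'company.address.street'); // '123 Main St'
getPath(csvRow, 'company.phone.ext'); // undefined (no fallback lookup)
```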
## Data Structures

### Queue Collection Document

```javascript
{
  _id: ObjectId,
  status: 'pending', // pending, processing, completed, failed
  source: 'contacts',
  in_progress: false, // Prevents duplicate processing
  type: 'contacts', // 'contacts', 'companies', 'both'
  account_id: ObjectId,
  user_id: ObjectId,
  parent_account: ObjectId,
  client_id: ObjectId,
  csv: [{
    key: 's3-bucket-key.csv' // Wasabi S3 key
  }],
  mappings: {
    'first_name': 'First Name', // CSV column → DB field
    'email': 'Email Address',
    // ... more mappings
  },
  additional_data: {
    tags: ['imported', 'leads']
  },
  logid: ObjectId, // Links to the import result log
  createdAt: Date,
  updatedAt: Date
}
```
### Contact Object (Person)

```javascript
{
  _id: ObjectId,
  name: 'John Doe',
  first_name: 'John',
  last_name: 'Doe',
  email: 'john@example.com',
  phone: '+1234567890',
  address: {
    street: '123 Main St',
    city: 'New York',
    state: 'NY',
    zip: '10001',
    country: 'USA'
  },
  social: {
    linkedin: 'linkedin.com/in/johndoe',
    twitter: '@johndoe'
  },
  lead_status: 'new_lead',
  import_id: ObjectId,
  type: 'person',
  parent_account: ObjectId,
  created_by: ObjectId,
  owner: ObjectId,
  provider: 'csv',
  tags: ['imported'],
  businesses: [ObjectId], // Linked businesses
  createdAt: Date,
  updatedAt: Date
}
```
## Configuration

### Required Environment Variables

```bash
# Authentication
APP_SECRET=your-jwt-secret

# S3 Storage (Wasabi)
WASABI_ACCESS_KEY=...
WASABI_SECRET_KEY=...
WASABI_BUCKET=...
WASABI_REGION=...

# Database
MONGO_DB_URL=mongodb://...

# Redis (for Bull queues)
REDIS_HOST=localhost
REDIS_PORT=6379
```
### Import Job Options

```javascript
{
  attempts: 3, // Retry failed jobs 3 times
  backoff: 4000, // 4 second delay between retries
  removeOnComplete: true, // Auto-cleanup successful jobs
  removeOnFail: false // Keep failed jobs for inspection
}
```
## Error Handling

### Top-Level Error Handling

```javascript
try {
  // Import processing
} catch (error) {
  console.log(error);
  // Error is logged but the job is not marked as failed,
  // allowing a retry on the next cron run
}
```
### Row-Level Error Handling

Handled in the queue processors (not the service layer):

- Invalid email format
- Missing required fields
- Duplicate detection
- Database constraint violations
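A processor-side row check might look like the following. This is a hypothetical sketch: the function name, field names, and rules are illustrative assumptions, not the actual processor code, which the service layer does not contain.

```javascript
// Hypothetical row-level validation a queue processor could run.
// Errors carry the CSV line number, matching the lineNumber each job receives.
function validateRow(row, lineNumber) {
  const errors = [];
  if (!row.email && !row.phone) {
    errors.push('missing required field: email or phone');
  }
  if (row.email && !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(row.email)) {
    errors.push(`invalid email format: ${row.email}`);
  }
  return errors.map(message => ({ lineNumber, message }));
}

validateRow({ email: 'not-an-email' }, 42);
// => [{ lineNumber: 42, message: 'invalid email format: not-an-email' }]
```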
### Error Recovery

- **Job Retry:** Failed jobs retry 3 times with backoff
- **Partial Success:** Some rows can succeed while others fail
- **Error Logging:** Errors are stored with line numbers
- **Queue Cleanup:** Failed jobs are preserved for analysis
## Performance Considerations

### Optimization Strategies

- **Streaming:** CSV is streamed from S3, not loaded into memory
- **Batch Processing:** One job per CSV row (Bull handles concurrency)
- **Parallel Processing:** Multiple import jobs are processed simultaneously
- **Connection Pooling:** Reuses MongoDB connections
- **Queue Isolation:** Separate queue per import type
### Scalability

- **Horizontal Scaling:** Multiple Queue Manager instances
- **Queue Distribution:** Bull distributes jobs across workers
- **Rate Limiting:** Controlled by Bull concurrency settings
- **Memory Efficiency:** Streaming prevents memory bloat
### Typical Performance

- **Small import (< 100 rows):** 5-10 seconds
- **Medium import (100-1,000 rows):** 30-60 seconds
- **Large import (1,000-10,000 rows):** 5-15 minutes
- **Very large (10,000+ rows):** 30+ minutes
## Testing Considerations

### Test Scenarios

- **Person Import:** CSV with person data only
- **Business Import:** CSV with company data only
- **Both Import:** CSV with linked person-company data
- **Name Variations:** Test the name-splitting logic
- **Empty Fields:** Test field cleanup (empty addresses)
- **Special Characters:** Test encoding and escaping
- **Duplicate Handling:** Test duplicate detection
- **Error Recovery:** Test job retry logic
### Test Data

```csv
First Name,Last Name,Email,Phone,Company Name
John,Doe,john@example.com,1234567890,Acme Corp
Jane,Smith,jane@example.com,0987654321,Tech Inc
```
## Related Documentation

- Contacts Module Overview
- Common Utilities - add_data.js
- Queue Wrapper
- Wasabi S3 Utilities (if documented)
---

**Complexity:** Very High (450+ lines)
**Business Impact:** Critical - customer data management
**Performance:** Streaming, batch processing
**Last Updated:** 2025-10-10