📤 Contact Export Processing
📖 Overview
The Contact Export module handles CSV file generation for contacts and companies. It creates authenticated export jobs, supports multi-currency formatting, and processes exports through dedicated Bull queues for person and business records.
Cron Schedule: Configured in crons/contacts/export.js
Source Files:
- Cron: queue-manager/crons/contacts/export.js
- Service: queue-manager/services/contacts/export.js (~90 lines)
- Queues:
  - queue-manager/queues/contacts/person-export.js (Person exports)
  - queue-manager/queues/contacts/business-export.js (Company exports)
🎯 Business Purpose
Enables users to:
- Export Contact Lists: Download complete contact database as CSV
- Export Company Lists: Download business contacts separately
- Multi-Currency Support: Format currency fields per account currency
- Data Portability: Enable migration to other CRM systems
- Backup & Analysis: Allow offline data analysis and backups
🔄 Complete Processing Flow
sequenceDiagram
participant USER as User/Internal API
participant QUEUE_COLL as Queue Collection
participant CRON as Export Cron
participant SERVICE as Export Service
participant ACCOUNT as Account Model
participant JWT as JWT Generator
participant BULL as Bull Queues
participant PROCESSOR as Queue Processor
participant WASABI as Wasabi S3
USER->>QUEUE_COLL: Create export job<br/>(status: pending)
loop Cron Schedule
CRON->>SERVICE: Execute exportContacts()
SERVICE->>QUEUE_COLL: Query pending exports<br/>(in_progress: false)
QUEUE_COLL-->>SERVICE: Export jobs
SERVICE->>QUEUE_COLL: Mark in_progress: true
SERVICE->>ACCOUNT: Fetch account + currency
ACCOUNT-->>SERVICE: Account with business.currency
SERVICE->>JWT: Generate access token<br/>(1hr expiration)
JWT-->>SERVICE: JWT token
alt Export Type: contacts (Person)
SERVICE->>BULL: Create Person Export Queue
SERVICE->>BULL: Add export job
BULL->>PROCESSOR: Process person export
PROCESSOR->>WASABI: Upload CSV to S3
else Export Type: companies (Business)
SERVICE->>BULL: Create Business Export Queue
SERVICE->>BULL: Add export job
BULL->>PROCESSOR: Process business export
PROCESSOR->>WASABI: Upload CSV to S3
end
end
🔧 Main Service Function
exportContacts()
Purpose: Orchestrates CSV export job creation from queue polling to Bull queue job addition.
Complete Source Code
const jwt = require('jsonwebtoken');
const Account = require('../../models/account');
const Queue = require('../../models/queues');
const personQueue = require('../../queues/contacts/person-export');
const businessQueue = require('../../queues/contacts/business-export');
const logger = require('../../utilities/logger');
exports.exportContacts = async () => {
try {
// Step 1: Query pending exports
let exportData = await Queue.find({
status: 'pending',
source: 'contacts-export',
in_progress: false,
}).sort({ createdAt: -1 });
// Step 2: Mark as in progress
const ids = exportData.map(d => d._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
if (exportData.length) {
// Step 3: Process each export job
await Promise.all(
exportData.map(async exportParams => {
exportParams = exportParams.toObject();
exportParams.accountId = exportParams.account_id.toString();
let type = exportParams.type;
// Step 4: Fetch account with currency
let acc = await Account.findById(exportParams.accountId).populate({
path: 'business',
select: 'currency',
});
exportParams.ownerId = exportParams.user_id.toString();
exportParams.accountCurrency = acc?.business?.currency?.code;
// Step 5: Generate JWT token
exportParams.authToken = jwt.sign(
{
type: 'access_token',
uid: exportParams.user_id.toString(),
account_id: exportParams.account_id.toString(),
parent_account: exportParams.parent_account.toString(),
client_id: exportParams.client_id.toString(),
scope:
'contacts communications contacts.external contacts.read contacts.create files files.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);
let accessToken = exportParams.authToken;
let accountId = exportParams.accountId;
let ownerId = exportParams.ownerId;
// Step 6: Route to appropriate queue
if (type === 'contacts') {
let Q = await personQueue.start(exportParams._id.toString());
await Q.add(
{
id: exportParams._id.toString(),
account_id: accountId,
owner: ownerId,
exportParams,
accessToken,
account: acc.toJSON(),
},
{
attempts: 3,
backoff: 4000,
},
);
}
if (type === 'companies') {
let Q = await businessQueue.start(exportParams._id.toString());
await Q.add(
{
id: exportParams._id.toString(),
account_id: accountId,
owner: ownerId,
exportParams,
accessToken,
account: acc.toJSON(),
},
{
attempts: 3,
backoff: 4000,
},
);
}
}),
);
console.log('export Started');
}
} catch (error) {
logger.error({ error, initiator: 'QM/contacts/contact-export' });
}
};
📋 Step-by-Step Logic
Step 1: Query Pending Exports
let exportData = await Queue.find({
status: 'pending',
source: 'contacts-export',
in_progress: false,
}).sort({ createdAt: -1 });
Query Logic:
- status: 'pending': Only new export jobs
- source: 'contacts-export': Filters to contact exports (vs other job types)
- in_progress: false: Prevents duplicate processing
- Sort by createdAt descending: Processes newest first
Why Newest First?
- More likely to be what user is waiting for
- Recent exports reflect current data state
- Better user experience (less waiting)
Step 2: Mark In Progress
const ids = exportData.map(d => d._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
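Concurrency Safety:
- Marks all found jobs as in progress right after the query, in a single bulk update
- Reduces the chance that multiple Queue Manager instances process the same jobs
- Not fully atomic: the find and the updateMany are separate operations, so two instances polling at the same moment can still read the same pending jobs before either sets the flag
- Matters for horizontal scaling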
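One way to close the gap between the query and the flag update is to claim each job with a single atomic operation. The following is a minimal sketch, assuming the Queue model is a Mongoose model that exposes findOneAndUpdate. The names claimPendingExports and makeFakeQueue are hypothetical, and the in-memory stub exists only so the snippet runs without a database.

```javascript
// Hypothetical helper: claims pending export jobs one at a time.
// `QueueModel` is assumed to expose Mongoose's findOneAndUpdate(filter, update, options).
async function claimPendingExports(QueueModel, limit = 50) {
  const claimed = [];
  while (claimed.length < limit) {
    // The filter and the flag flip happen in one atomic operation, so a second
    // instance running the same query cannot receive this document.
    const job = await QueueModel.findOneAndUpdate(
      { status: 'pending', source: 'contacts-export', in_progress: false },
      { $set: { in_progress: true } },
      { new: true, sort: { createdAt: -1 } },
    );
    if (!job) break; // nothing left to claim
    claimed.push(job);
  }
  return claimed;
}

// In-memory stand-in for the model (not Mongoose); it always sorts newest first.
function makeFakeQueue(docs) {
  return {
    async findOneAndUpdate(filter, update) {
      const match = docs
        .filter(doc => Object.keys(filter).every(key => doc[key] === filter[key]))
        .sort((a, b) => b.createdAt - a.createdAt)[0];
      if (!match) return null;
      Object.assign(match, update.$set);
      return match;
    },
  };
}

const fakeQueue = makeFakeQueue([
  { _id: 1, status: 'pending', source: 'contacts-export', in_progress: false, createdAt: 1 },
  { _id: 2, status: 'pending', source: 'contacts-export', in_progress: false, createdAt: 2 },
  { _id: 3, status: 'pending', source: 'contacts-export', in_progress: true, createdAt: 3 },
]);

claimPendingExports(fakeQueue).then(claimed => {
  console.log(claimed.map(job => job._id)); // [ 2, 1 ]
});
```

The trade-off is one round trip per job instead of one bulk update, which is usually acceptable for the small batches an export cron handles.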
Step 3: Parallel Processing
await Promise.all(
exportData.map(async exportParams => {
// Process each export
}),
);
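Parallel Execution:
- Processes multiple exports concurrently
- Each export gets its own Promise
- The function waits for all Promises to resolve before returning
- If any Promise rejects, Promise.all rejects immediately and the outer catch logs that error; the remaining exports keep running, but their outcomes are not tracked
- Usually faster than sequential processing because account lookups and queue writes overlap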
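If one failing export should not hide the outcome of the others, Promise.allSettled is a possible alternative to Promise.all. The sketch below is not the service's code: processExport and processAll are hypothetical names, and the worker fails on purpose for one job to show the per-job result.

```javascript
// Hypothetical per-job worker: rejects for types this service does not route.
async function processExport(job) {
  if (job.type !== 'contacts' && job.type !== 'companies') {
    throw new Error(`unsupported export type: ${job.type}`);
  }
  return `queued ${job.id}`;
}

// Runs every job and reports each outcome separately instead of
// rejecting the whole batch on the first failure.
async function processAll(jobs) {
  const results = await Promise.allSettled(jobs.map(job => processExport(job)));
  return results.map((result, index) => ({
    id: jobs[index].id,
    ok: result.status === 'fulfilled',
    detail: result.status === 'fulfilled' ? result.value : result.reason.message,
  }));
}

const jobs = [
  { id: 'a1', type: 'contacts' },
  { id: 'b2', type: 'deals' }, // not a type the export service handles
  { id: 'c3', type: 'companies' },
];

processAll(jobs).then(summary => console.log(summary));
```

With a per-job summary, the caller could reset in_progress only for the jobs that failed rather than leaving the whole batch flagged.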
Step 4: Fetch Account with Currency
let acc = await Account.findById(exportParams.accountId).populate({
path: 'business',
select: 'currency',
});
exportParams.accountCurrency = acc?.business?.currency?.code;
Currency Support Logic:
- Fetches account document
- Populates business relationship
- Selects only currency field (optimization)
- Extracts currency code (e.g., 'USD', 'EUR', 'GBP')
- Used by processor to format monetary values in CSV
Example Currency Codes:
- USD → $1,234.56
- EUR → €1.234,56
- GBP → £1,234.56
- INR → ₹1,23,456.78
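The processor's formatter is not shown here, but the idea can be sketched with the built-in Intl.NumberFormat. The LOCALE_BY_CURRENCY mapping and the formatAmount name are assumptions made for illustration. The fallback branch covers the case where accountCurrency is undefined, which the optional chaining in Step 4 allows.

```javascript
// Hypothetical mapping from currency code to a display locale.
// The real processor may choose locales differently.
const LOCALE_BY_CURRENCY = { USD: 'en-US', EUR: 'de-DE', GBP: 'en-GB', INR: 'en-IN' };

function formatAmount(amount, currencyCode) {
  if (!currencyCode) {
    // No currency on the account: emit a plain two-decimal number.
    return new Intl.NumberFormat('en-US', {
      minimumFractionDigits: 2,
      maximumFractionDigits: 2,
    }).format(amount);
  }
  const locale = LOCALE_BY_CURRENCY[currencyCode] || 'en-US';
  return new Intl.NumberFormat(locale, { style: 'currency', currency: currencyCode }).format(amount);
}

console.log(formatAmount(1234.56, 'USD')); // $1,234.56
console.log(formatAmount(1234.56, undefined)); // 1,234.56
console.log(formatAmount(1234.56, 'EUR')); // separators and symbol position depend on the locale
```

Symbol placement and separators come from the locale rather than from the currency code alone, so the exact EUR or INR output depends on which locale the formatter is given.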
Step 5: Generate JWT Token
exportParams.authToken = jwt.sign(
{
type: 'access_token',
uid: exportParams.user_id.toString(),
account_id: exportParams.account_id.toString(),
parent_account: exportParams.parent_account.toString(),
client_id: exportParams.client_id.toString(),
scope:
'contacts communications contacts.external contacts.read contacts.create files files.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);
Token Purpose:
- Authenticates queue processor's API calls
- Short-lived (1 hour) for security
- Includes all necessary permissions in scope
- Allows processor to fetch contact data and upload to S3
Scopes Required:
- contacts: Basic contact operations
- communications: Access to communication history
- contacts.external: External integration data
- contacts.read: Read contact details
- contacts.create: Not needed for exports, but included in the token
- files: File operations
- files.create: Upload CSV to storage
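To make the token's contents concrete, the sketch below builds and checks an HS256 token with the same claims. The service itself uses jsonwebtoken; this snippet swaps in a hand-rolled implementation on Node's built-in crypto module purely so it runs without dependencies. The names signHs256, verifyHs256, and hasScope, the demo secret, and the placeholder IDs are all hypothetical.

```javascript
const crypto = require('node:crypto');

const b64url = text => Buffer.from(text).toString('base64url');
const hmac = (data, secret) => crypto.createHmac('sha256', secret).update(data).digest('base64url');

// Stand-in for jwt.sign(payload, secret, { expiresIn }) with a TTL in seconds.
function signHs256(payload, secret, ttlSeconds) {
  const now = Math.floor(Date.now() / 1000);
  const header = b64url(JSON.stringify({ alg: 'HS256', typ: 'JWT' }));
  const body = b64url(JSON.stringify({ ...payload, iat: now, exp: now + ttlSeconds }));
  return `${header}.${body}.${hmac(`${header}.${body}`, secret)}`;
}

// Checks signature and expiry, then returns the decoded claims.
function verifyHs256(token, secret) {
  const [header, body, signature] = token.split('.');
  const given = Buffer.from(signature || '');
  const expected = Buffer.from(hmac(`${header}.${body}`, secret));
  if (given.length !== expected.length || !crypto.timingSafeEqual(given, expected)) {
    throw new Error('invalid signature');
  }
  const claims = JSON.parse(Buffer.from(body, 'base64url').toString('utf8'));
  if (claims.exp <= Math.floor(Date.now() / 1000)) throw new Error('token expired');
  return claims;
}

// Scopes are a single space-separated string, as in the service.
function hasScope(claims, required) {
  return String(claims.scope || '').split(' ').includes(required);
}

const token = signHs256(
  {
    type: 'access_token',
    uid: 'user-id-placeholder',
    account_id: 'account-id-placeholder',
    scope: 'contacts communications contacts.external contacts.read contacts.create files files.create',
  },
  'demo-secret', // the service reads process.env.APP_SECRET instead
  60 * 60, // one hour, matching expiresIn: '1h'
);

const claims = verifyHs256(token, 'demo-secret');
console.log(claims.exp - claims.iat); // 3600
console.log(hasScope(claims, 'files.create')); // true
```

Because the token lives for one hour, an export that runs longer than that would need its processor to finish its API calls before expiry or obtain a fresh token.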
Step 6: Route to Appropriate Queue
Person Export Queue
if (type === 'contacts') {
let Q = await personQueue.start(exportParams._id.toString());
await Q.add(
{
id: exportParams._id.toString(),
account_id: accountId,
owner: ownerId,
exportParams,
accessToken,
account: acc.toJSON(),
},
{
attempts: 3,
backoff: 4000,
},
);
}
Job Payload:
- id: Export job ID (for tracking)
- account_id: Account being exported
- owner: User who initiated export
- exportParams: Original export parameters (filters, columns, etc.)
- accessToken: JWT for authentication
- account: Full account object with currency
Job Options:
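- attempts: 3: Bull makes up to 3 attempts in total (the first run plus 2 retries)
- backoff: 4000: Wait 4 seconds between attempts. A plain number selects Bull's fixed backoff; exponential backoff requires the object form, for example { type: 'exponential', delay: 4000 }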
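The difference between the two backoff forms is easier to see as a schedule of waits. This sketch does not call Bull. The retryDelays and normalizeBackoff names are hypothetical, and the exponential branch uses (2^n - 1) * delay, which is assumed here to match Bull's built-in strategy and should be checked against the Bull version in use.

```javascript
// Normalizes the two accepted forms of the backoff option:
// a plain number (fixed delay) or an object with type and delay.
function normalizeBackoff(backoff) {
  return typeof backoff === 'number' ? { type: 'fixed', delay: backoff } : backoff;
}

// Returns the wait in milliseconds before each retry. `attempts` counts the
// first run, so there are attempts - 1 retries.
function retryDelays({ attempts, backoff }) {
  const { type, delay } = normalizeBackoff(backoff);
  const delays = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade += 1) {
    // Assumed exponential formula: (2^attemptsMade - 1) * delay.
    delays.push(type === 'exponential' ? (2 ** attemptsMade - 1) * delay : delay);
  }
  return delays;
}

// Options used by the export service: two retries, 4 seconds apart.
console.log(retryDelays({ attempts: 3, backoff: 4000 })); // [ 4000, 4000 ]

// Object form with growing delays.
console.log(retryDelays({ attempts: 3, backoff: { type: 'exponential', delay: 4000 } })); // [ 4000, 12000 ]
```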
Business Export Queue
if (type === 'companies') {
let Q = await businessQueue.start(exportParams._id.toString());
await Q.add(
{
// Same payload structure
},
{
attempts: 3,
backoff: 4000,
},
);
}
Separate Queue Rationale:
- Different data schemas (person vs business)
- Different CSV column structure
- Separate concurrency limits
- Independent error handling
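Since both branches build the same payload and options, the routing can also be expressed as a lookup table. The sketch below is one possible refactor, not the service's code. The stub queues stand in for the person-export and business-export modules so the snippet runs without Redis, and routeExport, makeStubQueue, and QUEUE_BY_TYPE are hypothetical names.

```javascript
// Stub queue module: mimics the start(id) -> queue.add(data, opts) shape used by the service.
function makeStubQueue(name, log) {
  return {
    async start() {
      return {
        async add(data, opts) {
          log.push({ queue: name, id: data.id, opts });
        },
      };
    },
  };
}

const log = [];
const QUEUE_BY_TYPE = {
  contacts: makeStubQueue('person-export', log),
  companies: makeStubQueue('business-export', log),
};
const JOB_OPTIONS = { attempts: 3, backoff: 4000 };

// Picks the queue for the export type and adds the job with shared options.
async function routeExport(type, payload) {
  const queueModule = QUEUE_BY_TYPE[type];
  if (!queueModule) {
    // Surfacing unknown types makes a misrouted job visible in the logs.
    throw new Error(`unknown export type: ${type}`);
  }
  const queue = await queueModule.start(payload.id);
  await queue.add(payload, JOB_OPTIONS);
}

routeExport('contacts', { id: 'job-1' }).then(() => console.log(log));
```

In the service as written, a job whose type is neither 'contacts' nor 'companies' matches neither if block, so it is never queued and stays flagged in_progress. An explicit error for unknown types makes that case visible.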
📊 Data Structures
Queue Collection Document
{
_id: ObjectId,
status: 'pending', // pending, processing, completed, failed
source: 'contacts-export',
in_progress: false, // Prevents duplicate processing
type: 'contacts', // 'contacts' or 'companies'
account_id: ObjectId,
user_id: ObjectId,
parent_account: ObjectId,
client_id: ObjectId,
filters: { // Optional export filters
tags: ['customer', 'lead'],
lead_status: ['new_lead'],
date_range: {
start: Date,
end: Date
}
},
columns: [ // Columns to include in CSV
'first_name',
'last_name',
'email',
'phone',
'address.city',
'address.state'
],
createdAt: Date,
updatedAt: Date
}
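The service calls toString() on account_id, user_id, parent_account, and client_id, so a document missing any of them would throw inside the processing loop. A small check like the one below could run before a job is processed or when it is created. It is a sketch only: validateExportJob is a hypothetical name and is not part of the module.

```javascript
const REQUIRED_IDS = ['account_id', 'user_id', 'parent_account', 'client_id'];
const EXPORT_TYPES = ['contacts', 'companies'];

// Returns a list of problems; an empty list means the document has the
// fields the export service reads.
function validateExportJob(doc) {
  const problems = [];
  for (const field of REQUIRED_IDS) {
    if (doc[field] === undefined || doc[field] === null) {
      problems.push(`missing ${field}`);
    }
  }
  if (!EXPORT_TYPES.includes(doc.type)) {
    problems.push(`unsupported type: ${doc.type}`);
  }
  return problems;
}

console.log(validateExportJob({ type: 'deals', account_id: 'a' }));
// [ 'missing user_id', 'missing parent_account', 'missing client_id', 'unsupported type: deals' ]
```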
Account Object (Populated)
{
_id: ObjectId,
name: 'Acme Corp',
business: {
_id: ObjectId,
currency: {
code: 'USD',
symbol: '$',
name: 'US Dollar'
}
},
// ... other account fields
}
Export Job Payload (Sent to Queue)
{
id: '507f1f77bcf86cd799439011',
account_id: '507f191e810c19729de860ea',
owner: '507f191e810c19729de860eb',
exportParams: {
_id: ObjectId,
accountId: '507f191e810c19729de860ea',
ownerId: '507f191e810c19729de860eb',
accountCurrency: 'USD',
authToken: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
type: 'contacts',
filters: { ... },
columns: [ ... ]
},
accessToken: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
account: {
_id: '507f191e810c19729de860ea',
name: 'Acme Corp',
business: {
currency: {
code: 'USD',
symbol: '$'
}
}
}
}
🎨 Usage Patterns
Typical Export Flow
// 1. User clicks "Export Contacts" in UI
// 2. Frontend sends request to Internal API
// 3. Internal API creates queue entry
const Queue = require('../models/queues');
const exportJob = await Queue.create({
status: 'pending',
source: 'contacts-export',
type: 'contacts',
account_id: req.user.account_id,
user_id: req.user._id,
parent_account: req.user.parent_account,
client_id: req.user.client_id,
filters: req.body.filters, // From UI
columns: req.body.columns, // From UI
});
// 4. Queue Manager cron picks it up
// 5. Service creates Bull queue job
// 6. Processor generates CSV
// 7. User receives download link
Filtered Export Example
// Export only leads created in last 30 days
const exportJob = await Queue.create({
status: 'pending',
source: 'contacts-export',
type: 'contacts',
account_id: accountId,
user_id: userId,
parent_account: parentAccountId,
client_id: clientId,
filters: {
lead_status: ['new_lead', 'contacted'],
created_at: {
$gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000),
},
},
columns: ['first_name', 'last_name', 'email', 'phone', 'lead_status', 'created_at'],
});
⚙️ Configuration
Required Environment Variables
# JWT Secret
APP_SECRET=your-jwt-secret-key
# Database
MONGO_DB_URL=mongodb://...
# Redis (for Bull queues)
REDIS_HOST=localhost
REDIS_PORT=6379
# Wasabi S3 (for CSV storage)
WASABI_ACCESS_KEY=...
WASABI_SECRET_KEY=...
WASABI_BUCKET=...
WASABI_REGION=...
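A startup check can catch a missing variable before the first export runs. The following is a minimal sketch using the variable names listed above. The missingEnv helper is hypothetical, and whether the Queue Manager already performs such a check is not known from this page.

```javascript
const REQUIRED_ENV = [
  'APP_SECRET',
  'MONGO_DB_URL',
  'REDIS_HOST',
  'REDIS_PORT',
  'WASABI_ACCESS_KEY',
  'WASABI_SECRET_KEY',
  'WASABI_BUCKET',
  'WASABI_REGION',
];

// Returns the names of required variables that are unset or empty.
function missingEnv(env = process.env) {
  return REQUIRED_ENV.filter(name => !env[name]);
}

const missing = missingEnv();
if (missing.length > 0) {
  console.warn(`Missing environment variables: ${missing.join(', ')}`);
}
```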
Queue Options
{
attempts: 3, // Up to 3 attempts in total (first run + 2 retries)
backoff: 4000, // Fixed 4-second delay between attempts
removeOnComplete: true, // Auto-cleanup successful jobs
removeOnFail: false // Keep failed jobs for inspection
}
🚨 Error Handling
Top-Level Error Handling
try {
// Export processing
} catch (error) {
logger.error({ error, initiator: 'QM/contacts/contact-export' });
}
Error Behavior:
- Logs error with context
- Does not rethrow (prevents cron failure)
- Jobs remain in_progress (manual cleanup needed)
- No user notification on service-level errors
Job-Level Error Handling
Handled in queue processors:
- Failed jobs are retried with a fixed 4-second backoff, up to 3 attempts in total
- After the third failed attempt, the job is marked as failed
- Failed jobs preserved for inspection
- Errors logged with queue entry ID
Recovery Strategies
- Stuck Jobs: Jobs marked in_progress but no processor running
  - Reset in_progress: false manually
  - Jobs will be picked up on next cron run
- Failed Jobs: Jobs that failed all 3 attempts
  - Check Bull failed jobs dashboard
  - Review error logs
  - Fix underlying issue (API limits, storage, etc.)
  - Manually replay job or recreate
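The manual reset for stuck jobs could be scripted along these lines. This is a sketch under several assumptions: stuck jobs still have status 'pending', updatedAt is refreshed when the service sets in_progress, and a 30-minute threshold is long enough. The buildStuckJobReset name is hypothetical.

```javascript
// Builds the filter and update for releasing claims older than maxAgeMs.
function buildStuckJobReset(now, maxAgeMs) {
  return {
    filter: {
      source: 'contacts-export',
      status: 'pending', // assumption: stuck jobs were never moved past pending
      in_progress: true,
      updatedAt: { $lt: new Date(now.getTime() - maxAgeMs) },
    },
    update: { $set: { in_progress: false } },
  };
}

const THIRTY_MINUTES = 30 * 60 * 1000; // hypothetical threshold
const { filter, update } = buildStuckJobReset(new Date(), THIRTY_MINUTES);
console.log(filter, update);

// With the Mongoose model from the service, this would be applied as:
//   await Queue.updateMany(filter, update);
```

The age threshold keeps the reset from releasing jobs that a running instance claimed a moment ago.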
📈 Performance Considerations
Optimization Strategies
- Parallel Processing: Multiple exports processed simultaneously
- Bulk Updates: Single update for all in_progress flags
- Selective Population: Only fetches currency field, not full business object
- Queue Isolation: Separate queues for person and business exports
Scalability
- Horizontal Scaling: Multiple Queue Manager instances can run concurrently
- Queue Distribution: Bull distributes jobs across workers
- Database Load: One query for pending exports, bulk update for flags
- Memory Efficient: Doesn't load contact data in service (only in processor)
Typical Performance
- Small Export (< 100 contacts): 5-10 seconds
- Medium Export (100-1000 contacts): 30-60 seconds
- Large Export (1000-10000 contacts): 2-5 minutes
- Very Large (10000+ contacts): 10+ minutes
Bottlenecks:
- Database query performance (contact fetching in processor)
- CSV generation and formatting
- S3 upload bandwidth
- Currency formatting calculations
🔗 Related Documentation
- Contacts Module Overview
- Contact Import Processing - Import counterpart
- Export Cleanup - Cleanup old exports
- Common Utilities - generate-queue
📝 Notes
Export vs Import
- Export: Queue Manager generates CSV from database
- Import: Queue Manager processes CSV into database
- Both use JWT authentication
- Both support person and business types
- Export is read-only, import is write-heavy
Currency Formatting
The accountCurrency field enables proper currency formatting in CSV:
- Amounts formatted with correct symbol and decimal places
- Thousand separators per locale
- Used for deal values, invoice amounts, etc.
Authentication Scope
The JWT token includes comprehensive scopes so that the processor can:
- Read contact data from database
- Access related communication records
- Upload CSV file to S3 storage

The contacts.create scope is included in the token but is not used for creating or updating contacts during an export.
Complexity: Medium
Business Impact: High - Critical for data portability
Dependencies: JWT, Account model, Bull queues
Last Updated: 2025-10-10