Skip to main content

📤 Contact Export Processing

📖 Overview

The Contact Export module handles CSV file generation for contacts and companies. It creates authenticated export jobs, supports multi-currency formatting, and processes exports through dedicated Bull queues for person and business records.

Cron Schedule: Configured in crons/contacts/export.js

Source Files:

  • Cron: queue-manager/crons/contacts/export.js
  • Service: queue-manager/services/contacts/export.js (~90 lines)
  • Queues:
    • queue-manager/queues/contacts/person-export.js (Person exports)
    • queue-manager/queues/contacts/business-export.js (Company exports)

🎯 Business Purpose

Enables users to:

  • Export Contact Lists: Download complete contact database as CSV
  • Export Company Lists: Download business contacts separately
  • Multi-Currency Support: Format currency fields per account currency
  • Data Portability: Enable migration to other CRM systems
  • Backup & Analysis: Allow offline data analysis and backups

🔄 Complete Processing Flow

sequenceDiagram
participant USER as User/Internal API
participant QUEUE_COLL as Queue Collection
participant CRON as Export Cron
participant SERVICE as Export Service
participant ACCOUNT as Account Model
participant JWT as JWT Generator
participant BULL as Bull Queues
participant PROCESSOR as Queue Processor
participant WASABI as Wasabi S3

USER->>QUEUE_COLL: Create export job<br/>(status: pending)

loop Cron Schedule
CRON->>SERVICE: Execute exportContacts()
SERVICE->>QUEUE_COLL: Query pending exports<br/>(in_progress: false)
QUEUE_COLL-->>SERVICE: Export jobs
SERVICE->>QUEUE_COLL: Mark in_progress: true

SERVICE->>ACCOUNT: Fetch account + currency
ACCOUNT-->>SERVICE: Account with business.currency

SERVICE->>JWT: Generate access token<br/>(1hr expiration)
JWT-->>SERVICE: JWT token

alt Export Type: contacts (Person)
SERVICE->>BULL: Create Person Export Queue
SERVICE->>BULL: Add export job
BULL->>PROCESSOR: Process person export
PROCESSOR->>WASABI: Upload CSV to S3
else Export Type: companies (Business)
SERVICE->>BULL: Create Business Export Queue
SERVICE->>BULL: Add export job
BULL->>PROCESSOR: Process business export
PROCESSOR->>WASABI: Upload CSV to S3
end
end

🔧 Main Service Function

exportContacts()

Purpose: Orchestrates CSV export job creation from queue polling to Bull queue job addition.

Complete Source Code

const jwt = require('jsonwebtoken');
const Account = require('../../models/account');
const Queue = require('../../models/queues');
const personQueue = require('../../queues/contacts/person-export');
const businessQueue = require('../../queues/contacts/business-export');
const logger = require('../../utilities/logger');

exports.exportContacts = async () => {
try {
// Step 1: Query pending exports
let exportData = await Queue.find({
status: 'pending',
source: 'contacts-export',
in_progress: false,
}).sort({ createdAt: -1 });

// Step 2: Mark as in progress
const ids = exportData.map(d => d._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });

if (exportData.length) {
// Step 3: Process each export job
await Promise.all(
exportData.map(async exportParams => {
exportParams = exportParams.toObject();
exportParams.accountId = exportParams.account_id.toString();
let type = exportParams.type;

// Step 4: Fetch account with currency
let acc = await Account.findById(exportParams.accountId).populate({
path: 'business',
select: 'currency',
});

exportParams.ownerId = exportParams.user_id.toString();
exportParams.accountCurrency = acc?.business?.currency?.code;

// Step 5: Generate JWT token
exportParams.authToken = jwt.sign(
{
type: 'access_token',
uid: exportParams.user_id.toString(),
account_id: exportParams.account_id.toString(),
parent_account: exportParams.parent_account.toString(),
client_id: exportParams.client_id.toString(),
scope:
'contacts communications contacts.external contacts.read contacts.create files files.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);

let accessToken = exportParams.authToken;
let accountId = exportParams.accountId;
let ownerId = exportParams.ownerId;

// Step 6: Route to appropriate queue
if (type === 'contacts') {
let Q = await personQueue.start(exportParams._id.toString());
await Q.add(
{
id: exportParams._id.toString(),
account_id: accountId,
owner: ownerId,
exportParams,
accessToken,
account: acc.toJSON(),
},
{
attempts: 3,
backoff: 4000,
},
);
}
if (type === 'companies') {
let Q = await businessQueue.start(exportParams._id.toString());
await Q.add(
{
id: exportParams._id.toString(),
account_id: accountId,
owner: ownerId,
exportParams,
accessToken,
account: acc.toJSON(),
},
{
attempts: 3,
backoff: 4000,
},
);
}
}),
);
console.log('export Started');
}
} catch (error) {
logger.error({ error, initiator: 'QM/contacts/contact-export' });
}
};

📋 Step-by-Step Logic

Step 1: Query Pending Exports

let exportData = await Queue.find({
status: 'pending',
source: 'contacts-export',
in_progress: false,
}).sort({ createdAt: -1 });

Query Logic:

  • status: 'pending': Only new export jobs
  • source: 'contacts-export': Filters to contact exports (vs other job types)
  • in_progress: false: Prevents duplicate processing
  • Sort by createdAt descending: Processes newest first

Why Newest First?:

  • More likely to be what user is waiting for
  • Recent exports reflect current data state
  • Better user experience (less waiting)

Step 2: Mark In Progress

const ids = exportData.map(d => d._id);
await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });

Concurrency Safety:

  • Immediately marks all found jobs as in progress
  • Prevents multiple Queue Manager instances from processing same jobs
  • Bulk update for efficiency
  • Critical for horizontal scaling

Step 3: Parallel Processing

await Promise.all(
exportData.map(async exportParams => {
// Process each export
}),
);

Parallel Execution:

  • Processes multiple exports simultaneously
  • Each export gets its own Promise
  • All Promises must complete before function returns
  • Faster than sequential processing

Step 4: Fetch Account with Currency

let acc = await Account.findById(exportParams.accountId).populate({
path: 'business',
select: 'currency',
});

exportParams.accountCurrency = acc?.business?.currency?.code;

Currency Support Logic:

  • Fetches account document
  • Populates business relationship
  • Selects only currency field (optimization)
  • Extracts currency code (e.g., 'USD', 'EUR', 'GBP')
  • Used by processor to format monetary values in CSV

Example Currency Codes:

  • USD → $1,234.56
  • EUR → €1.234,56
  • GBP → £1,234.56
  • INR → ₹1,23,456.78

Step 5: Generate JWT Token

exportParams.authToken = jwt.sign(
{
type: 'access_token',
uid: exportParams.user_id.toString(),
account_id: exportParams.account_id.toString(),
parent_account: exportParams.parent_account.toString(),
client_id: exportParams.client_id.toString(),
scope:
'contacts communications contacts.external contacts.read contacts.create files files.create',
},
process.env.APP_SECRET,
{ expiresIn: '1h' },
);

Token Purpose:

  • Authenticates queue processor's API calls
  • Short-lived (1 hour) for security
  • Includes all necessary permissions in scope
  • Allows processor to fetch contact data and upload to S3

Scopes Required:

  • contacts: Basic contact operations
  • communications: Access to communication history
  • contacts.external: External integration data
  • contacts.read: Read contact details
  • contacts.create: Not used but included
  • files: File operations
  • files.create: Upload CSV to storage

Step 6: Route to Appropriate Queue

Person Export Queue

if (type === 'contacts') {
let Q = await personQueue.start(exportParams._id.toString());
await Q.add(
{
id: exportParams._id.toString(),
account_id: accountId,
owner: ownerId,
exportParams,
accessToken,
account: acc.toJSON(),
},
{
attempts: 3,
backoff: 4000,
},
);
}

Job Payload:

  • id: Export job ID (for tracking)
  • account_id: Account being exported
  • owner: User who initiated export
  • exportParams: Original export parameters (filters, columns, etc.)
  • accessToken: JWT for authentication
  • account: Full account object with currency

Job Options:

  • attempts: 3: Retry failed jobs up to 3 times
  • backoff: 4000: Wait 4 seconds between retries (exponential backoff handled by Bull)

Business Export Queue

if (type === 'companies') {
let Q = await businessQueue.start(exportParams._id.toString());
await Q.add(
{
// Same payload structure
},
{
attempts: 3,
backoff: 4000,
},
);
}

Separate Queue Rationale:

  • Different data schemas (person vs business)
  • Different CSV column structure
  • Separate concurrency limits
  • Independent error handling

📊 Data Structures

Queue Collection Document

{
_id: ObjectId,
status: 'pending', // pending, processing, completed, failed
source: 'contacts-export',
in_progress: false, // Prevents duplicate processing
type: 'contacts', // 'contacts' or 'companies'
account_id: ObjectId,
user_id: ObjectId,
parent_account: ObjectId,
client_id: ObjectId,
filters: { // Optional export filters
tags: ['customer', 'lead'],
lead_status: ['new_lead'],
date_range: {
start: Date,
end: Date
}
},
columns: [ // Columns to include in CSV
'first_name',
'last_name',
'email',
'phone',
'address.city',
'address.state'
],
createdAt: Date,
updatedAt: Date
}

Account Object (Populated)

{
_id: ObjectId,
name: 'Acme Corp',
business: {
_id: ObjectId,
currency: {
code: 'USD',
symbol: '$',
name: 'US Dollar'
}
},
// ... other account fields
}

Export Job Payload (Sent to Queue)

{
id: '507f1f77bcf86cd799439011',
account_id: '507f191e810c19729de860ea',
owner: '507f191e810c19729de860eb',
exportParams: {
_id: ObjectId,
accountId: '507f191e810c19729de860ea',
ownerId: '507f191e810c19729de860eb',
accountCurrency: 'USD',
authToken: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
type: 'contacts',
filters: { ... },
columns: [ ... ]
},
accessToken: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
account: {
_id: '507f191e810c19729de860ea',
name: 'Acme Corp',
business: {
currency: {
code: 'USD',
symbol: '$'
}
}
}
}

🎨 Usage Patterns

Typical Export Flow

// 1. User clicks "Export Contacts" in UI
// 2. Frontend sends request to Internal API
// 3. Internal API creates queue entry
const Queue = require('../models/queues');

const exportJob = await Queue.create({
status: 'pending',
source: 'contacts-export',
type: 'contacts',
account_id: req.user.account_id,
user_id: req.user._id,
parent_account: req.user.parent_account,
client_id: req.user.client_id,
filters: req.body.filters, // From UI
columns: req.body.columns, // From UI
});

// 4. Queue Manager cron picks it up
// 5. Service creates Bull queue job
// 6. Processor generates CSV
// 7. User receives download link

Filtered Export Example

// Export only leads created in last 30 days
const exportJob = await Queue.create({
status: 'pending',
source: 'contacts-export',
type: 'contacts',
account_id: accountId,
user_id: userId,
parent_account: parentAccountId,
client_id: clientId,
filters: {
lead_status: ['new_lead', 'contacted'],
created_at: {
$gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000),
},
},
columns: ['first_name', 'last_name', 'email', 'phone', 'lead_status', 'created_at'],
});

⚙️ Configuration

Required Environment Variables

# JWT Secret
APP_SECRET=your-jwt-secret-key

# Database
MONGO_DB_URL=mongodb://...

# Redis (for Bull queues)
REDIS_HOST=localhost
REDIS_PORT=6379

# Wasabi S3 (for CSV storage)
WASABI_ACCESS_KEY=...
WASABI_SECRET_KEY=...
WASABI_BUCKET=...
WASABI_REGION=...

Queue Options

{
attempts: 3, // Retry failed jobs 3 times
backoff: 4000, // 4 second delay between retries
removeOnComplete: true, // Auto-cleanup successful jobs
removeOnFail: false // Keep failed jobs for inspection
}

🚨 Error Handling

Top-Level Error Handling

try {
// Export processing
} catch (error) {
logger.error({ error, initiator: 'QM/contacts/contact-export' });
}

Error Behavior:

  • Logs error with context
  • Does not rethrow (prevents cron failure)
  • Jobs remain in_progress (manual cleanup needed)
  • No user notification on service-level errors

Job-Level Error Handling

Handled in queue processors:

  • Failed jobs retry 3 times with 4-second backoff
  • After 3 failures, job marked as failed
  • Failed jobs preserved for inspection
  • Errors logged with queue entry ID

Recovery Strategies

  1. Stuck Jobs: Jobs marked in_progress but no processor running

    • Reset in_progress: false manually
    • Jobs will be picked up on next cron run
  2. Failed Jobs: Jobs failed after 3 retries

    • Check Bull failed jobs dashboard
    • Review error logs
    • Fix underlying issue (API limits, storage, etc.)
    • Manually replay job or recreate

📈 Performance Considerations

Optimization Strategies

  1. Parallel Processing: Multiple exports processed simultaneously
  2. Bulk Updates: Single update for all in_progress flags
  3. Selective Population: Only fetches currency field, not full business object
  4. Queue Isolation: Separate queues for person and business exports

Scalability

  • Horizontal Scaling: Multiple Queue Manager instances can run concurrently
  • Queue Distribution: Bull distributes jobs across workers
  • Database Load: One query for pending exports, bulk update for flags
  • Memory Efficient: Doesn't load contact data in service (only in processor)

Typical Performance

  • Small Export (< 100 contacts): 5-10 seconds
  • Medium Export (100-1000 contacts): 30-60 seconds
  • Large Export (1000-10000 contacts): 2-5 minutes
  • Very Large (10000+ contacts): 10+ minutes

Bottlenecks:

  • Database query performance (contact fetching in processor)
  • CSV generation and formatting
  • S3 upload bandwidth
  • Currency formatting calculations

📝 Notes

Export vs Import

  • Export: Queue Manager generates CSV from database
  • Import: Queue Manager processes CSV into database
  • Both use JWT authentication
  • Both support person and business types
  • Export is read-only, import is write-heavy

Currency Formatting

The accountCurrency field enables proper currency formatting in CSV:

  • Amounts formatted with correct symbol and decimal places
  • Thousand separators per locale
  • Used for deal values, invoice amounts, etc.

Authentication Scope

The JWT token includes comprehensive scopes to allow processor to:

  • Read contact data from database
  • Access related communication records
  • Upload CSV file to S3 storage
  • Not used for creating/updating contacts

Complexity: Medium
Business Impact: High - Critical for data portability
Dependencies: JWT, Account model, Bull queues
Last Updated: 2025-10-10

💬

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM