📇 Contacts Processing
📖 Overview
The Contacts module handles bulk contact operations including CSV imports, exports, and cleanup of expired export files.
Environment Flags:
QM_CONTACTS=true- Contact import processingQM_CONTACTS_EXPORT=true- Contact export processingQM_CONTACTS_EXPORT_CLEANUP=true- Export file cleanup
Source Location: queue-manager/crons/contacts/
Processing Pattern: High-frequency polling (every 5-30 seconds)
🗄️ Collections Used
contact-import-results
- Operations: Read/Write
- Model:
shared/models/contact-import-results.js - Usage: Track import job status and results
contact-queue
- Operations: Read/Write/Delete
- Model:
shared/models/contact-queue.js - Usage: Queue contact import/export jobs
contacts
- Operations: Create/Update
- Model:
shared/models/contact.js - Usage: Store imported contact data
🔄 Processing Flow
graph TB
API[Internal API] -->|Create Job| QUEUE_COLL[contact-queue]
CRON_IMP[Import Cron<br/>Every 5s] -->|Query pending| QUEUE_COLL
CRON_EXP[Export Cron<br/>Every 30s] -->|Query pending| QUEUE_COLL
CRON_CLN[Cleanup Cron<br/>Daily] -->|Query old files| FILES[Export Files]
QUEUE_COLL -->|Pending Jobs| IMP_SVC[Import Service]
QUEUE_COLL -->|Pending Jobs| EXP_SVC[Export Service]
IMP_SVC -->|Parse & Validate| CSV[CSV Data]
IMP_SVC -->|Insert| DB[(contacts)]
IMP_SVC -->|Update Status| QUEUE_COLL
EXP_SVC -->|Query Contacts| DB
EXP_SVC -->|Generate CSV| FILES
EXP_SVC -->|Update Status| QUEUE_COLL
CRON_CLN -->|Delete Old| FILES
🔧 Jobs in This Module
Core Jobs
- 📘 Import - Contact CSV import processing
- 📗 Export - Contact CSV export generation
- 📙 Cleanup - Export file cleanup
⚙️ Configuration
Required Environment Variables:
QM_CONTACTS=true # Enable contact import
QM_CONTACTS_EXPORT=true # Enable contact export
QM_CONTACTS_EXPORT_CLEANUP=true # Enable export cleanup
MONGO_DB_URL=... # MongoDB connection
REDIS_HOST=localhost # Redis for Bull queues
REDIS_PORT=6379
📘 Import Processing
Overview
Processes bulk contact imports from CSV files uploaded through the Internal API.
Cron Schedule: */5 * * * * * (Every 5 seconds)
Source Files:
- Cron:
queue-manager/crons/contacts/import.js - Service:
queue-manager/services/contacts/import.js
Processing Flow
sequenceDiagram
participant API as Internal API
participant QUEUE as contact-queue
participant CRON as Import Cron
participant SERVICE as Import Service
participant DB as MongoDB
API->>QUEUE: Create import job<br/>(status: 'pending')
loop Every 5 seconds
CRON->>SERVICE: Execute
SERVICE->>QUEUE: Query pending imports
QUEUE-->>SERVICE: Pending jobs
SERVICE->>SERVICE: Download & parse CSV
SERVICE->>SERVICE: Validate contacts
SERVICE->>DB: Bulk insert contacts
SERVICE->>QUEUE: Update status<br/>(status: 'completed')
end
Key Features
- High-Frequency Polling: Checks every 5 seconds for new imports
- CSV Parsing: Supports multiple CSV formats and encodings
- Validation: Email validation, duplicate detection
- Bulk Operations: Efficient bulk inserts
- Error Handling: Partial success tracking (successful vs failed rows)
- Status Tracking: Real-time progress updates
Import Job Lifecycle
- Created: Internal API creates
contact-queuedocument - Pending: Job queued waiting for processing
- Processing: Cron picks up job and starts import
- Validation: CSV parsed and contacts validated
- Import: Bulk insert into
contactscollection - Completed/Failed: Final status with stats (imported count, failed count)
Code Pattern
Cron Initialization (crons/contacts/import.js):
const { importContacts } = require('../../services/contacts/import');
const cron = require('node-cron');
let inProgress = false;
exports.start = async () => {
cron.schedule('*/5 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
await importContacts();
inProgress = false;
}
});
};
Service Logic (services/contacts/import.js):
- Queries
contact-queueforstatus: 'pending',type: 'import' - Downloads CSV from S3/storage
- Parses CSV with error handling
- Validates email addresses and required fields
- Bulk inserts contacts with duplicate handling
- Updates job status and statistics
Validation Rules
- Email Format: Valid email regex pattern
- Required Fields: Name, email (minimum)
- Duplicate Detection: By email per account
- Field Length: Max lengths for name, company, etc.
- Custom Fields: Dynamic validation based on account settings
📗 Export Processing
Overview
Generates CSV exports of contacts based on filters and search criteria.
Cron Schedule: */30 * * * * * (Every 30 seconds)
Source Files:
- Cron:
queue-manager/crons/contacts/export.js - Service:
queue-manager/services/contacts/export.js
Processing Flow
sequenceDiagram
participant API as Internal API
participant QUEUE as contact-queue
participant CRON as Export Cron
participant SERVICE as Export Service
participant DB as contacts
participant S3 as File Storage
API->>QUEUE: Create export job<br/>(filters, status: 'pending')
loop Every 30 seconds
CRON->>SERVICE: Execute
SERVICE->>QUEUE: Query pending exports
QUEUE-->>SERVICE: Pending jobs
SERVICE->>DB: Query contacts with filters
DB-->>SERVICE: Contact data
SERVICE->>SERVICE: Generate CSV
SERVICE->>S3: Upload CSV file
S3-->>SERVICE: File URL
SERVICE->>QUEUE: Update status<br/>(fileUrl, status: 'completed')
end
Key Features
- Filter Support: Export by tags, segments, date ranges
- Column Selection: Configurable CSV columns
- Large Datasets: Streaming for memory efficiency
- File Storage: S3/cloud storage integration
- Download Links: Temporary signed URLs
- Progress Tracking: Row count and status updates
Export Options
Filters:
- Tags (include/exclude)
- Date ranges (created, updated)
- Contact status (active, unsubscribed)
- Custom field values
- Search queries
Format Options:
- Column selection
- Header inclusion
- CSV delimiter (comma, semicolon, tab)
- Encoding (UTF-8, Latin-1)
Code Pattern
Cron Initialization (crons/contacts/export.js):
const { exportContacts } = require('../../services/contacts/export');
const cron = require('node-cron');
let inProgress = false;
exports.start = async () => {
cron.schedule('*/30 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
await exportContacts();
inProgress = false;
}
});
};
Service Logic (services/contacts/export.js):
- Queries
contact-queueforstatus: 'pending',type: 'export' - Applies filters from job data
- Streams contacts to CSV
- Uploads to S3 with expiration policy
- Updates job with download URL
📙 Export Cleanup
Overview
Automatically deletes old export files to free storage space.
Cron Schedule: 0 4 * * * (Daily at 4:00 AM)
Source Files:
- Cron:
queue-manager/crons/contacts/cleanup.js - Service:
queue-manager/services/contacts/cleanup.js
Processing Flow
sequenceDiagram
participant CRON as Cleanup Cron
participant SERVICE as Cleanup Service
participant QUEUE as contact-queue
participant S3 as File Storage
CRON->>SERVICE: Execute daily
SERVICE->>QUEUE: Query old export jobs
QUEUE-->>SERVICE: Jobs older than 7 days
SERVICE->>S3: Delete files
S3-->>SERVICE: Deleted
SERVICE->>QUEUE: Update status<br/>(fileDeleted: true)
Key Features
- Automatic Cleanup: Runs daily
- Configurable Retention: Default 7 days
- Storage Optimization: Frees S3 storage
- Audit Trail: Logs deleted files
- Graceful Failure: Continues on individual file errors
Cleanup Policy
Files Deleted:
- Export files older than retention period (default: 7 days)
- Completed or failed export jobs
- Files marked as expired
Files Preserved:
- Recent exports (within retention period)
- Export jobs still processing
- Files marked as permanent
Code Pattern
Cron Initialization (crons/contacts/cleanup.js):
const { cleanupExports } = require('../../services/contacts/cleanup');
const cron = require('node-cron');
let inProgress = false;
exports.start = async () => {
cron.schedule('0 4 * * *', async () => {
if (!inProgress) {
inProgress = true;
await cleanupExports();
inProgress = false;
}
});
};
Service Logic (services/contacts/cleanup.js):
- Queries
contact-queuefor exports older than 7 days - Filters completed/failed jobs with files
- Deletes S3 objects
- Updates job documents to mark files as deleted
- Logs cleanup statistics
🔗 Related Documentation
- Queue Manager Overview
- Architecture (documentation unavailable)
- Configuration
Module Status: Active
Execution Pattern: High-frequency polling + Daily cleanup
Last Updated: 2025-10-10