Skip to main content

📇 Contacts Processing

📖 Overview

The Contacts module handles bulk contact operations including CSV imports, exports, and cleanup of expired export files.

Environment Flags:

  • QM_CONTACTS=true - Contact import processing
  • QM_CONTACTS_EXPORT=true - Contact export processing
  • QM_CONTACTS_EXPORT_CLEANUP=true - Export file cleanup

Source Location: queue-manager/crons/contacts/

Processing Pattern: High-frequency polling (every 5-30 seconds)

🗄️ Collections Used

contact-import-results

  • Operations: Read/Write
  • Model: shared/models/contact-import-results.js
  • Usage: Track import job status and results

contact-queue

  • Operations: Read/Write/Delete
  • Model: shared/models/contact-queue.js
  • Usage: Queue contact import/export jobs

contacts

  • Operations: Create/Update
  • Model: shared/models/contact.js
  • Usage: Store imported contact data

🔄 Processing Flow

graph TB
API[Internal API] -->|Create Job| QUEUE_COLL[contact-queue]

CRON_IMP[Import Cron<br/>Every 5s] -->|Query pending| QUEUE_COLL
CRON_EXP[Export Cron<br/>Every 30s] -->|Query pending| QUEUE_COLL
CRON_CLN[Cleanup Cron<br/>Daily] -->|Query old files| FILES[Export Files]

QUEUE_COLL -->|Pending Jobs| IMP_SVC[Import Service]
QUEUE_COLL -->|Pending Jobs| EXP_SVC[Export Service]

IMP_SVC -->|Parse & Validate| CSV[CSV Data]
IMP_SVC -->|Insert| DB[(contacts)]
IMP_SVC -->|Update Status| QUEUE_COLL

EXP_SVC -->|Query Contacts| DB
EXP_SVC -->|Generate CSV| FILES
EXP_SVC -->|Update Status| QUEUE_COLL

CRON_CLN -->|Delete Old| FILES

🔧 Jobs in This Module

Core Jobs

  • 📘 Import - Contact CSV import processing
  • 📗 Export - Contact CSV export generation
  • 📙 Cleanup - Export file cleanup

⚙️ Configuration

Required Environment Variables:

QM_CONTACTS=true                    # Enable contact import
QM_CONTACTS_EXPORT=true # Enable contact export
QM_CONTACTS_EXPORT_CLEANUP=true # Enable export cleanup

MONGO_DB_URL=... # MongoDB connection
REDIS_HOST=localhost # Redis for Bull queues
REDIS_PORT=6379

📘 Import Processing

Overview

Processes bulk contact imports from CSV files uploaded through the Internal API.

Cron Schedule: */5 * * * * * (Every 5 seconds)

Source Files:

  • Cron: queue-manager/crons/contacts/import.js
  • Service: queue-manager/services/contacts/import.js

Processing Flow

sequenceDiagram
participant API as Internal API
participant QUEUE as contact-queue
participant CRON as Import Cron
participant SERVICE as Import Service
participant DB as MongoDB

API->>QUEUE: Create import job<br/>(status: 'pending')

loop Every 5 seconds
CRON->>SERVICE: Execute
SERVICE->>QUEUE: Query pending imports
QUEUE-->>SERVICE: Pending jobs
SERVICE->>SERVICE: Download & parse CSV
SERVICE->>SERVICE: Validate contacts
SERVICE->>DB: Bulk insert contacts
SERVICE->>QUEUE: Update status<br/>(status: 'completed')
end

Key Features

  • High-Frequency Polling: Checks every 5 seconds for new imports
  • CSV Parsing: Supports multiple CSV formats and encodings
  • Validation: Email validation, duplicate detection
  • Bulk Operations: Efficient bulk inserts
  • Error Handling: Partial success tracking (successful vs failed rows)
  • Status Tracking: Real-time progress updates

Import Job Lifecycle

  1. Created: Internal API creates contact-queue document
  2. Pending: Job queued waiting for processing
  3. Processing: Cron picks up job and starts import
  4. Validation: CSV parsed and contacts validated
  5. Import: Bulk insert into contacts collection
  6. Completed/Failed: Final status with stats (imported count, failed count)

Code Pattern

Cron Initialization (crons/contacts/import.js):

const { importContacts } = require('../../services/contacts/import');
const cron = require('node-cron');

let inProgress = false;
exports.start = async () => {
cron.schedule('*/5 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
await importContacts();
inProgress = false;
}
});
};

Service Logic (services/contacts/import.js):

  • Queries contact-queue for status: 'pending', type: 'import'
  • Downloads CSV from S3/storage
  • Parses CSV with error handling
  • Validates email addresses and required fields
  • Bulk inserts contacts with duplicate handling
  • Updates job status and statistics

Validation Rules

  • Email Format: Valid email regex pattern
  • Required Fields: Name, email (minimum)
  • Duplicate Detection: By email per account
  • Field Length: Max lengths for name, company, etc.
  • Custom Fields: Dynamic validation based on account settings

📗 Export Processing

Overview

Generates CSV exports of contacts based on filters and search criteria.

Cron Schedule: */30 * * * * * (Every 30 seconds)

Source Files:

  • Cron: queue-manager/crons/contacts/export.js
  • Service: queue-manager/services/contacts/export.js

Processing Flow

sequenceDiagram
participant API as Internal API
participant QUEUE as contact-queue
participant CRON as Export Cron
participant SERVICE as Export Service
participant DB as contacts
participant S3 as File Storage

API->>QUEUE: Create export job<br/>(filters, status: 'pending')

loop Every 30 seconds
CRON->>SERVICE: Execute
SERVICE->>QUEUE: Query pending exports
QUEUE-->>SERVICE: Pending jobs
SERVICE->>DB: Query contacts with filters
DB-->>SERVICE: Contact data
SERVICE->>SERVICE: Generate CSV
SERVICE->>S3: Upload CSV file
S3-->>SERVICE: File URL
SERVICE->>QUEUE: Update status<br/>(fileUrl, status: 'completed')
end

Key Features

  • Filter Support: Export by tags, segments, date ranges
  • Column Selection: Configurable CSV columns
  • Large Datasets: Streaming for memory efficiency
  • File Storage: S3/cloud storage integration
  • Download Links: Temporary signed URLs
  • Progress Tracking: Row count and status updates

Export Options

Filters:

  • Tags (include/exclude)
  • Date ranges (created, updated)
  • Contact status (active, unsubscribed)
  • Custom field values
  • Search queries

Format Options:

  • Column selection
  • Header inclusion
  • CSV delimiter (comma, semicolon, tab)
  • Encoding (UTF-8, Latin-1)

Code Pattern

Cron Initialization (crons/contacts/export.js):

const { exportContacts } = require('../../services/contacts/export');
const cron = require('node-cron');

let inProgress = false;
exports.start = async () => {
cron.schedule('*/30 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
await exportContacts();
inProgress = false;
}
});
};

Service Logic (services/contacts/export.js):

  • Queries contact-queue for status: 'pending', type: 'export'
  • Applies filters from job data
  • Streams contacts to CSV
  • Uploads to S3 with expiration policy
  • Updates job with download URL

📙 Export Cleanup

Overview

Automatically deletes old export files to free storage space.

Cron Schedule: 0 4 * * * (Daily at 4:00 AM)

Source Files:

  • Cron: queue-manager/crons/contacts/cleanup.js
  • Service: queue-manager/services/contacts/cleanup.js

Processing Flow

sequenceDiagram
participant CRON as Cleanup Cron
participant SERVICE as Cleanup Service
participant QUEUE as contact-queue
participant S3 as File Storage

CRON->>SERVICE: Execute daily
SERVICE->>QUEUE: Query old export jobs
QUEUE-->>SERVICE: Jobs older than 7 days
SERVICE->>S3: Delete files
S3-->>SERVICE: Deleted
SERVICE->>QUEUE: Update status<br/>(fileDeleted: true)

Key Features

  • Automatic Cleanup: Runs daily
  • Configurable Retention: Default 7 days
  • Storage Optimization: Frees S3 storage
  • Audit Trail: Logs deleted files
  • Graceful Failure: Continues on individual file errors

Cleanup Policy

Files Deleted:

  • Export files older than retention period (default: 7 days)
  • Completed or failed export jobs
  • Files marked as expired

Files Preserved:

  • Recent exports (within retention period)
  • Export jobs still processing
  • Files marked as permanent

Code Pattern

Cron Initialization (crons/contacts/cleanup.js):

const { cleanupExports } = require('../../services/contacts/cleanup');
const cron = require('node-cron');

let inProgress = false;
exports.start = async () => {
cron.schedule('0 4 * * *', async () => {
if (!inProgress) {
inProgress = true;
await cleanupExports();
inProgress = false;
}
});
};

Service Logic (services/contacts/cleanup.js):

  • Queries contact-queue for exports older than 7 days
  • Filters completed/failed jobs with files
  • Deletes S3 objects
  • Updates job documents to mark files as deleted
  • Logs cleanup statistics


Module Status: Active
Execution Pattern: High-frequency polling + Daily cleanup
Last Updated: 2025-10-10

💬

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM