Deal Import Processing
Overview
The Deal Import job processes CSV deal imports with custom field mappings, automatically creates pipeline stages, and handles data validation. It runs every 5 seconds, processing pending deal import requests from the queues collection.
Complete Flow:
- Cron Initialization: queue-manager/crons/deals/import.js
- Service Processing: queue-manager/services/deals/import.js
- Queue Definition: queue-manager/queues/deals/import.js
- Queue Collection: queue-manager/models/queues.js
Execution Pattern: Cron-based (every 5 seconds)
Queue Name: deals_import_queue_[queueId]
Environment Flag: QM_DEALS=true (in index.js)
Complete Processing Flow
sequenceDiagram
    participant CRON as Cron Schedule<br/>(every 5s)
    participant SERVICE as Import Service
    participant DB as Queue Collection
    participant WASABI as Wasabi S3
    participant QUEUE as Bull Queue
    participant CRM as Deal Utility
    CRON->>SERVICE: Check for pending imports
    SERVICE->>DB: Query source='deals'<br/>status='pending'
    DB-->>SERVICE: Return pending imports
    SERVICE->>DB: Mark in_progress=true
    loop For each import
        SERVICE->>WASABI: Download CSV file
        WASABI-->>SERVICE: CSV stream
        SERVICE->>SERVICE: Parse CSV with csvtojson
        SERVICE->>SERVICE: Generate JWT token (1h)
        SERVICE->>QUEUE: Create queue for import
        loop For each CSV row
            SERVICE->>QUEUE: Add deal job with mappings
            QUEUE->>QUEUE: Normalize stage_id, status, contract_type
            QUEUE->>CRM: Create pipeline stage (if needed)
            QUEUE->>CRM: Create deal with data
            QUEUE->>DB: Log success/error
        end
        QUEUE->>DB: Update queue status<br/>(completed/failed)
    end
Source Files
1. Cron Initialization
File: queue-manager/crons/deals/import.js
Purpose: Schedule import checks every 5 seconds
Cron Pattern: */5 * * * * * (every 5 seconds)
Initialization:
const importDeals = require('../../services/deals/import');
const cron = require('node-cron');
const logger = require('../../utilities/logger');
let inProgress = false;
exports.start = async () => {
  try {
    cron.schedule('*/5 * * * * *', async () => {
      if (!inProgress) {
        inProgress = true;
        await importDeals();
        inProgress = false;
      }
    });
  } catch (err) {
    logger.error({ initiator: 'QM/deals/import', error: err });
  }
};
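In-Progress Lock: The module-level inProgress flag skips a tick while the previous run is still executing, which prevents overlapping executions within a single process.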
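In the cron snippet, the flag is reset only after importDeals() resolves. A minimal sketch of the same guard written with try/finally, so the flag is released even if the task rejects, might look like this. The names `guardOverlap` and `onError` are hypothetical and not part of the queue-manager code.

```javascript
// Hypothetical helper: wraps an async task so overlapping calls are skipped.
function guardOverlap(task, onError = console.error) {
  let inProgress = false;
  return async () => {
    if (inProgress) return false; // previous run still active: skip this tick
    inProgress = true;
    try {
      await task();
    } catch (err) {
      onError(err); // report the failure instead of rejecting the tick
    } finally {
      inProgress = false; // always release the lock
    }
    return true; // this tick ran the task
  };
}
```

With node-cron, the wrapped function would be passed as the schedule callback, for example cron.schedule('*/5 * * * * *', guardOverlap(importDeals)).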
2. Service Processing (Core Logic)
File: queue-manager/services/deals/import.js
Purpose: Query pending imports, download CSV, parse data, add to queues
Key Functions:
- Query pending deal imports from Queue collection
- Download CSV files from Wasabi S3
- Parse CSV using csvtojson
- Generate JWT authentication tokens
- Create import-specific queues
- Add each CSV row as a job
Processing Logic:
module.exports = async () => {
  try {
    // 1. Query pending imports
    const deals = await Queue.find({
      source: 'deals',
      status: 'pending',
      in_progress: false,
    });
    if (deals.length) {
      // 2. Mark as in progress
      const ids = deals.map(deal => deal._id);
      await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
      // 3. Process each import
      await Promise.all(
        deals.map(async deal => {
          try {
            const mappings = deal.mappings;
            const user_id = deal.user_id;
            const account_id = deal.account_id;
            const parent_account = deal.parent_account;
            const client_id = deal.client_id;
            // 4. Get CSV key from S3
            let key;
            if (deal.csv[0].key) key = deal.csv[0].key;
            else throw new Error(`Key is missing. Queue id: ${deal._id}`);
            // 5. Download and parse CSV
            const file = await new wasabiUtil().getReadStream(key);
            const entries = await csv().fromStream(file);
            // 6. Generate JWT token (1 hour expiration)
            const pipeline_id = deal.additional_data.pipeline_id;
            const token = jwt.sign(
              {
                type: 'access_token',
                uid: user_id.toString(),
                account_id: account_id.toString(),
                parent_account: parent_account.toString(),
                client_id: client_id.toString(),
                scope: 'contacts communications contacts.external contacts.read contacts.create',
              },
              process.env.APP_SECRET,
              { expiresIn: '1h' },
            );
            // 7. Create user context
            const user = {
              created_by: user_id,
              owner: user_id,
              account_id: account_id,
              token,
            };
            // 8. Create queue for this import
            let Q = await dealQueue.start(deal._id.toString());
            // 9. Add CSV rows to queue
            await addData(entries, mappings, pipeline_id, user, deal, Q);
          } catch (err) {
            // Reset in_progress flag on error
            await Queue.updateOne({ _id: deal._id }, { in_progress: false });
            console.log(`Error: ${err}`);
          }
        }),
      );
      console.log('Deals added to queue for import.');
    }
  } catch (err) {
    console.log(`Error: ${err}`);
  }
};
Field Mapping Logic:
const addData = async (entries, mappings, pipeline_id, user, deal, queue) => {
  let i = 1;
  for (let entry of entries) {
    let data = {};
    // Apply mappings
    for (let mapping in mappings) {
      // Check if mapping is a custom field (MongoDB ObjectId)
      if (verifyMongoId(mappings[mapping])) {
        // Store in additional_info for custom fields
        data['additional_info'] = {
          ...(data['additional_info'] || {}),
          [mappings[mapping]]: entry[mapping] || '',
        };
      } else {
        // Store as standard field
        data[mappings[mapping]] = entry[mapping] || '';
      }
    }
    data.source = 'csv';
    data.tags = deal.additional_data?.tags;
    // Add job to queue with retry
    await queue.add(
      { pipeline_id, data, user, csvDeal: entry, queueItem: deal, lineNumber: i },
      {
        attempts: 3,
        backoff: 4000,
      },
    );
    i++;
  }
};
MongoDB ObjectId Verification:
const verifyMongoId = id => {
  if (mongoose.Types.ObjectId.isValid(id)) {
    // Accept only values whose canonical ObjectId string equals the input
    if (String(new mongoose.Types.ObjectId(id)) === id) return true;
    return false;
  }
  return false;
};
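To make the mapping step concrete, here is a minimal, dependency-free sketch of what a single CSV row turns into. `mapRow` and `isCustomFieldId` are illustrative names. The regular expression is only a stand-in for verifyMongoId, on the assumption that a canonical ObjectId string is 24 lowercase hex characters. The sample row, mapping, and field id are invented for the example.

```javascript
// Stand-in for verifyMongoId (assumption): a canonical ObjectId string is
// 24 lowercase hex characters, so uppercase hex or shorter values are rejected.
const isCustomFieldId = value => typeof value === 'string' && /^[0-9a-f]{24}$/.test(value);

// Build the deal payload for one parsed CSV row.
function mapRow(entry, mappings, tags) {
  const data = {};
  for (const [csvColumn, target] of Object.entries(mappings)) {
    const value = entry[csvColumn] || ''; // missing or empty cells become ''
    if (isCustomFieldId(target)) {
      // Custom fields are grouped under additional_info, keyed by field id
      data.additional_info = { ...(data.additional_info || {}), [target]: value };
    } else {
      data[target] = value; // standard field
    }
  }
  data.source = 'csv';
  data.tags = tags;
  return data;
}

const row = { 'Deal Name': 'Acme Corp Deal', Value: '5000', Region: 'EMEA' };
const mappings = {
  'Deal Name': 'name',
  Value: 'value',
  Region: '507f1f77bcf86cd799439011', // hypothetical custom field id
};
console.log(mapRow(row, mappings, ['imported']));
```

The standard columns land directly on the payload, while the Region value ends up under additional_info keyed by the custom field id.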
3. Queue Definition
File: queue-manager/queues/deals/import.js
Purpose: Bull queue configuration and deal creation with pipeline stages
Queue Options:
{
  attempts: 3, // 3 attempts in total (the first run plus up to 2 retries)
  backoff: 4000, // 4-second delay between retries
}
Job Processor: Creates pipeline stages and deals with field normalization
Processing Steps:
- Normalize Field Values:
  // Normalize stage_id
  if (data?.stage_id) {
    data.stage_id = data.stage_id.toLowerCase();
  }
  // Normalize status: all spaces become underscores; note that
  // replace('-', '_') with a string pattern replaces only the first dash
  if (data?.status) {
    data.status = data.status.replace(/ /g, '_');
    data.status = data.status.replace('-', '_');
    data.status = data.status.toLowerCase();
  }
  // Normalize contract_type (same rules as status)
  if (data?.contract_type) {
    data.contract_type = data.contract_type.replace(/ /g, '_');
    data.contract_type = data.contract_type.replace('-', '_');
    data.contract_type = data.contract_type.toLowerCase();
  }
- Create or Find Pipeline Stage:
  let stageObject = {
    name: data.stage_id,
    account_id: user.account_id,
    pipeline_id: pipeline_id.toString(),
    user: user.owner,
  };
  let saveStage = await pipelinestages(stageObject);
  data.stage_id = saveStage.id.toString();
- Create Deal:
  const deal = await Deal(pipeline_id, user.account_id, user.owner, data);
  insertedDeals.push(deal);
- Log Results:
  const { totalRecordsAdded, totalErrors, logId, logData, logs } = await logging(
    { errors: invalidDeals, savedContacts: insertedDeals },
    totalRecordsAdded,
    totalErrors,
    logID,
    [data],
    user.owner,
    user.user_id,
    logs,
    csvDeal,
    'deal',
    user.account_id,
  );
- Update Queue Status on Completion:
  const completedCb = async job => {
    importParams.user_id = ownerId;
    importParams._id = queue_item._id;
    await processData({
      resCSVImport,
      importParams,
      logs,
      totalRecordsAdded,
    });
  };
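One detail in the normalization step is worth checking: String.prototype.replace with a string pattern replaces only the first match, so a value containing several dashes keeps the later ones. The sketch below contrasts that behavior with a global regular expression. `normalizeAsShown` and `normalizeAll` are hypothetical names used only for illustration, and the second variant is an assumption about the intended result.

```javascript
// Mirrors the documented snippet: spaces replaced globally, only the first dash replaced.
const normalizeAsShown = value =>
  value.replace(/ /g, '_').replace('-', '_').toLowerCase();

// Variant (assumed intent): replace every space and every dash.
const normalizeAll = value => value.replace(/[ -]/g, '_').toLowerCase();

console.log(normalizeAsShown('Closed-Won-Late')); // closed_won-late
console.log(normalizeAll('Closed-Won-Late')); // closed_won_late
console.log(normalizeAll('Pay As You Go')); // pay_as_you_go
```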
Collections Used
queues
- Operations: Read, Update
- Model: queue-manager/models/queues.js
- Usage Context:
  - Query pending imports (source: 'deals', status: 'pending')
  - Track import progress (in_progress: true/false)
  - Store import configuration (mappings, pipeline_id, tags)
  - Update final status on completion
Key Fields:
- source: 'deals'
- status: 'pending' | 'processing' | 'completed' | 'failed'
- in_progress: Boolean lock flag
- user_id: User who initiated import
- account_id: Owner account
- parent_account: Parent account for white-label
- client_id: Client identifier
- mappings: Object mapping CSV columns to deal fields
- csv: Array with S3 key for CSV file
- additional_data: Contains pipeline_id and tags
deals
- Operations: Create
- Model: shared/models/deal.js
- Usage Context: Create deal records from CSV data
pipeline_stages
- Operations: Create, Read
- Model: shared/models/pipeline-stage.js
- Usage Context: Auto-create pipeline stages from CSV stage names
csv_imports
- Operations: Create, Update
- Model: shared/models/csv-import.js
- Usage Context: Log import results and errors
Job Configuration
Queue Options
{
  attempts: 3, // Maximum attempts per deal (including the first run)
  backoff: 4000, // 4 seconds between retries
}
Cron Schedule
'*/5 * * * * *'; // Every 5 seconds
Frequency Rationale: Polling every 5 seconds picks up new imports quickly, while the in-progress lock keeps a slow run from overlapping with the next tick.
JWT Token Expiration
{ expiresIn: '1h' } // 1-hour token validity
Purpose: JWT tokens authenticate internal API calls for deal creation.
Processing Logic - Detailed Flow
Service Layer Processing
Service Function: module.exports (anonymous async function)
Purpose: Query pending imports, download CSV, create jobs
Processing Steps:
- Query Pending Imports
  const deals = await Queue.find({
    source: 'deals',
    status: 'pending',
    in_progress: false,
  });
- Mark as In Progress
  const ids = deals.map(deal => deal._id);
  await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });
- Download CSV from Wasabi S3
  const key = deal.csv[0].key; // S3 key from queue item
  const file = await new wasabiUtil().getReadStream(key);
- Parse CSV to JSON
  const entries = await csv().fromStream(file);
  // Returns array of objects: [{ column1: 'value', column2: 'value' }, ...]
- Generate JWT Token
  const token = jwt.sign(
    {
      type: 'access_token',
      uid: user_id.toString(),
      account_id: account_id.toString(),
      parent_account: parent_account.toString(),
      client_id: client_id.toString(),
      scope: 'contacts communications contacts.external contacts.read contacts.create',
    },
    process.env.APP_SECRET,
    { expiresIn: '1h' },
  );
- Create Import Queue
  let Q = await dealQueue.start(deal._id.toString());
  // Creates queue: deals_import_queue_[queueId]
- Add CSV Rows to Queue
  await addData(entries, mappings, pipeline_id, user, deal, Q);
  // Each row becomes a separate job
Queue Processing
Queue Processor: Inside queues/deals/import.js
Purpose: Process each CSV row, create pipeline stage and deal
Job Data Structure:
{
  pipeline_id: ObjectId, // Target pipeline
  data: { // Mapped deal data
    name: 'Deal Name',
    value: 5000,
    stage_id: 'qualified',
    status: 'open',
    contract_type: 'monthly',
    additional_info: { // Custom fields
      customFieldId: 'value'
    }
  },
  user: { // Authentication context
    created_by: ObjectId,
    owner: ObjectId,
    account_id: ObjectId,
    token: 'JWT token'
  },
  csvDeal: { ... }, // Original CSV row
  queueItem: { ... }, // Queue collection document
  lineNumber: 1 // CSV line number for errors
}
Processing Steps:
- Extract Job Data
  const { pipeline_id, data, user, csvDeal, queueItem, lineNumber } = job.data;
- Normalize Field Values
  // Lowercase the values; replace spaces with underscores
  // (replace('-', '_') with a string pattern swaps only the first dash)
  if (data?.stage_id) {
    data.stage_id = data.stage_id.toLowerCase();
  }
  if (data?.status) {
    data.status = data.status.replace(/ /g, '_').replace('-', '_').toLowerCase();
  }
  if (data?.contract_type) {
    data.contract_type = data.contract_type.replace(/ /g, '_').replace('-', '_').toLowerCase();
  }
- Create or Find Pipeline Stage
  let stageObject = {
    name: data.stage_id,
    account_id: user.account_id,
    pipeline_id: pipeline_id.toString(),
    user: user.owner,
  };
  let saveStage = await pipelinestages(stageObject);
  data.stage_id = saveStage.id.toString();
- Create Deal
  const deal = await Deal(pipeline_id, user.account_id, user.owner, data);
  insertedDeals.push(deal);
- Log Results
  const { totalRecordsAdded, totalErrors, logId, logData, logs } = await logging(
    { errors: invalidDeals, savedContacts: insertedDeals },
    totalRecordsAdded,
    totalErrors,
    logID,
    [data],
    user.owner,
    user.user_id,
    logs,
    csvDeal,
    'deal',
    user.account_id,
  );
- On All Jobs Completed
  const completedCb = async job => {
    importParams.user_id = ownerId;
    importParams._id = queue_item._id;
    await processData({
      resCSVImport,
      importParams,
      logs,
      totalRecordsAdded,
    });
  };
Field Mapping Logic
Standard Fields (mapped directly):
// Example: CSV column "Deal Name" → deal.name
data[mappings[csvColumn]] = csvValue;
Custom Fields (stored in additional_info):
// Example: CSV column "Custom Field" → deal.additional_info[fieldId]
if (verifyMongoId(mappings[csvColumn])) {
  data['additional_info'] = {
    ...(data['additional_info'] || {}),
    [mappings[csvColumn]]: csvValue,
  };
}
Automatic Fields:
data.source = 'csv'; // Import source
data.tags = deal.additional_data?.tags; // Tags from import config
Error Handling in Flow
Service Layer Errors:
try {
  // Download and process CSV
  const file = await new wasabiUtil().getReadStream(key);
  const entries = await csv().fromStream(file);
  let Q = await dealQueue.start(deal._id.toString());
  await addData(entries, mappings, pipeline_id, user, deal, Q);
} catch (err) {
  // Reset in_progress flag to allow retry
  await Queue.updateOne({ _id: deal._id }, { in_progress: false });
  console.log(`Error: ${err}`);
}
Queue Processor Errors:
try {
  const deal = await Deal(pipeline_id, user.account_id, user.owner, data);
  insertedDeals.push(deal);
} catch (err) {
  // Log error with line number context
  err.additional_data = {
    message: `Deal Insertion failed.`,
    position: lineNumber,
    additional_info: err.message,
  };
  invalidDeals.push(err.additional_data);
}
Retry Strategy: 3 attempts with 4-second backoff
Error Handling
Common Error Scenarios
Missing CSV Key
let key;
if (deal.csv[0].key) key = deal.csv[0].key;
else throw new Error(`Key is missing in the csv details. Queue id: ${deal._id}`);
Recovery: The error is logged and in_progress is reset to false. Because the status is still 'pending', the import is picked up again on the next cron run.
S3 Download Failure
try {
  const file = await new wasabiUtil().getReadStream(key);
} catch (error) {
  console.log(`Error downloading CSV from S3: ${error.message}`);
  // Import will retry on next cron run
}
CSV Parsing Error
try {
  const entries = await csv().fromStream(file);
} catch (error) {
  console.log(`CSV parsing failed: ${error.message}`);
  // Check CSV format and encoding
}
Deal Creation Failure
try {
  const deal = await Deal(pipeline_id, user.account_id, user.owner, data);
} catch (err) {
  // Logged with line number and original CSV data
  err.additional_data = {
    message: `Deal Insertion failed.`,
    position: lineNumber,
    additional_info: err.message,
  };
  invalidDeals.push(err.additional_data);
}
Result: The import continues with the remaining rows; each failure is recorded in the CSV import log with its line number.
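The effect of catching errors per row can be sketched without Bull or MongoDB. In this illustration, `importRows` and the `createDeal` callback are hypothetical stand-ins for the queue processor and the Deal utility. Only the shape of the error entry follows the snippet above.

```javascript
// Process rows one by one; a failing row is recorded and the loop continues.
async function importRows(rows, createDeal) {
  const insertedDeals = [];
  const invalidDeals = [];
  let lineNumber = 1;
  for (const row of rows) {
    try {
      insertedDeals.push(await createDeal(row));
    } catch (err) {
      invalidDeals.push({
        message: 'Deal Insertion failed.',
        position: lineNumber, // CSV line of the failing row
        additional_info: err.message,
      });
    }
    lineNumber++;
  }
  return { insertedDeals, invalidDeals };
}

// Hypothetical validator: reject rows without a name.
const createDeal = async row => {
  if (!row.name) throw new Error('name is required');
  return { ...row, source: 'csv' };
};

importRows([{ name: 'Acme' }, { value: '10' }, { name: 'Globex' }], createDeal).then(result => {
  console.log(result.insertedDeals.length, result.invalidDeals);
});
```

Here the second row fails validation, yet the third row is still imported, and the failure carries position 2 so it can be traced back to the CSV.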
JWT Token Expiration
// Token valid for 1 hour
const token = jwt.sign({ ... }, process.env.APP_SECRET, { expiresIn: "1h" });
Prevention: Large imports should finish within 1 hour; for imports that may run longer, implement token refresh.
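As a rough back-of-the-envelope check, the 1-hour lifetime can be turned into a row budget. This sketch assumes rows are processed one at a time and that each deal takes the 2-5 seconds quoted under Performance Metrics. `maxRowsWithinToken` is a hypothetical helper, and real throughput depends on queue concurrency.

```javascript
// Number of rows that fit into the token lifetime at a given per-deal duration.
function maxRowsWithinToken(tokenLifetimeSeconds, secondsPerDeal) {
  return Math.floor(tokenLifetimeSeconds / secondsPerDeal);
}

const ONE_HOUR = 60 * 60; // matches expiresIn: '1h'
console.log(maxRowsWithinToken(ONE_HOUR, 5)); // 720
console.log(maxRowsWithinToken(ONE_HOUR, 2)); // 1800
```

Under those assumptions, an import of more than roughly 700 rows could outlive its token at the slow end of the range.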
Retry Strategy
{
  attempts: 3, // Maximum 3 attempts per deal
  backoff: 4000, // 4 seconds between retries
}
Backoff Schedule:
- Attempt 1: Immediate
- Attempt 2: 4 seconds after attempt 1 fails
- Attempt 3: 4 seconds after attempt 2 fails (8 seconds of total backoff; final attempt)
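With a numeric backoff value, Bull applies the same fixed delay before each retry. The resulting schedule can be worked out with a small helper. `retrySchedule` is a hypothetical name, and the sketch ignores the time each attempt itself takes.

```javascript
// For each attempt: the wait before it starts and the cumulative wait so far.
function retrySchedule(attempts, backoffMs) {
  const schedule = [];
  let cumulativeWaitMs = 0;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    const waitMs = attempt === 1 ? 0 : backoffMs; // no backoff before the first attempt
    cumulativeWaitMs += waitMs;
    schedule.push({ attempt, waitMs, cumulativeWaitMs });
  }
  return schedule;
}

console.log(retrySchedule(3, 4000));
```

For attempts: 3 and backoff: 4000 this yields waits of 0, 4000, and 4000 ms, that is 8000 ms of backoff in total before the final attempt.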
Monitoring & Logging
Success Logging
console.log('Deals added to queue for import.');
console.log(`Processed ${totalRecordsAdded} deals successfully`);
Error Logging
logger.error({
  initiator: 'QM/deals/import',
  error: err,
  message: 'Deal Queue Job logging error',
});
Import Progress Tracking
// Logged per deal
{
  totalRecordsAdded: 250, // Successfully imported
  totalErrors: 5, // Failed validations
  logs: [...], // Detailed error logs
  lineNumber: 45 // CSV row number
}
Performance Metrics
- Average Processing Time: ~2-5 seconds per deal
- Success Rate: ~95%
- Typical Import Size: 10-500 deals
- Maximum Import Size: Limited by 1-hour JWT token expiration
Integration Points
Triggers This Job
- Internal API: POST /api/v1/deals/import creates queue item
- Dashboard UI: Deal import wizard
- Cron Schedule: Every 5 seconds automatically
Data Dependencies
- Queue Collection: Must have pending imports with CSV keys
- Wasabi S3: CSV files must be accessible
- Pipeline: Target pipeline must exist
- Custom Fields: Custom field IDs in mappings must be valid
Jobs That Depend On This
- Deal Automations: May trigger workflows after import
- Analytics: Import completion updates deal metrics
- Notifications: Import success/failure emails
⚠️ Important Notes
Side Effects
- ⚠️ Deal Creation: Creates deal records in CRM
- ⚠️ Pipeline Stages: Auto-creates stages if they don't exist
- ⚠️ CSV Import Logs: Creates import log records
- ⚠️ JWT Generation: Creates temporary access tokens
- ⚠️ S3 Access: Downloads CSV files from Wasabi
Performance Considerations
- 5-Second Intervals: Balance between responsiveness and server load
- In-Progress Lock: Prevents duplicate processing
- Per-Import Queues: Isolates imports for better tracking
- JWT Expiration: 1-hour limit means large imports may fail
- CSV Streaming: Memory-efficient parsing with csvtojson
Maintenance Notes
- Token Expiration: Monitor imports exceeding 1 hour
- S3 Key Cleanup: CSV files should be deleted after import completes
- Custom Field Validation: Verify MongoDB ObjectIds in mappings
- Stage Name Normalization: Lowercase conversion may cause duplicates
- Error Log Review: Check the csv_imports collection for failed rows
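One way to reason about the stage-name note: if stage lookup compares names exactly, a lowercased CSV value such as 'qualified' would not match an existing stage stored as 'Qualified', and a second stage would be created. The in-memory sketch below shows a case-insensitive find-or-create. It is purely illustrative: `findOrCreateStage` and the array-backed store are assumptions, not the behavior of the pipelinestages utility.

```javascript
// Find a stage by case-insensitive, trimmed name; create it only if absent.
function findOrCreateStage(stages, name) {
  const key = name.trim().toLowerCase();
  let stage = stages.find(s => s.name.trim().toLowerCase() === key);
  if (!stage) {
    stage = { id: String(stages.length + 1), name: key };
    stages.push(stage);
  }
  return stage;
}

const stages = [{ id: '1', name: 'Qualified' }];
console.log(findOrCreateStage(stages, 'qualified').id); // 1
console.log(findOrCreateStage(stages, ' Proposal ').name); // proposal
console.log(stages.length); // 2
```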
Testing
Manual Trigger
# Via Internal API
POST http://localhost:5002/api/v1/deals/import
Authorization: Bearer <token>
Content-Type: application/json
{
  "pipeline_id": "507f1f77bcf86cd799439011",
  "csv_file": "<file upload>",
  "mappings": {
    "Deal Name": "name",
    "Value": "value",
    "Stage": "stage_id",
    "Status": "status"
  },
  "tags": ["imported", "q4-2025"]
}
Test Queue Item
// Create test import queue item
await Queue.create({
  source: 'deals',
  status: 'pending',
  in_progress: false,
  user_id: testUserId,
  account_id: testAccountId,
  parent_account: testParentAccountId,
  client_id: testClientId,
  csv: [
    {
      key: 'imports/deals/test-import.csv',
      name: 'test-import.csv',
    },
  ],
  mappings: {
    'Deal Name': 'name',
    Value: 'value',
    Stage: 'stage_id',
  },
  additional_data: {
    pipeline_id: testPipelineId,
    tags: ['test'],
  },
});
Monitor Import Progress
// Count pending imports
const pendingImports = await Queue.countDocuments({
  source: 'deals',
  status: 'pending',
  in_progress: false,
});
console.log('Pending imports:', pendingImports);
// Check import status
const importStatus = await Queue.findById(importId);
console.log('Import status:', {
  status: importStatus.status,
  in_progress: importStatus.in_progress,
  created_at: importStatus.created_at,
});
// View import logs
const importLogs = await CSVImport.findOne({
  queue_id: importId,
});
console.log('Import results:', {
  total_added: importLogs.total_records_added,
  total_errors: importLogs.total_errors,
  errors: importLogs.logs,
});
Sample CSV Format
Deal Name,Value,Stage,Status,Contact Name,Contact Email
Acme Corp Deal,5000,qualified,open,John Doe,john@acme.com
TechStart Deal,12000,proposal,open,Jane Smith,jane@techstart.com
Global Inc Deal,25000,negotiation,open,Bob Johnson,bob@global.com
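For reference, a CSV parser such as csvtojson turns each data row into an object keyed by the header row, which is the shape the mappings expect. The sketch below reproduces that shape with a naive split-based parser. It does not handle quoted fields or embedded commas, and `parseSimpleCsv` is a hypothetical helper, not csvtojson itself.

```javascript
// Naive CSV parser: first line is the header, every other line becomes an object.
function parseSimpleCsv(text) {
  const [headerLine, ...lines] = text.trim().split(/\r?\n/);
  const headers = headerLine.split(',');
  return lines.map(line => {
    const cells = line.split(',');
    return Object.fromEntries(headers.map((header, i) => [header, cells[i] ?? '']));
  });
}

const sample = `Deal Name,Value,Stage,Status,Contact Name,Contact Email
Acme Corp Deal,5000,qualified,open,John Doe,john@acme.com
TechStart Deal,12000,proposal,open,Jane Smith,jane@techstart.com`;

const entries = parseSimpleCsv(sample);
console.log(entries[0]['Deal Name'], entries[0].Value); // Acme Corp Deal 5000
```

Each key of an entry (for example 'Deal Name') is what a mappings object would translate into a deal field such as name.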
Job Type: Scheduled
Execution Frequency: Every 5 seconds
Average Duration: 2-5 seconds per deal
Status: Active