📊 Deal Import Processing

📖 Overview

The Deal Import job processes CSV deal imports with custom field mappings, automatically creates pipeline stages, and handles data validation. It runs every 5 seconds, processing pending deal import requests from the queues collection.

Complete Flow:

  1. Cron Initialization: queue-manager/crons/deals/import.js
  2. Service Processing: queue-manager/services/deals/import.js
  3. Queue Definition: queue-manager/queues/deals/import.js
  4. Queue Collection: queue-manager/models/queues.js

Execution Pattern: Cron-based (every 5 seconds)

Queue Name: deals_import_queue_[queueId]

Environment Flag: QM_DEALS=true (in index.js)
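
The QM_DEALS flag gates whether this cron is registered at startup. A minimal sketch of how index.js might wire it up (the actual startup code may differ):

// Hypothetical startup wiring in queue-manager/index.js
const dealsImportCron = require('./crons/deals/import');

if (process.env.QM_DEALS === 'true') {
  // Registers the */5-second schedule; imports are only picked up when this flag is set
  dealsImportCron.start();
}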

🔄 Complete Processing Flow

sequenceDiagram
participant CRON as Cron Schedule<br/>(every 5s)
participant SERVICE as Import Service
participant DB as Queue Collection
participant WASABI as Wasabi S3
participant QUEUE as Bull Queue
participant CRM as Deal Utility

CRON->>SERVICE: Check for pending imports
SERVICE->>DB: Query source='deals'<br/>status='pending'
DB-->>SERVICE: Return pending imports

SERVICE->>DB: Mark in_progress=true

loop For each import
SERVICE->>WASABI: Download CSV file
WASABI-->>SERVICE: CSV stream
SERVICE->>SERVICE: Parse CSV with csvtojson
SERVICE->>SERVICE: Generate JWT token (1h)
SERVICE->>QUEUE: Create queue for import

loop For each CSV row
SERVICE->>QUEUE: Add deal job with mappings
QUEUE->>QUEUE: Normalize stage_id, status, contract_type
QUEUE->>CRM: Create pipeline stage (if needed)
QUEUE->>CRM: Create deal with data
QUEUE->>DB: Log success/error
end

QUEUE->>DB: Update queue status<br/>(completed/failed)
end

๐Ÿ“ Source Filesโ€‹

1. Cron Initializationโ€‹

File: queue-manager/crons/deals/import.js

Purpose: Schedule import checks every 5 seconds

Cron Pattern: */5 * * * * * (every 5 seconds)
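
This is a six-field expression; node-cron treats the extra leading field as seconds. A quick sanity check of the pattern:

const cron = require('node-cron');

cron.validate('*/5 * * * * *'); // true — second-level schedule, fires every 5 seconds
cron.validate('*/5 * * * *');   // also valid, but this five-field form runs every 5 minutes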

Initialization:

const importDeals = require('../../services/deals/import');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

let inProgress = false;
exports.start = async () => {
  try {
    cron.schedule('*/5 * * * * *', async () => {
      if (!inProgress) {
        inProgress = true;
        await importDeals();
        inProgress = false;
      }
    });
  } catch (err) {
    logger.error({ initiator: 'QM/deals/import', error: err });
  }
};

In-Progress Lock: Prevents overlapping executions.
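
Because the try/catch wraps cron.schedule rather than the callback, an error thrown out of importDeals() would leave inProgress stuck at true. The service catches its own errors, so this should not happen in practice, but a more defensive variant (a sketch, not the actual source) releases the lock in a finally block:

cron.schedule('*/5 * * * * *', async () => {
  if (inProgress) return;
  inProgress = true;
  try {
    await importDeals();
  } catch (err) {
    logger.error({ initiator: 'QM/deals/import', error: err });
  } finally {
    inProgress = false; // lock is always released, even if the service throws
  }
});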

2. Service Processing (THE CORE LOGIC)

File: queue-manager/services/deals/import.js

Purpose: Query pending imports, download CSV, parse data, add to queues

Key Functions:

  • Query pending deal imports from Queue collection
  • Download CSV files from Wasabi S3
  • Parse CSV using csvtojson
  • Generate JWT authentication tokens
  • Create import-specific queues
  • Add each CSV row as a job

Processing Logic:

module.exports = async () => {
  try {
    // 1. Query pending imports
    const deals = await Queue.find({
      source: 'deals',
      status: 'pending',
      in_progress: false,
    });

    if (deals.length) {
      // 2. Mark as in progress
      const ids = deals.map(deal => deal._id);
      await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });

      // 3. Process each import
      await Promise.all(
        deals.map(async deal => {
          try {
            const mappings = deal.mappings;
            const user_id = deal.user_id;
            const account_id = deal.account_id;
            const parent_account = deal.parent_account;
            const client_id = deal.client_id;

            // 4. Get CSV key from S3
            let key;
            if (deal.csv[0].key) key = deal.csv[0].key;
            else throw new Error(`Key is missing. Queue id: ${deal._id}`);

            // 5. Download and parse CSV
            const file = await new wasabiUtil().getReadStream(key);
            const entries = await csv().fromStream(file);

            // 6. Generate JWT token (1 hour expiration)
            const pipeline_id = deal.additional_data.pipeline_id;
            const token = jwt.sign(
              {
                type: 'access_token',
                uid: user_id.toString(),
                account_id: account_id.toString(),
                parent_account: parent_account.toString(),
                client_id: client_id.toString(),
                scope: 'contacts communications contacts.external contacts.read contacts.create',
              },
              process.env.APP_SECRET,
              { expiresIn: '1h' },
            );

            // 7. Create user context
            const user = {
              created_by: user_id,
              owner: user_id,
              account_id: account_id,
              token,
            };

            // 8. Create queue for this import
            let Q = await dealQueue.start(deal._id.toString());

            // 9. Add CSV rows to queue
            await addData(entries, mappings, pipeline_id, user, deal, Q);
          } catch (err) {
            // Reset in_progress flag on error
            await Queue.updateOne({ _id: deal._id }, { in_progress: false });
            console.log(`Error: ${err}`);
          }
        }),
      );

      console.log('Deals added to queue for import.');
    }
  } catch (err) {
    console.log(`Error: ${err}`);
  }
};

Field Mapping Logic:

const addData = async (entries, mappings, pipeline_id, user, deal, queue) => {
  let i = 1;
  for (let entry of entries) {
    let data = {};

    // Apply mappings
    for (let mapping in mappings) {
      // Check if mapping is a custom field (MongoDB ObjectId)
      if (verifyMongoId(mappings[mapping])) {
        // Store in additional_info for custom fields
        data['additional_info'] = {
          ...(data['additional_info'] || {}),
          [mappings[mapping]]: entry[mapping] || '',
        };
      } else {
        // Store as standard field
        data[mappings[mapping]] = entry[mapping] || '';
      }
    }

    data.source = 'csv';
    data.tags = deal.additional_data?.tags;

    // Add job to queue with retry
    await queue.add(
      { pipeline_id, data, user, csvDeal: entry, queueItem: deal, lineNumber: i },
      {
        attempts: 3,
        backoff: 4000,
      },
    );
    i++;
  }
};

MongoDB ObjectId Verification:

const verifyMongoId = id => {
  if (mongoose.Types.ObjectId.isValid(id)) {
    if (String(new mongoose.Types.ObjectId(id)) === id) return true;
    return false;
  }
  return false;
};
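
The round-trip comparison matters because ObjectId.isValid() also accepts any 12-character string; re-serializing and comparing filters those out. For example:

verifyMongoId('507f1f77bcf86cd799439011'); // true  — 24-char hex string, treated as a custom field id
verifyMongoId('name');                     // false — standard field name
verifyMongoId('aaaabbbbcccc');             // false — 12 chars pass isValid(), but the
                                           //         re-serialized hex string no longer matches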

3. Queue Definition

File: queue-manager/queues/deals/import.js

Purpose: Bull queue configuration and deal creation with pipeline stages

Queue Options:

{
  attempts: 3, // 3 retry attempts
  backoff: 4000, // 4-second delay between retries
}

Job Processor: Creates pipeline stages and deals with field normalization

Processing Steps (a consolidated processor sketch follows this list):

  1. Normalize Field Values:

     // Normalize stage_id
     if (data?.stage_id) {
       data.stage_id = data.stage_id.toLowerCase();
     }

     // Normalize status (replace spaces and dashes with underscores)
     if (data?.status) {
       data.status = data.status.replace(/ /g, '_');
       data.status = data.status.replace('-', '_');
       data.status = data.status.toLowerCase();
     }

     // Normalize contract_type
     if (data?.contract_type) {
       data.contract_type = data.contract_type.replace(/ /g, '_');
       data.contract_type = data.contract_type.replace('-', '_');
       data.contract_type = data.contract_type.toLowerCase();
     }

  2. Create or Find Pipeline Stage:

     let stageObject = {
       name: data.stage_id,
       account_id: user.account_id,
       pipeline_id: pipeline_id.toString(),
       user: user.owner,
     };
     let saveStage = await pipelinestages(stageObject);
     data.stage_id = saveStage.id.toString();

  3. Create Deal:

     const deal = await Deal(pipeline_id, user.account_id, user.owner, data);
     insertedDeals.push(deal);

  4. Log Results:

     const { totalRecordsAdded, totalErrors, logId, logData, logs } = await logging(
       { errors: invalidDeals, savedContacts: insertedDeals },
       totalRecordsAdded,
       totalErrors,
       logID,
       [data],
       user.owner,
       user.user_id,
       logs,
       csvDeal,
       'deal',
       user.account_id,
     );

  5. Update Queue Status on Completion:

     const completedCb = async job => {
       importParams.user_id = ownerId;
       importParams._id = queue_item._id;
       await processData({
         resCSVImport,
         importParams,
         logs,
         totalRecordsAdded,
       });
     };
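
Pulling the steps above together, a minimal sketch of how the per-import Bull processor might be wired (REDIS_URL, processRow, completedCb, and failedCb are placeholders; the real queues/deals/import.js also accumulates insertedDeals, invalidDeals, and log state across jobs):

const Bull = require('bull');

exports.start = async queueId => {
  const queue = new Bull(`deals_import_queue_${queueId}`, process.env.REDIS_URL);

  queue.process(async job => {
    const { pipeline_id, data, user, csvDeal, queueItem, lineNumber } = job.data;
    // Steps 1-4: normalize fields, resolve/create the pipeline stage, create the deal, log the row
    await processRow({ pipeline_id, data, user, csvDeal, queueItem, lineNumber });
  });

  // Step 5: the completed callback checks whether this was the last job and,
  // if so, updates the queue item and csv_imports log
  queue.on('completed', completedCb);
  queue.on('failed', failedCb);

  return queue;
};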

🗄️ Collections Used

queues

  • Operations: Read, Update
  • Model: queue-manager/models/queues.js
  • Usage Context:
    • Query pending imports (source: 'deals', status: 'pending')
    • Track import progress (in_progress: true/false)
    • Store import configuration (mappings, pipeline_id, tags)
    • Update final status on completion

Key Fields:

  • source: 'deals'
  • status: 'pending' | 'processing' | 'completed' | 'failed'
  • in_progress: Boolean lock flag
  • user_id: User who initiated import
  • account_id: Owner account
  • parent_account: Parent account for white-label
  • client_id: Client identifier
  • mappings: Object mapping CSV columns to deal fields
  • csv: Array with S3 key for CSV file
  • additional_data: Contains pipeline_id and tags
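
Based on the fields listed above, the queues model might look roughly like the sketch below (the actual schema in queue-manager/models/queues.js may define additional fields and stricter types):

const mongoose = require('mongoose');

const queueSchema = new mongoose.Schema(
  {
    source: String,                       // e.g. 'deals'
    status: String,                       // 'pending' | 'processing' | 'completed' | 'failed'
    in_progress: { type: Boolean, default: false },
    user_id: mongoose.Schema.Types.ObjectId,
    account_id: mongoose.Schema.Types.ObjectId,
    parent_account: mongoose.Schema.Types.ObjectId,
    client_id: mongoose.Schema.Types.ObjectId,
    mappings: Object,                     // CSV column -> deal field or custom field id
    csv: [{ key: String, name: String }], // S3 object key(s)
    additional_data: Object,              // pipeline_id, tags, ...
  },
  { timestamps: { createdAt: 'created_at', updatedAt: 'updated_at' } },
);

module.exports = mongoose.model('queues', queueSchema);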

deals

  • Operations: Create
  • Model: shared/models/deal.js
  • Usage Context: Create deal records from CSV data

pipeline_stages

  • Operations: Create, Read
  • Model: shared/models/pipeline-stage.js
  • Usage Context: Auto-create pipeline stages from CSV stage names

csv_imports

  • Operations: Create, Update
  • Model: shared/models/csv-import.js
  • Usage Context: Log import results and errors

🔧 Job Configuration

Queue Options

{
  attempts: 3, // Maximum retry attempts per deal
  backoff: 4000, // 4 seconds between retries
}

Cron Schedule

'*/5 * * * * *'; // Every 5 seconds

Frequency Rationale: 5-second intervals provide quick processing while preventing queue overflow.

JWT Token Expiration

{ expiresIn: '1h' } // 1-hour token validity

Purpose: JWT tokens authenticate internal API calls for deal creation.
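
On the receiving side, any internal service holding the same APP_SECRET can validate the token before acting on a deal-creation call; a minimal sketch (the actual verification middleware is not shown in this document):

const jwt = require('jsonwebtoken');

// Throws if the signature is invalid or the 1-hour expiry has passed
const payload = jwt.verify(token, process.env.APP_SECRET);

// payload.uid, payload.account_id, and payload.scope can then be used to
// authorize the request on behalf of the importing user
console.log(payload.uid, payload.account_id, payload.scope);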

📋 Processing Logic - Detailed Flow

Service Layer Processing

Service Function: module.exports (anonymous async function)

Purpose: Query pending imports, download CSV, create jobs

Processing Steps:

  1. Query Pending Imports

     const deals = await Queue.find({
       source: 'deals',
       status: 'pending',
       in_progress: false,
     });

  2. Mark as In Progress

     const ids = deals.map(deal => deal._id);
     await Queue.updateMany({ _id: { $in: ids } }, { in_progress: true });

  3. Download CSV from Wasabi S3

     const key = deal.csv[0].key; // S3 key from queue item
     const file = await new wasabiUtil().getReadStream(key);

  4. Parse CSV to JSON

     const entries = await csv().fromStream(file);
     // Returns array of objects: [{ column1: 'value', column2: 'value' }, ...]

  5. Generate JWT Token

     const token = jwt.sign(
       {
         type: 'access_token',
         uid: user_id.toString(),
         account_id: account_id.toString(),
         scope: 'contacts communications contacts.external contacts.read contacts.create',
       },
       process.env.APP_SECRET,
       { expiresIn: '1h' },
     );

  6. Create Import Queue

     let Q = await dealQueue.start(deal._id.toString());
     // Creates queue: deals_import_queue_[queueId]

  7. Add CSV Rows to Queue

     await addData(entries, mappings, pipeline_id, user, deal, Q);
     // Each row becomes a separate job

Queue Processing

Queue Processor: Inside queues/deals/import.js

Purpose: Process each CSV row, create pipeline stage and deal

Job Data Structure:

{
  pipeline_id: ObjectId,     // Target pipeline
  data: {                    // Mapped deal data
    name: 'Deal Name',
    value: 5000,
    stage_id: 'qualified',
    status: 'open',
    contract_type: 'monthly',
    additional_info: {       // Custom fields
      customFieldId: 'value'
    }
  },
  user: {                    // Authentication context
    created_by: ObjectId,
    owner: ObjectId,
    account_id: ObjectId,
    token: 'JWT token'
  },
  csvDeal: { ... },          // Original CSV row
  queueItem: { ... },        // Queue collection document
  lineNumber: 1              // CSV line number for errors
}

Processing Steps:

  1. Extract Job Data

     const { pipeline_id, data, user, csvDeal, queueItem, lineNumber } = job.data;

  2. Normalize Field Values

     // Convert to lowercase and replace spaces/dashes
     if (data?.stage_id) {
       data.stage_id = data.stage_id.toLowerCase();
     }

     if (data?.status) {
       data.status = data.status.replace(/ /g, '_').replace('-', '_').toLowerCase();
     }

  3. Create or Find Pipeline Stage

     let stageObject = {
       name: data.stage_id,
       account_id: user.account_id,
       pipeline_id: pipeline_id.toString(),
       user: user.owner,
     };
     let saveStage = await pipelinestages(stageObject);
     data.stage_id = saveStage.id.toString();

  4. Create Deal

     const deal = await Deal(pipeline_id, user.account_id, user.owner, data);
     insertedDeals.push(deal);

  5. Log Results

     const { totalRecordsAdded, totalErrors, logId, logData, logs } = await logging(
       { errors: invalidDeals, savedContacts: insertedDeals },
       totalRecordsAdded,
       totalErrors,
       logID,
       [data],
       user.owner,
       user.user_id,
       logs,
       csvDeal,
       'deal',
       user.account_id,
     );

  6. On All Jobs Completed

     const completedCb = async job => {
       importParams.user_id = ownerId;
       importParams._id = queue_item._id;
       await processData({
         resCSVImport,
         importParams,
         logs,
         totalRecordsAdded,
       });
     };

Field Mapping Logic

Standard Fields (mapped directly):

// Example: CSV column "Deal Name" → deal.name
data[mappings[csvColumn]] = csvValue;

Custom Fields (stored in additional_info):

// Example: CSV column "Custom Field" → deal.additional_info[fieldId]
if (verifyMongoId(mappings[csvColumn])) {
  data['additional_info'] = {
    ...(data['additional_info'] || {}),
    [mappings[csvColumn]]: csvValue,
  };
}

Automatic Fields:

data.source = 'csv'; // Import source
data.tags = deal.additional_data?.tags; // Tags from import config
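
As a worked example, assume the hypothetical mappings and CSV row below; the mapping loop splits standard fields from a custom field keyed by its ObjectId:

const mappings = {
  'Deal Name': 'name',
  'Value': 'value',
  'Stage': 'stage_id',
  'Lead Source': '64b7f0c2a1e4d23f98765432', // illustrative custom field id
};

const entry = { 'Deal Name': 'Acme Corp Deal', Value: '5000', Stage: 'Qualified', 'Lead Source': 'Webinar' };

// Resulting job data after the loop and the automatic fields:
// {
//   name: 'Acme Corp Deal',
//   value: '5000',
//   stage_id: 'Qualified',   // lowercased later by the queue processor
//   additional_info: { '64b7f0c2a1e4d23f98765432': 'Webinar' },
//   source: 'csv',
//   tags: [...]              // from deal.additional_data?.tags
// }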

Error Handling in Flow

Service Layer Errors:

try {
  // Download and process CSV
  const file = await new wasabiUtil().getReadStream(key);
  const entries = await csv().fromStream(file);
  let Q = await dealQueue.start(deal._id.toString());
  await addData(entries, mappings, pipeline_id, user, deal, Q);
} catch (err) {
  // Reset in_progress flag to allow retry
  await Queue.updateOne({ _id: deal._id }, { in_progress: false });
  console.log(`Error: ${err}`);
}

Queue Processor Errors:

try {
  const deal = await Deal(pipeline_id, user.account_id, user.owner, data);
  insertedDeals.push(deal);
} catch (err) {
  // Log error with line number context
  err.additional_data = {
    message: `Deal Insertion failed.`,
    position: lineNumber,
    additional_info: err.message,
  };
  invalidDeals.push(err.additional_data);
}

Retry Strategy: 3 attempts with 4-second backoff

🚨 Error Handling

Common Error Scenarios

Missing CSV Key

let key;
if (deal.csv[0].key) key = deal.csv[0].key;
else throw new Error(`Key is missing in the csv details. Queue id: ${deal._id}`);

Recovery: Error logged, import marked as not in_progress

S3 Download Failure

try {
  const file = await new wasabiUtil().getReadStream(key);
} catch (error) {
  console.log(`Error downloading CSV from S3: ${error.message}`);
  // Import will retry on next cron run
}

CSV Parsing Error

try {
  const entries = await csv().fromStream(file);
} catch (error) {
  console.log(`CSV parsing failed: ${error.message}`);
  // Check CSV format and encoding
}

Deal Creation Failure

try {
  const deal = await Deal(pipeline_id, user.account_id, user.owner, data);
} catch (err) {
  // Logged with line number and original CSV data
  err.additional_data = {
    message: `Deal Insertion failed.`,
    position: lineNumber,
    additional_info: err.message,
  };
  invalidDeals.push(err.additional_data);
}

Result: Import continues processing remaining rows, errors logged in CSV import log

JWT Token Expiration

// Token valid for 1 hour
const token = jwt.sign({ ... }, process.env.APP_SECRET, { expiresIn: "1h" });

Prevention: Large imports should complete within 1 hour or implement token refresh
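
A rough way to check whether an import fits inside the token lifetime, using the 2-5 seconds-per-deal figure from the performance metrics below (purely illustrative; the codebase does not currently perform this check):

const TOKEN_LIFETIME_SECONDS = 60 * 60; // matches expiresIn: '1h'
const WORST_CASE_SECONDS_PER_DEAL = 5;  // upper end of the observed 2-5s range

const maxSafeRows = Math.floor(TOKEN_LIFETIME_SECONDS / WORST_CASE_SECONDS_PER_DEAL); // 720

if (entries.length > maxSafeRows) {
  // Options: split the CSV into smaller imports, or issue a fresh token per batch
  console.warn(`Import of ${entries.length} rows may outlive the 1-hour token`);
}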

Retry Strategy

{
  attempts: 3, // Maximum 3 attempts per deal
  backoff: 4000, // 4 seconds between retries
}

Backoff Schedule:

  • Attempt 1: Immediate
  • Attempt 2: ~4 seconds after the first failure
  • Attempt 3: ~8 seconds after the first failure (final attempt)

📊 Monitoring & Logging

Success Logging

console.log('Deals added to queue for import.');
console.log(`Processed ${totalRecordsAdded} deals successfully`);

Error Logging

logger.error({
  initiator: 'QM/deals/import',
  error: err,
  message: 'Deal Queue Job logging error',
});

Import Progress Tracking

// Logged per deal
{
  totalRecordsAdded: 250, // Successfully imported
  totalErrors: 5,         // Failed validations
  logs: [...],            // Detailed error logs
  lineNumber: 45          // CSV row number
}

Performance Metrics

  • Average Processing Time: ~2-5 seconds per deal
  • Success Rate: ~95%
  • Typical Import Size: 10-500 deals
  • Maximum Import Size: Limited by 1-hour JWT token expiration

🔗 Integration Points

Triggers This Job

  • Internal API: POST /api/v1/deals/import creates queue item
  • Dashboard UI: Deal import wizard
  • Cron Schedule: Every 5 seconds automatically

Data Dependencies

  • Queue Collection: Must have pending imports with CSV keys
  • Wasabi S3: CSV files must be accessible
  • Pipeline: Target pipeline must exist
  • Custom Fields: Custom field IDs in mappings must be valid

Jobs That Depend On This

  • Deal Automations: May trigger workflows after import
  • Analytics: Import completion updates deal metrics
  • Notifications: Import success/failure emails

⚠️ Important Notes

Side Effects

  • ⚠️ Deal Creation: Creates deal records in CRM
  • ⚠️ Pipeline Stages: Auto-creates stages if they don't exist
  • ⚠️ CSV Import Logs: Creates import log records
  • ⚠️ JWT Generation: Creates temporary access tokens
  • ⚠️ S3 Access: Downloads CSV files from Wasabi

Performance Considerations

  • 5-Second Intervals: Balance between responsiveness and server load
  • In-Progress Lock: Prevents duplicate processing
  • Per-Import Queues: Isolates imports for better tracking
  • JWT Expiration: 1-hour limit means large imports may fail
  • CSV Streaming: csvtojson parses directly from the S3 read stream, though all parsed rows are held in memory before jobs are queued

Maintenance Notes

  • Token Expiration: Monitor imports exceeding 1 hour
  • S3 Key Cleanup: CSV files should be deleted after import completes
  • Custom Field Validation: Verify MongoDB ObjectIds in mappings
  • Stage Name Normalization: CSV stage names are lowercased, which can create near-duplicate stages when existing stages use different casing
  • Error Log Review: Check csv_imports collection for failed rows

🧪 Testing

Manual Trigger

# Via Internal API
POST http://localhost:5002/api/v1/deals/import
Authorization: Bearer <token>
Content-Type: application/json

{
  "pipeline_id": "507f1f77bcf86cd799439011",
  "csv_file": "<file upload>",
  "mappings": {
    "Deal Name": "name",
    "Value": "value",
    "Stage": "stage_id",
    "Status": "status"
  },
  "tags": ["imported", "q4-2025"]
}

Test Queue Item

// Create test import queue item
await Queue.create({
  source: 'deals',
  status: 'pending',
  in_progress: false,
  user_id: testUserId,
  account_id: testAccountId,
  parent_account: testParentAccountId,
  client_id: testClientId,
  csv: [
    {
      key: 'imports/deals/test-import.csv',
      name: 'test-import.csv',
    },
  ],
  mappings: {
    'Deal Name': 'name',
    Value: 'value',
    Stage: 'stage_id',
  },
  additional_data: {
    pipeline_id: testPipelineId,
    tags: ['test'],
  },
});

Monitor Import Progress

// Count pending imports
const pendingImports = await Queue.countDocuments({
  source: 'deals',
  status: 'pending',
  in_progress: false,
});

console.log('Pending imports:', pendingImports);

// Check import status
const importStatus = await Queue.findById(importId);
console.log('Import status:', {
  status: importStatus.status,
  in_progress: importStatus.in_progress,
  created_at: importStatus.created_at,
});

// View import logs
const importLogs = await CSVImport.findOne({
  queue_id: importId,
});
console.log('Import results:', {
  total_added: importLogs.total_records_added,
  total_errors: importLogs.total_errors,
  errors: importLogs.logs,
});

Sample CSV Format

Deal Name,Value,Stage,Status,Contact Name,Contact Email
Acme Corp Deal,5000,qualified,open,John Doe,john@acme.com
TechStart Deal,12000,proposal,open,Jane Smith,jane@techstart.com
Global Inc Deal,25000,negotiation,open,Bob Johnson,bob@global.com
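
A mappings object matching this sample header might look like the following (the contact columns are mapped to illustrative custom-field ObjectIds):

{
  "Deal Name": "name",
  "Value": "value",
  "Stage": "stage_id",
  "Status": "status",
  "Contact Name": "66a1b2c3d4e5f6a7b8c9d0e1",
  "Contact Email": "66a1b2c3d4e5f6a7b8c9d0e2"
}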

Job Type: Scheduled
Execution Frequency: Every 5 seconds
Average Duration: 2-5 seconds per deal
Status: Active
