Deal Automation Processing
Overview
The Deal Automation job executes CRM actions based on deal stage changes and time delays. It runs every minute and processes 11 automation types, including emails, SMS, tags, notes, and reminders. It supports delayed execution, retry logic, and socket-based real-time updates.
Complete Flow:
- Cron Initialization: queue-manager/crons/deals/automations.js
- Service Processing: queue-manager/services/deals/automations.js
- Queue Definitions: queue-manager/queues/deals/automations/*.js (11 queues)
- Utilities: queue-manager/utils/deals/automations.js
Execution Pattern: Cron-based (every 1 minute)
Queue Names: 11 separate queues per automation type
Environment Flag: QM_DEALS_AUTOMATIONS=true (in index.js)
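As a rough illustration of how an environment flag such as QM_DEALS_AUTOMATIONS might gate the job, here is a minimal sketch. The helper name `isJobEnabled` is hypothetical, and the real index.js may read the flag differently.

```javascript
// Hypothetical helper, not taken from index.js.
// Environment variables are always strings, so the comparison is against
// the literal 'true' rather than a boolean.
function isJobEnabled(env, flagName) {
  return env[flagName] === 'true';
}

// Example: decide whether the deals automation cron should be started.
const shouldStart = isJobEnabled(process.env, 'QM_DEALS_AUTOMATIONS');
console.log('Deals automations enabled:', shouldStart);
```

With this approach an unset flag, or any value other than the exact string 'true', leaves the job disabled.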
Complete Processing Flow
sequenceDiagram
participant CRON as Cron Schedule<br/>(every 1 min)
participant SERVICE as Automation Service
participant DB as Automations<br/>Collection
participant DEAL as Deal Collection
participant QUEUE as Bull Queue<br/>(11 types)
participant ACTION as Action Handler
CRON->>SERVICE: Check for ready automations
SERVICE->>DB: Query ACTIVE automations<br/>(11 types in parallel)
DB-->>SERVICE: Return automations<br/>(delay + creation time ≤ now)
SERVICE->>DB: Mark is_running=true
loop For each automation type
SERVICE->>DEAL: Find deals matching:<br/>- stage_id<br/>- automation_starts ≤ delayed date<br/>- not triggered/skipped
DEAL-->>SERVICE: Return matching deals
alt Deals found
SERVICE->>SOCKET: Emit 'process_deals' event
SERVICE->>QUEUE: Add automation job
QUEUE->>ACTION: Execute action<br/>(email, SMS, tag, etc.)
ACTION-->>QUEUE: Result (success/failed)
QUEUE->>DB: Log to automation_logs
QUEUE->>DEAL: Mark automation_triggered
QUEUE->>DB: Update is_running=false
else No deals found
SERVICE->>DB: Update is_running=false
end
end
Source Files
1. Cron Initialization
File: queue-manager/crons/deals/automations.js
Purpose: Schedule automation checks every minute
Cron Pattern: * * * * * (every minute)
Initialization:
const automations = require('../../services/deals/automations');
const cron = require('node-cron');
const logger = require('../../utilities/logger');
let inProgress = false;
exports.start = async () => {
try {
cron.schedule('* * * * *', async () => {
if (!inProgress) {
inProgress = true;
await automations();
inProgress = false;
}
});
} catch (err) {
logger.error({ initiator: 'QM/deals/automations', error: err });
}
};
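In-Progress Lock: Skips a tick while the previous call to automations() is still pending. Because the service function described in the next section starts the per-type handlers without awaiting them, this flag is released as soon as they have been launched. Duplicate processing across ticks is therefore prevented mainly by the is_running flag on each automation.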
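The lock pattern above can be sketched as a small reusable wrapper. This is an illustrative variant, not the project's code: the name `createNonOverlappingTask` is hypothetical, and it adds a try/finally so the flag is released even if the task throws.

```javascript
// Wraps an async task so that a new invocation is skipped while a
// previous invocation is still running.
function createNonOverlappingTask(task) {
  let inProgress = false;
  return async function run() {
    if (inProgress) {
      return false; // previous run still pending: skip this tick
    }
    inProgress = true;
    try {
      await task();
      return true; // the task ran to completion
    } finally {
      inProgress = false; // released on success and on failure
    }
  };
}
```

A scheduler callback could then be written as `cron.schedule('* * * * *', createNonOverlappingTask(automations))`. The guard only covers work that `task` actually awaits.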
2. Service Processing (THE CORE LOGIC)
File: queue-manager/services/deals/automations.js
Purpose: Query ready automations, find matching deals, add to queues
Key Functions:
- Query active automations for each of 11 types
- Find deals matching automation criteria
- Add jobs to type-specific queues
- Emit socket events for real-time updates
- Handle automation status (is_running flag)
11 Automation Types:
- add_tag - Add tags to deals
- remove_tag - Remove tags from deals
- email - Send emails
- sms - Send SMS messages
- note - Add notes to deals
- reminder - Create reminders
- owner - Change deal owner
- follower - Add followers to deals
- stage - Move deals to different stage
- instareport - Generate InstaReports
- instasite - Create InstaSites
Main Processing Function:
module.exports = async () => {
try {
// Launch all 11 automation types in parallel
dealsAddTagAutomation()
.then(() => {})
.catch(err =>
logger.error({
message: `Error starting add tag automation', ${err.message}`,
}),
);
dealsEmailAutomation()
.then(() => {})
.catch(err =>
logger.error({
message: `Error starting email automation, ${err.message}`,
}),
);
// ... 9 more automation types
dealsStageAutomation()
.then(() => {})
.catch(err =>
logger.error({
message: `Error starting stage automation, ${err.message}`,
}),
);
} catch (err) {
logger.error({
message: `Error starting a deal automation.', ${err.message}`,
});
}
};
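The function above starts each handler without awaiting it, so its promise resolves before the handlers finish. If a caller needs to know when all types are done, one possible variant uses Promise.allSettled. This is a sketch under that assumption: `runAllAutomationTypes`, `runners`, and `logError` are hypothetical names, not part of the service.

```javascript
// Runs every handler concurrently, waits for all of them, and reports
// which ones failed. `runners` maps a type name to an async function.
async function runAllAutomationTypes(runners, logError) {
  const names = Object.keys(runners);
  // The async arrow turns a synchronous throw into a rejected promise,
  // so one broken handler cannot prevent the others from starting.
  const results = await Promise.allSettled(
    names.map(async name => runners[name]()),
  );
  const failedNames = [];
  results.forEach((result, index) => {
    if (result.status === 'rejected') {
      const reason =
        result.reason instanceof Error ? result.reason.message : String(result.reason);
      failedNames.push(names[index]);
      logError({ message: `Error starting ${names[index]} automation, ${reason}` });
    }
  });
  return failedNames;
}
```

Awaiting this function in the cron callback would keep the in-progress flag set for the whole run, at the cost of a slow type delaying the next tick for every type.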
Pattern for Each Automation Type:
const dealsEmailAutomation = async () => {
try {
// 1. Get ready automations for this type
const automationToTrigger = await getAutomations('email');
// 2. Start queue for this type
const queue = await email.start();
// 3. Add each automation to queue
await Promise.all(
automationToTrigger.map(async automation => {
try {
await addToQueue(automation, queue);
} catch (err) {
console.log('Error starting email automation queue', err.message);
await updateAutomationStatus([automation._id], false);
}
}),
);
} catch (error) {
logger.error({
message: `Error processing email deal automation', ${error.message}`,
});
}
};
Core Queue Addition Logic:
const addToQueue = async (automation, queue) => {
const automationId = automation._id;
const delayedDate = moment().subtract(automation.delay, 'seconds');
// Find deals that match automation criteria
let dealData = await Deal.find({
stage_id: automation.stage_id, // In target stage
$and: [
{ automation_starts: { $lte: delayedDate.toDate() } }, // Past delay
{ automation_starts: { $gte: automation.createdAt } }, // After creation
],
$or: [
{ retry_automations: automationId }, // Flagged for retry
{
$and: [
{ skip_automations: { $ne: automationId } }, // Not skipped
{ automation_triggered: { $ne: automationId } }, // Not triggered
],
},
],
}).exec();
let is_process = true;
if (dealData?.length) {
// Emit socket event for real-time UI updates
if (is_process) {
await socketEmit('process_deals', dealData?.[0]?.created_by, {
automation: automation,
stage_id: automation.stage_id,
});
}
is_process = false;
// Add to queue with retry
await queue.add(automation, {
attempts: 3,
backoff: 4000,
});
} else {
// No matching deals - mark automation as not running
await updateAutomationStatus([automation._id], false);
}
};
3. Automation Query Utility
File: queue-manager/utils/deals/automations.js
Function: getAutomations(type)
Purpose: Query active automations that are ready to execute
Query Logic:
exports.getAutomations = async type => {
const currTimestamp = new Date();
const automationToTrigger = await Automation.aggregate([
{
$match: {
status: 'ACTIVE', // Automation is active
module: 'DEAL', // Deal module only
type: type, // Specific type (email, SMS, etc.)
$or: [
{ is_running: false }, // Not currently running
{ is_running: { $exists: false } }, // Never run
],
},
},
{
$addFields: {
updatedDelay: {
$multiply: ['$delay', 1000], // Convert seconds to milliseconds
},
},
},
{
$addFields: {
totalVal: {
$add: ['$createdAt', '$updatedDelay'], // Creation + delay
},
},
},
{
$lookup: {
from: '_accounts',
localField: 'account_id',
foreignField: '_id',
as: 'account',
},
},
{
$unwind: '$account',
},
{
$match: {
totalVal: { $lte: currTimestamp }, // Ready to execute
'account.active': true, // Account is active
},
},
]);
// Mark as running to prevent duplicate processing
const automationRunning = automationToTrigger.map(a => a._id);
if (automationRunning.length > 0) {
await exports.updateAutomationStatus(automationRunning, true);
}
return Promise.resolve(automationToTrigger);
};
Delay Calculation:
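An automation becomes eligible for processing when: automation.createdAt + automation.delay ≤ current_time. Each deal is then checked separately against its own automation_starts timestamp.
Example:
- Automation created: 2025-01-10 10:00:00
- Delay: 3600 seconds (1 hour)
- Eligible from: 2025-01-10 11:00:00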
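The same readiness check can be expressed in plain JavaScript, mirroring the $multiply and $add stages of the aggregation. This is a minimal sketch: the function names are hypothetical, and `createdAt` is assumed to be a Date with `delay` in seconds.

```javascript
// Earliest time at which the automation is picked up by the query.
function automationReadyAt(automation) {
  // delay is stored in seconds; Date arithmetic works in milliseconds.
  return new Date(automation.createdAt.getTime() + automation.delay * 1000);
}

// True when createdAt + delay is at or before `now`.
function isAutomationReady(automation, now = new Date()) {
  return automationReadyAt(automation).getTime() <= now.getTime();
}

const example = {
  createdAt: new Date(Date.UTC(2025, 0, 10, 10, 0, 0)),
  delay: 3600,
};
console.log(automationReadyAt(example).toISOString());
```

The example uses UTC so the result does not depend on the machine's time zone.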
4. Queue Definitions
Location: queue-manager/queues/deals/automations/*.js
11 Separate Queue Files:
- add-tag.js
- remove-tag.js
- email.js
- sms.js
- note.js
- reminder.js
- owner.js
- follower.js
- stage.js
- instareport.js
- instasite.js
Common Queue Options:
{
attempts: 3, // 3 retry attempts
backoff: 4000, // 4-second delay between retries
}
Queue Processor Pattern:
Each queue processor:
- Finds matching deals
- Executes the specific action
- Logs success/failure
- Updates deal's automation_triggered array
- Updates automation's is_running flag
Collections Used
automations
- Operations: Read, Update
- Model: shared/models/automation.js
- Usage Context:
  - Query active automations by type
  - Track execution status (is_running)
  - Store automation configuration (stage, delay, action data)
Key Fields:
- status: 'ACTIVE' | 'INACTIVE'
- module: 'DEAL'
- type: 'add_tag' | 'remove_tag' | 'email' | 'sms' | 'note' | 'reminder' | 'owner' | 'follower' | 'stage' | 'instareport' | 'instasite'
- stage_id: Target pipeline stage
- delay: Seconds to wait after deal enters stage
- is_running: Boolean lock flag
- createdAt: Automation creation time
- account_id: Owner account
deals
- Operations: Read, Update
- Model: shared/models/deal.js
- Usage Context:
  - Query deals matching automation criteria
  - Track automation execution status
  - Prevent duplicate automation triggers
Key Fields for Automations:
- stage_id: Current pipeline stage
- automation_starts: Timestamp when deal entered current stage
- automation_triggered: Array of automation IDs that have been triggered
- skip_automations: Array of automation IDs to skip
- retry_automations: Array of automation IDs flagged for retry
automation_logs
- Operations: Create
- Model: shared/models/automation-log.js
- Usage Context: Log automation execution results (success/failed)
Log Structure:
{
deals: [ObjectId], // Deals processed
success: [{ // Successful actions
deal: ObjectId,
tag_id: ObjectId // For tag automations
}],
failed: [{ // Failed actions
deal: ObjectId,
tag_id: ObjectId,
message: 'Error message'
}],
user: ObjectId, // Automation creator
account: ObjectId, // Account
automation: ObjectId, // Automation ID
stage_id: ObjectId, // Pipeline stage
type: 'email' // Automation type
}
_accounts
- Operations: Read (via lookup)
- Model: shared/models/account.js
- Usage Context: Verify account is active before executing automations
Job Configuration
Queue Options
{
attempts: 3, // Maximum retry attempts
backoff: 4000, // 4 seconds between retries
}
Cron Schedule
'* * * * *'; // Every 1 minute
Frequency Rationale: 1-minute intervals provide timely automation execution while minimizing database load.
Processing Logic - Detailed Flow
Automation Trigger Criteria
A deal matches an automation if ALL conditions are met:
- Stage Match: deal.stage_id === automation.stage_id
- Time Window:
  - deal.automation_starts ≤ (now - automation.delay)
  - deal.automation_starts ≥ automation.createdAt
- Execution Status (either):
  - Flagged for retry: deal.retry_automations contains automation._id
  - OR never triggered: automation._id not in deal.automation_triggered and not in deal.skip_automations
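The criteria above can be mirrored as an in-memory predicate, which may be handy for unit tests or for reasoning about edge cases. This is a sketch only: `dealMatchesAutomation` is a hypothetical name, IDs are compared as strings, and the real matching is done by the MongoDB query.

```javascript
// True when `ids` (possibly undefined) contains `id`, comparing by string
// so ObjectId-like values and plain strings behave the same way.
const includesId = (ids, id) => (ids || []).some(x => String(x) === String(id));

function dealMatchesAutomation(deal, automation, now = new Date()) {
  // Deals must have entered the stage at least `delay` seconds ago.
  const delayedDate = new Date(now.getTime() - automation.delay * 1000);
  const starts = deal.automation_starts;

  const stageMatch = String(deal.stage_id) === String(automation.stage_id);
  const inWindow = starts <= delayedDate && starts >= automation.createdAt;

  const flaggedForRetry = includesId(deal.retry_automations, automation._id);
  const neverHandled =
    !includesId(deal.skip_automations, automation._id) &&
    !includesId(deal.automation_triggered, automation._id);

  return stageMatch && inWindow && (flaggedForRetry || neverHandled);
}
```

Note that a retry flag overrides both the skipped and the triggered lists, which matches the $or branch of the query.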
Delay Calculation Example
Scenario: Email automation with 1-hour delay
// Automation created: 2025-01-10 09:00:00
// Delay: 3600 seconds (1 hour)
// Current time: 2025-01-10 11:30:00
const delayedDate = moment().subtract(3600, 'seconds');
// delayedDate = 2025-01-10 10:30:00
// Deal matches if:
// - deal.automation_starts ≤ 2025-01-10 10:30:00 (entered stage before 10:30)
// - deal.automation_starts ≥ 2025-01-10 09:00:00 (after automation created)
Service Layer Processing
Main Service Function: module.exports
Purpose: Launch all 11 automation types in parallel
Processing Steps:
- Parallel Execution
// All types execute simultaneously
dealsAddTagAutomation().then(() => {}).catch(err => logger.error({...}));
dealsEmailAutomation().then(() => {}).catch(err => logger.error({...}));
// ... 9 more types
Benefit: Reduces total execution time from ~11 minutes to ~1 minute
- Per-Type Processing
For each automation type:
const dealsEmailAutomation = async () => {
// Step 1: Get ready automations
const automationToTrigger = await getAutomations('email');
// Step 2: Create queue
const queue = await email.start();
// Step 3: Process each automation
await Promise.all(
automationToTrigger.map(async automation => {
await addToQueue(automation, queue);
}),
);
};
- Queue Addition Logic
const addToQueue = async (automation, queue) => {
// Calculate delay threshold
const delayedDate = moment().subtract(automation.delay, 'seconds');
// Find matching deals
let dealData = await Deal.find({
stage_id: automation.stage_id,
$and: [
{ automation_starts: { $lte: delayedDate.toDate() } },
{ automation_starts: { $gte: automation.createdAt } },
],
$or: [
{ retry_automations: automation._id },
{
$and: [
{ skip_automations: { $ne: automation._id } },
{ automation_triggered: { $ne: automation._id } },
],
},
],
}).exec();
if (dealData?.length) {
// Emit socket event
await socketEmit('process_deals', dealData?.[0]?.created_by, {
automation: automation,
stage_id: automation.stage_id,
});
// Add to queue
await queue.add(automation, { attempts: 3, backoff: 4000 });
} else {
// No matches - stop running flag
await updateAutomationStatus([automation._id], false);
}
};
Queue Processing
Queue Processor: Inside queues/deals/automations/[type].js
Purpose: Execute specific automation action
Job Data Structure:
{
_id: ObjectId, // Automation ID
type: 'email', // Automation type
stage_id: ObjectId, // Target stage
delay: 3600, // Delay in seconds
account_id: ObjectId, // Account
created_by: ObjectId, // Creator
// Type-specific data:
email_template_id: ObjectId, // For email automations
tag: ObjectId, // For tag automations
reminder_type: 'call', // For reminder automations
owner_id: ObjectId, // For owner change
// etc.
}
Processing Steps (Example: Email Automation):
- Find Matching Deals
const delayedDate = moment().subtract(automation.delay, 'seconds');
let deals = await Deal.find({
stage_id: automation.stage_id,
$and: [
{ automation_starts: { $lte: delayedDate.toDate() } },
{ automation_starts: { $gte: automation.createdAt } },
],
$or: [
{ retry_automations: automation._id },
{
$and: [
{ skip_automations: { $ne: automation._id } },
{ automation_triggered: { $ne: automation._id } },
],
},
],
}).exec();
- Execute Action for Each Deal
let success = [];
let failed = [];
for (const deal of deals) {
try {
// Send email using template
await emailService.send({
to: deal.contact_email,
template_id: automation.email_template_id,
deal_data: deal,
});
success.push({ deal: deal._id });
} catch (err) {
failed.push({
deal: deal._id,
message: err.message,
});
}
}
- Update Deal Status
// Mark automation as triggered for successful deals
await Deal.updateMany(
{ _id: { $in: success.map(s => s.deal) } },
{ $addToSet: { automation_triggered: automation._id } },
);
- Log Results
await AutomationLog.create({
deals: deals.map(d => d._id),
success: success,
failed: failed,
user: automation.created_by,
account: automation.account_id,
automation: automation._id,
stage_id: automation.stage_id,
type: automation.type,
});
- Update Automation Status
await Automation.updateOne({ _id: automation._id }, { is_running: false });
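Steps 2 and 4 above (run the action per deal, then build the log entry) can be combined into a generic helper that works for any automation type. This is a sketch under stated assumptions: `runActionForDeals` and the `action` callback are hypothetical and stand in for the type-specific logic in each queue file.

```javascript
// Runs `action` for every deal, collecting successes and failures, and
// returns an object shaped like an automation_logs document.
async function runActionForDeals(deals, automation, action) {
  const success = [];
  const failed = [];
  for (const deal of deals) {
    try {
      await action(deal, automation);
      success.push({ deal: deal._id });
    } catch (err) {
      // A failing deal is recorded and does not stop the remaining deals.
      failed.push({ deal: deal._id, message: err.message });
    }
  }
  return {
    deals: deals.map(d => d._id),
    success,
    failed,
    user: automation.created_by,
    account: automation.account_id,
    automation: automation._id,
    stage_id: automation.stage_id,
    type: automation.type,
  };
}
```

The returned object could be passed to `AutomationLog.create`, and `success.map(s => s.deal)` gives the IDs to mark as triggered.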
Error Handling in Flow
Service Layer Errors:
try {
await addToQueue(automation, queue);
} catch (err) {
console.log('Error starting email automation queue', err.message);
// Reset is_running flag
await updateAutomationStatus([automation._id], false);
}
Queue Processor Errors:
try {
await emailService.send({...});
success.push({ deal: deal._id });
} catch (err) {
// Log failure but continue processing other deals
failed.push({
deal: deal._id,
message: err.message
});
}
Retry Strategy: 3 attempts with 4-second backoff per automation (not per deal)
Error Handling
Common Error Scenarios
No Matching Deals
if (dealData?.length) {
await queue.add(automation, { attempts: 3, backoff: 4000 });
} else {
// No deals match - mark automation as not running
await updateAutomationStatus([automation._id], false);
}
Result: Automation will be re-checked on next cron run
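Queue Creation Failure
// A failure in email.start() is caught by the handler's outer try/catch and logged
const queue = await email.start();
await Promise.all(
  automationToTrigger.map(async automation => {
    try {
      await addToQueue(automation, queue);
    } catch (err) {
      console.log('Error starting email automation queue', err.message);
      // Reset is_running so the automation is picked up again on the next run
      await updateAutomationStatus([automation._id], false);
    }
  }),
);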
Action Execution Failure
// Individual deal failures don't stop processing
for (const deal of deals) {
try {
await executeAction(deal, automation);
success.push({ deal: deal._id });
} catch (err) {
// Log failure, continue with next deal
failed.push({ deal: deal._id, message: err.message });
}
}
Result: Failed deals logged, successful deals marked as triggered
Database Connection Error
try {
await Deal.find({...});
} catch (error) {
logger.error({
message: `Error processing email deal automation', ${error.message}`,
});
// Automation will retry on next cron run
}
Retry Strategy
{
attempts: 3, // Maximum 3 attempts per automation
backoff: 4000, // 4 seconds between retries
}
Important: Retries are at the automation level, not per deal. If an automation fails 3 times, it will be retried on the next cron run (1 minute later).
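To make the retry semantics concrete, the sketch below emulates "up to 3 attempts with a fixed 4-second pause" in plain JavaScript. It illustrates the behavior described above and is not how the queue library implements it. `runWithRetries` and `sleep` are hypothetical helpers.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Calls `job` until it succeeds or `attempts` is exhausted, waiting a
// fixed `backoff` (milliseconds) between consecutive attempts.
async function runWithRetries(job, { attempts = 3, backoff = 4000 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt += 1) {
    try {
      return await job(attempt);
    } catch (err) {
      lastError = err;
      if (attempt < attempts) {
        await sleep(backoff); // no wait after the final attempt
      }
    }
  }
  throw lastError; // every attempt failed
}
```

Because the whole automation job is the unit being retried, deals that succeeded in an earlier attempt are protected from repeat actions only by the automation_triggered check in the deal query.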
Monitoring & Logging
Success Logging
// Logged to automation_logs collection
{
deals: [deal1._id, deal2._id],
success: [
{ deal: deal1._id },
{ deal: deal2._id }
],
failed: [],
type: 'email',
automation: automation._id
}
Error Logging
logger.error({
message: `Error processing email deal automation', ${error.message}`,
});
// Per-deal failures
{
deals: [deal1._id, deal2._id],
success: [{ deal: deal1._id }],
failed: [
{
deal: deal2._id,
message: 'Email delivery failed'
}
]
}
Socket Events
// Emitted when automation starts processing deals
await socketEmit('process_deals', dealData?.[0]?.created_by, {
automation: automation,
stage_id: automation.stage_id,
});
Purpose: Real-time UI updates showing automation is running
Performance Metrics
- Average Processing Time: ~30-60 seconds per cron run
- Parallel Execution: 11 automation types run simultaneously
- Success Rate: ~95% (varies by action type)
- Typical Volume: 10-100 automations per minute
Integration Points
Triggers This Job
- Cron Schedule: Every 1 minute automatically
- Deal Stage Change: Updates automation_starts field
- Manual Trigger: Via API endpoint (if QM_HOOKS=true)
Data Dependencies
- Automations Collection: Must have active automations configured
- Deals Collection: Deals must be in target stages
- Email Templates: For email automations
- SMS Provider: For SMS automations (Twilio)
- User Accounts: For follower/owner assignments
Jobs That Depend On This
- Email/SMS Queues: Handle actual message delivery
- InstaReports/InstaSites: Generate content via automation
- Notification Service: Send completion notifications
Important Notes
Side Effects
- Deal Updates: Marks deals as automation_triggered
- Email/SMS Sending: External API calls
- Database Writes: Creates automation logs
- Socket Events: Real-time UI updates
- Third-Party Services: InstaReports, InstaSites generation
Performance Considerations
- 1-Minute Intervals: Balance between responsiveness and load
- Parallel Execution: All 11 types run simultaneously
- Is_Running Lock: Prevents duplicate execution
- Query Optimization: Indexes on stage_id, automation_starts, automation_triggered
- Delay Calculation: Uses moment.js for accurate time math
Maintenance Notes
- Automation Status: Monitor is_running=true for extended periods (indicates stuck automation)
- Log Cleanup: automation_logs can grow large - implement retention policy
- Deal Flags: automation_triggered array grows with each automation
- Socket Connection: Requires General Socket service to be running
- Time Zone Handling: All delays calculated in server time zone
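For the stuck-automation check mentioned above, one possible approach is to flag automations whose is_running flag has been set for longer than a threshold. The sketch below assumes each automation carries an `updatedAt` timestamp, which is not confirmed by this document. The function name and the 10-minute default are also hypothetical.

```javascript
// Returns automations that have been marked as running for longer than
// `maxRunningMs`. Assumes `updatedAt` (a Date) changes when is_running
// is toggled; adjust if the schema tracks this differently.
function findStuckAutomations(automations, now = new Date(), maxRunningMs = 10 * 60 * 1000) {
  return automations.filter(
    a => a.is_running === true && now.getTime() - a.updatedAt.getTime() > maxRunningMs,
  );
}
```

The result could feed an alert or a cleanup step that resets is_running to false so the next cron run picks the automation up again.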
Testing
Manual Trigger
# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/deals/automations
Create Test Automation
// Create automation that triggers 5 minutes after deal enters stage
const automation = await Automation.create({
  status: 'ACTIVE',
  module: 'DEAL',
  type: 'email',
  stage_id: testStageId,
  delay: 300, // 5 minutes (300 seconds)
  account_id: testAccountId,
  created_by: testUserId,
  email_template_id: testTemplateId,
  is_running: false,
});
// Create deal in target stage
await Deal.create({
  name: 'Test Deal',
  stage_id: testStageId,
  account_id: testAccountId,
  // Must not be earlier than automation.createdAt, or the deal never matches
  automation_starts: new Date(),
  automation_triggered: [],
  skip_automations: [],
});
// Wait for the 5-minute delay plus at least one cron run, then check logs
setTimeout(async () => {
  const log = await AutomationLog.findOne({
    automation: automation._id,
  });
  // findOne resolves to null if nothing has been logged yet
  console.log('Automation executed:', Boolean(log && log.success.length > 0));
}, 7 * 60 * 1000);
Monitor Automation Status
// Count running automations
const running = await Automation.countDocuments({
status: 'ACTIVE',
module: 'DEAL',
is_running: true
});
console.log('Automations currently running:', running);
// Check automation logs
const recentLogs = await AutomationLog.find({
createdAt: { $gte: new Date(Date.now() - 60 * 60 * 1000) } // Last hour
}).sort({ createdAt: -1 }).limit(10);
console.log('Recent automation executions:', recentLogs);
// Find deals pending automation
const pendingDeals = await Deal.find({
stage_id: testStageId,
automation_starts: { $lte: new Date() },
automation_triggered: { $ne: testAutomationId }
});
console.log('Deals pending automation:', pendingDeals.length);
Job Type: Scheduled
Execution Frequency: Every 1 minute
Average Duration: 30-60 seconds
Status: Active