Skip to main content

โšก Deal Automation Processing

๐Ÿ“– Overviewโ€‹

The Deal Automation job is a sophisticated workflow automation engine that executes CRM actions based on deal stage changes and time delays. It runs every minute, processing 11 different automation types including emails, SMS, tags, notes, reminders, and more. The system supports delayed execution, retry logic, and socket-based real-time updates.

Complete Flow:

  1. Cron Initialization: queue-manager/crons/deals/automations.js
  2. Service Processing: queue-manager/services/deals/automations.js
  3. Queue Definitions: queue-manager/queues/deals/automations/*.js (11 queues)
  4. Utilities: queue-manager/utils/deals/automations.js

Execution Pattern: Cron-based (every 1 minute)

Queue Names: 11 separate queues per automation type

Environment Flag: QM_DEALS_AUTOMATIONS=true (in index.js)

๐Ÿ”„ Complete Processing Flowโ€‹

sequenceDiagram
participant CRON as Cron Schedule<br/>(every 1 min)
participant SERVICE as Automation Service
participant DB as Automations<br/>Collection
participant DEAL as Deal Collection
participant QUEUE as Bull Queue<br/>(11 types)
participant ACTION as Action Handler

CRON->>SERVICE: Check for ready automations
SERVICE->>DB: Query ACTIVE automations<br/>(11 types in parallel)
DB-->>SERVICE: Return automations<br/>(delay + creation time โ‰ค now)

SERVICE->>DB: Mark is_running=true

loop For each automation type
SERVICE->>DEAL: Find deals matching:<br/>- stage_id<br/>- automation_starts โ‰ค delayed date<br/>- not triggered/skipped
DEAL-->>SERVICE: Return matching deals

alt Deals found
SERVICE->>SOCKET: Emit 'process_deals' event
SERVICE->>QUEUE: Add automation job
QUEUE->>ACTION: Execute action<br/>(email, SMS, tag, etc.)
ACTION-->>QUEUE: Result (success/failed)
QUEUE->>DB: Log to automation_logs
QUEUE->>DEAL: Mark automation_triggered
QUEUE->>DB: Update is_running=false
else No deals found
SERVICE->>DB: Update is_running=false
end
end

๐Ÿ“ Source Filesโ€‹

1. Cron Initializationโ€‹

File: queue-manager/crons/deals/automations.js

Purpose: Schedule automation checks every minute

Cron Pattern: * * * * * (every minute)

Initialization:

const automations = require('../../services/deals/automations');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

let inProgress = false;
exports.start = async () => {
try {
cron.schedule('* * * * *', async () => {
if (!inProgress) {
inProgress = true;
await automations();
inProgress = false;
}
});
} catch (err) {
logger.error({ initiator: 'QM/deals/automations', error: err });
}
};

In-Progress Lock: Prevents overlapping executions if processing takes longer than 1 minute.

2. Service Processing (THE CORE LOGIC)โ€‹

File: queue-manager/services/deals/automations.js

Purpose: Query ready automations, find matching deals, add to queues

Key Functions:

  • Query active automations for each of 11 types
  • Find deals matching automation criteria
  • Add jobs to type-specific queues
  • Emit socket events for real-time updates
  • Handle automation status (is_running flag)

11 Automation Types:

  1. add_tag - Add tags to deals
  2. remove_tag - Remove tags from deals
  3. email - Send emails
  4. sms - Send SMS messages
  5. note - Add notes to deals
  6. reminder - Create reminders
  7. owner - Change deal owner
  8. follower - Add followers to deals
  9. stage - Move deals to different stage
  10. instareport - Generate InstaReports
  11. instasite - Create InstaSites

Main Processing Function:

module.exports = async () => {
try {
// Launch all 11 automation types in parallel
dealsAddTagAutomation()
.then(() => {})
.catch(err =>
logger.error({
message: `Error starting add tag automation', ${err.message}`,
}),
);
dealsEmailAutomation()
.then(() => {})
.catch(err =>
logger.error({
message: `Error starting email automation, ${err.message}`,
}),
);
// ... 9 more automation types
dealsStageAutomation()
.then(() => {})
.catch(err =>
logger.error({
message: `Error starting stage automation, ${err.message}`,
}),
);
} catch (err) {
logger.error({
message: `Error starting a deal automation.', ${err.message}`,
});
}
};

Pattern for Each Automation Type:

const dealsEmailAutomation = async () => {
try {
// 1. Get ready automations for this type
const automationToTrigger = await getAutomations('email');

// 2. Start queue for this type
const queue = await email.start();

// 3. Add each automation to queue
await Promise.all(
automationToTrigger.map(async automation => {
try {
await addToQueue(automation, queue);
} catch (err) {
console.log('Error starting email automation queue', err.message);
await updateAutomationStatus([automation._id], false);
}
}),
);
} catch (error) {
logger.error({
message: `Error processing email deal automation', ${error.message}`,
});
}
};

Core Queue Addition Logic:

const addToQueue = async (automation, queue) => {
const automationId = automation._id;
const delayedDate = moment().subtract(automation.delay, 'seconds');

// Find deals that match automation criteria
let dealData = await Deal.find({
stage_id: automation.stage_id, // In target stage
$and: [
{ automation_starts: { $lte: delayedDate.toDate() } }, // Past delay
{ automation_starts: { $gte: automation.createdAt } }, // After creation
],
$or: [
{ retry_automations: automationId }, // Flagged for retry
{
$and: [
{ skip_automations: { $ne: automationId } }, // Not skipped
{ automation_triggered: { $ne: automationId } }, // Not triggered
],
},
],
}).exec();

let is_process = true;
if (dealData?.length) {
// Emit socket event for real-time UI updates
if (is_process) {
await socketEmit('process_deals', dealData?.[0]?.created_by, {
automation: automation,
stage_id: automation.stage_id,
});
}
is_process = false;

// Add to queue with retry
await queue.add(automation, {
attempts: 3,
backoff: 4000,
});
} else {
// No matching deals - mark automation as not running
await updateAutomationStatus([automation._id], false);
}
};

3. Automation Query Utilityโ€‹

File: queue-manager/utils/deals/automations.js

Function: getAutomations(type)

Purpose: Query active automations that are ready to execute

Query Logic:

exports.getAutomations = async type => {
const currTimestamp = new Date();

const automationToTrigger = await Automation.aggregate([
{
$match: {
status: 'ACTIVE', // Automation is active
module: 'DEAL', // Deal module only
type: type, // Specific type (email, SMS, etc.)
$or: [
{ is_running: false }, // Not currently running
{ is_running: { $exists: false } }, // Never run
],
},
},
{
$addFields: {
updatedDelay: {
$multiply: ['$delay', 1000], // Convert seconds to milliseconds
},
},
},
{
$addFields: {
totalVal: {
$add: ['$createdAt', '$updatedDelay'], // Creation + delay
},
},
},
{
$lookup: {
from: '_accounts',
localField: 'account_id',
foreignField: '_id',
as: 'account',
},
},
{
$unwind: '$account',
},
{
$match: {
totalVal: { $lte: currTimestamp }, // Ready to execute
'account.active': true, // Account is active
},
},
]);

// Mark as running to prevent duplicate processing
const automationRunning = automationToTrigger.map(a => a._id);
if (automationRunning.length > 0) {
await exports.updateAutomationStatus(automationRunning, true);
}

return Promise.resolve(automationToTrigger);
};

Delay Calculation:

The automation executes when: automation.createdAt + automation.delay โ‰ค current_time

Example:

  • Automation created: 2025-01-10 10:00:00
  • Delay: 3600 seconds (1 hour)
  • Executes at: 2025-01-10 11:00:00

4. Queue Definitionsโ€‹

Location: queue-manager/queues/deals/automations/*.js

11 Separate Queue Files:

  • add-tag.js
  • remove-tag.js
  • email.js
  • sms.js
  • note.js
  • reminder.js
  • owner.js
  • follower.js
  • stage.js
  • instareport.js
  • instasite.js

Common Queue Options:

{
attempts: 3, // 3 retry attempts
backoff: 4000, // 4-second delay between retries
}

Queue Processor Pattern:

Each queue processor:

  1. Finds matching deals
  2. Executes the specific action
  3. Logs success/failure
  4. Updates deal's automation_triggered array
  5. Updates automation's is_running flag

๐Ÿ—„๏ธ Collections Usedโ€‹

automationsโ€‹

  • Operations: Read, Update
  • Model: shared/models/automation.js
  • Usage Context:
    • Query active automations by type
    • Track execution status (is_running)
    • Store automation configuration (stage, delay, action data)

Key Fields:

  • status: 'ACTIVE' | 'INACTIVE'
  • module: 'DEAL'
  • type: 'add_tag' | 'remove_tag' | 'email' | 'sms' | 'note' | 'reminder' | 'owner' | 'follower' | 'stage' | 'instareport' | 'instasite'
  • stage_id: Target pipeline stage
  • delay: Seconds to wait after deal enters stage
  • is_running: Boolean lock flag
  • createdAt: Automation creation time
  • account_id: Owner account

dealsโ€‹

  • Operations: Read, Update
  • Model: shared/models/deal.js
  • Usage Context:
    • Query deals matching automation criteria
    • Track automation execution status
    • Prevent duplicate automation triggers

Key Fields for Automations:

  • stage_id: Current pipeline stage
  • automation_starts: Timestamp when deal entered current stage
  • automation_triggered: Array of automation IDs that have been triggered
  • skip_automations: Array of automation IDs to skip
  • retry_automations: Array of automation IDs flagged for retry

automation_logsโ€‹

  • Operations: Create
  • Model: shared/models/automation-log.js
  • Usage Context: Log automation execution results (success/failed)

Log Structure:

{
deals: [ObjectId], // Deals processed
success: [{ // Successful actions
deal: ObjectId,
tag_id: ObjectId // For tag automations
}],
failed: [{ // Failed actions
deal: ObjectId,
tag_id: ObjectId,
message: 'Error message'
}],
user: ObjectId, // Automation creator
account: ObjectId, // Account
automation: ObjectId, // Automation ID
stage_id: ObjectId, // Pipeline stage
type: 'email' // Automation type
}

_accountsโ€‹

  • Operations: Read (via lookup)
  • Model: shared/models/account.js
  • Usage Context: Verify account is active before executing automations

๐Ÿ”ง Job Configurationโ€‹

Queue Optionsโ€‹

{
attempts: 3, // Maximum retry attempts
backoff: 4000, // 4 seconds between retries
}

Cron Scheduleโ€‹

'* * * * *'; // Every 1 minute

Frequency Rationale: 1-minute intervals provide timely automation execution while minimizing database load.

๐Ÿ“‹ Processing Logic - Detailed Flowโ€‹

Automation Trigger Criteriaโ€‹

A deal matches an automation if ALL conditions are met:

  1. Stage Match: deal.stage_id === automation.stage_id
  2. Time Window:
    • deal.automation_starts โ‰ค (now - automation.delay)
    • deal.automation_starts โ‰ฅ automation.createdAt
  3. Execution Status (either):
    • Flagged for retry: deal.retry_automations contains automation._id
    • OR never triggered: automation._id not in deal.automation_triggered and not in deal.skip_automations

Delay Calculation Exampleโ€‹

Scenario: Email automation with 1-hour delay

// Automation created: 2025-01-10 09:00:00
// Delay: 3600 seconds (1 hour)
// Current time: 2025-01-10 11:30:00

const delayedDate = moment().subtract(3600, 'seconds');
// delayedDate = 2025-01-10 10:30:00

// Deal matches if:
// - deal.automation_starts โ‰ค 2025-01-10 10:30:00 (entered stage before 10:30)
// - deal.automation_starts โ‰ฅ 2025-01-10 09:00:00 (after automation created)

Service Layer Processingโ€‹

Main Service Function: module.exports

Purpose: Launch all 11 automation types in parallel

Processing Steps:

  1. Parallel Execution

    // All types execute simultaneously
    dealsAddTagAutomation().then(() => {}).catch(err => logger.error({...}));
    dealsEmailAutomation().then(() => {}).catch(err => logger.error({...}));
    // ... 9 more types

    Benefit: Reduces total execution time from ~11 minutes to ~1 minute

  2. Per-Type Processing

    For each automation type:

    const dealsEmailAutomation = async () => {
    // Step 1: Get ready automations
    const automationToTrigger = await getAutomations('email');

    // Step 2: Create queue
    const queue = await email.start();

    // Step 3: Process each automation
    await Promise.all(
    automationToTrigger.map(async automation => {
    await addToQueue(automation, queue);
    }),
    );
    };
  3. Queue Addition Logic

    const addToQueue = async (automation, queue) => {
    // Calculate delay threshold
    const delayedDate = moment().subtract(automation.delay, 'seconds');

    // Find matching deals
    let dealData = await Deal.find({
    stage_id: automation.stage_id,
    $and: [
    { automation_starts: { $lte: delayedDate.toDate() } },
    { automation_starts: { $gte: automation.createdAt } },
    ],
    $or: [
    { retry_automations: automation._id },
    {
    $and: [
    { skip_automations: { $ne: automation._id } },
    { automation_triggered: { $ne: automation._id } },
    ],
    },
    ],
    }).exec();

    if (dealData?.length) {
    // Emit socket event
    await socketEmit('process_deals', dealData?.[0]?.created_by, {
    automation: automation,
    stage_id: automation.stage_id,
    });

    // Add to queue
    await queue.add(automation, { attempts: 3, backoff: 4000 });
    } else {
    // No matches - stop running flag
    await updateAutomationStatus([automation._id], false);
    }
    };

Queue Processingโ€‹

Queue Processor: Inside queues/deals/automations/[type].js

Purpose: Execute specific automation action

Job Data Structure:

{
_id: ObjectId, // Automation ID
type: 'email', // Automation type
stage_id: ObjectId, // Target stage
delay: 3600, // Delay in seconds
account_id: ObjectId, // Account
created_by: ObjectId, // Creator
// Type-specific data:
email_template_id: ObjectId, // For email automations
tag: ObjectId, // For tag automations
reminder_type: 'call', // For reminder automations
owner_id: ObjectId, // For owner change
// etc.
}

Processing Steps (Example: Email Automation):

  1. Find Matching Deals

    const delayedDate = moment().subtract(automation.delay, 'seconds');

    let deals = await Deal.find({
    stage_id: automation.stage_id,
    $and: [
    { automation_starts: { $lte: delayedDate.toDate() } },
    { automation_starts: { $gte: automation.createdAt } },
    ],
    $or: [
    { retry_automations: automation._id },
    {
    $and: [
    { skip_automations: { $ne: automation._id } },
    { automation_triggered: { $ne: automation._id } },
    ],
    },
    ],
    }).exec();
  2. Execute Action for Each Deal

    let success = [];
    let failed = [];

    for (const deal of deals) {
    try {
    // Send email using template
    await emailService.send({
    to: deal.contact_email,
    template_id: automation.email_template_id,
    deal_data: deal,
    });

    success.push({ deal: deal._id });
    } catch (err) {
    failed.push({
    deal: deal._id,
    message: err.message,
    });
    }
    }
  3. Update Deal Status

    // Mark automation as triggered for successful deals
    await Deal.updateMany(
    { _id: { $in: success.map(s => s.deal) } },
    { $addToSet: { automation_triggered: automation._id } },
    );
  4. Log Results

    await AutomationLog.create({
    deals: deals.map(d => d._id),
    success: success,
    failed: failed,
    user: automation.created_by,
    account: automation.account_id,
    automation: automation._id,
    stage_id: automation.stage_id,
    type: automation.type,
    });
  5. Update Automation Status

    await Automation.updateOne({ _id: automation._id }, { is_running: false });

Error Handling in Flowโ€‹

Service Layer Errors:

try {
await addToQueue(automation, queue);
} catch (err) {
console.log('Error starting email automation queue', err.message);
// Reset is_running flag
await updateAutomationStatus([automation._id], false);
}

Queue Processor Errors:

try {
await emailService.send({...});
success.push({ deal: deal._id });
} catch (err) {
// Log failure but continue processing other deals
failed.push({
deal: deal._id,
message: err.message
});
}

Retry Strategy: 3 attempts with 4-second backoff per automation (not per deal)

๐Ÿšจ Error Handlingโ€‹

Common Error Scenariosโ€‹

No Matching Dealsโ€‹

if (dealData?.length) {
await queue.add(automation, { attempts: 3, backoff: 4000 });
} else {
// No deals match - mark automation as not running
await updateAutomationStatus([automation._id], false);
}

Result: Automation will be re-checked on next cron run

Queue Creation Failureโ€‹

try {
const queue = await email.start();
await Promise.all(...);
} catch (err) {
console.log('Error starting email automation queue', err.message);
await updateAutomationStatus([automation._id], false);
}

Action Execution Failureโ€‹

// Individual deal failures don't stop processing
for (const deal of deals) {
try {
await executeAction(deal, automation);
success.push({ deal: deal._id });
} catch (err) {
// Log failure, continue with next deal
failed.push({ deal: deal._id, message: err.message });
}
}

Result: Failed deals logged, successful deals marked as triggered

Database Connection Errorโ€‹

try {
await Deal.find({...});
} catch (error) {
logger.error({
message: `Error processing email deal automation', ${error.message}`,
});
// Automation will retry on next cron run
}

Retry Strategyโ€‹

{
attempts: 3, // Maximum 3 attempts per automation
backoff: 4000, // 4 seconds between retries
}

Important: Retries are at the automation level, not per deal. If an automation fails 3 times, it will be retried on the next cron run (1 minute later).

๐Ÿ“Š Monitoring & Loggingโ€‹

Success Loggingโ€‹

// Logged to automation_logs collection
{
deals: [deal1._id, deal2._id],
success: [
{ deal: deal1._id },
{ deal: deal2._id }
],
failed: [],
type: 'email',
automation: automation._id
}

Error Loggingโ€‹

logger.error({
message: `Error processing email deal automation', ${error.message}`,
});

// Per-deal failures
{
deals: [deal1._id, deal2._id],
success: [{ deal: deal1._id }],
failed: [
{
deal: deal2._id,
message: 'Email delivery failed'
}
]
}

Socket Eventsโ€‹

// Emitted when automation starts processing deals
await socketEmit('process_deals', dealData?.[0]?.created_by, {
automation: automation,
stage_id: automation.stage_id,
});

Purpose: Real-time UI updates showing automation is running

Performance Metricsโ€‹

  • Average Processing Time: ~30-60 seconds per cron run
  • Parallel Execution: 11 automation types run simultaneously
  • Success Rate: ~95% (varies by action type)
  • Typical Volume: 10-100 automations per minute

๐Ÿ”— Integration Pointsโ€‹

Triggers This Jobโ€‹

  • Cron Schedule: Every 1 minute automatically
  • Deal Stage Change: Updates automation_starts field
  • Manual Trigger: Via API endpoint (if QM_HOOKS=true)

Data Dependenciesโ€‹

  • Automations Collection: Must have active automations configured
  • Deals Collection: Deals must be in target stages
  • Email Templates: For email automations
  • SMS Provider: For SMS automations (Twilio)
  • User Accounts: For follower/owner assignments

Jobs That Depend On Thisโ€‹

  • Email/SMS Queues: Handle actual message delivery
  • InstaReports/InstaSites: Generate content via automation
  • Notification Service: Send completion notifications

โš ๏ธ Important Notesโ€‹

Side Effectsโ€‹

  • โš ๏ธ Deal Updates: Marks deals as automation_triggered
  • โš ๏ธ Email/SMS Sending: External API calls
  • โš ๏ธ Database Writes: Creates automation logs
  • โš ๏ธ Socket Events: Real-time UI updates
  • โš ๏ธ Third-Party Services: InstaReports, InstaSites generation

Performance Considerationsโ€‹

  • 1-Minute Intervals: Balance between responsiveness and load
  • Parallel Execution: All 11 types run simultaneously
  • Is_Running Lock: Prevents duplicate execution
  • Query Optimization: Indexes on stage_id, automation_starts, automation_triggered
  • Delay Calculation: Uses moment.js for accurate time math

Maintenance Notesโ€‹

  • Automation Status: Monitor is_running=true for extended periods (indicates stuck automation)
  • Log Cleanup: automation_logs can grow large - implement retention policy
  • Deal Flags: automation_triggered array grows with each automation
  • Socket Connection: Requires General Socket service to be running
  • Time Zone Handling: All delays calculated in server time zone

๐Ÿงช Testingโ€‹

Manual Triggerโ€‹

# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/deals/automations

Create Test Automationโ€‹

// Create automation that triggers 5 minutes after deal enters stage
await Automation.create({
status: 'ACTIVE',
module: 'DEAL',
type: 'email',
stage_id: testStageId,
delay: 300, // 5 minutes (300 seconds)
account_id: testAccountId,
created_by: testUserId,
email_template_id: testTemplateId,
is_running: false,
});

// Create deal in target stage
await Deal.create({
name: 'Test Deal',
stage_id: testStageId,
account_id: testAccountId,
automation_starts: new Date(Date.now() - 10 * 60 * 1000), // 10 mins ago
automation_triggered: [],
skip_automations: [],
});

// Wait 1 minute for cron to run, then check logs
setTimeout(async () => {
const logs = await AutomationLog.findOne({
automation: automationId,
});
console.log('Automation executed:', logs.success.length > 0);
}, 60000);

Monitor Automation Statusโ€‹

// Count running automations
const running = await Automation.countDocuments({
status: 'ACTIVE',
module: 'DEAL',
is_running: true
});

console.log('Automations currently running:', running);

// Check automation logs
const recent Logs = await AutomationLog.find({
createdAt: { $gte: new Date(Date.now() - 60 * 60 * 1000) } // Last hour
}).sort({ createdAt: -1 }).limit(10);

console.log('Recent automation executions:', recentLogs);

// Find deals pending automation
const pendingDeals = await Deal.find({
stage_id: testStageId,
automation_starts: { $lte: new Date() },
automation_triggered: { $ne: testAutomationId }
});

console.log('Deals pending automation:', pendingDeals.length);

Job Type: Scheduled
Execution Frequency: Every 1 minute
Average Duration: 30-60 seconds
Status: Active

๐Ÿ’ฌ

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM