🎯 Inbound Leads Notifications

📖 Overview

The Inbound Leads module handles real-time notifications for new leads captured through various marketing channels including campaigns, forms, call tracking, and integrations.

Environment Flag: INBOUND_ENABLED=true

Trigger Types:

  • Change Stream on LeadsData collection (new lead inserts)

Notification Types: email, bell (FCM), SMS (optional, high-value leads only)
Location: notifications/services/inbound/

🏗️ Architecture

System Flow

graph TB
subgraph "Lead Sources"
CAMPAIGN[Marketing Campaigns]
FORM[Web Forms]
CALL[Call Tracking]
INTEGRATION[3rd Party Integrations]
end

subgraph "Processing"
STREAM[LeadsData Stream]
DETECT[Detect New Lead]
ENRICH[Enrich Lead Data]
ROUTE[Route to Owners]
CREATE[Create Notifications]
end

subgraph "Delivery"
QUEUE[NotificationQueue]
EMAIL[Email Notification]
BELL[Bell Notification]
end

CAMPAIGN --> STREAM
FORM --> STREAM
CALL --> STREAM
INTEGRATION --> STREAM

STREAM --> DETECT
DETECT --> ENRICH
ENRICH --> ROUTE
ROUTE --> CREATE
CREATE --> QUEUE
QUEUE --> EMAIL
QUEUE --> BELL

⚙️ Configuration

Environment Variables

# Module flag
INBOUND_ENABLED=true # Enable inbound lead notifications

# External dependencies (inherited)
SENDGRID_API_KEY=your_key
GENERAL_SOCKET=http://localhost:4000 # For bell notifications
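
How the service reads the module flag is not shown on this page. A minimal sketch, assuming only the literal string `true` enables the module; the helper name `isInboundEnabled` is hypothetical:

```javascript
// Hypothetical helper: decides whether the inbound module should start.
// Assumption: only the string "true" (case-insensitive, trimmed) enables it.
function isInboundEnabled(env = process.env) {
  return String(env.INBOUND_ENABLED || '').trim().toLowerCase() === 'true';
}

if (!isInboundEnabled()) {
  console.log('Inbound lead notifications are disabled (INBOUND_ENABLED is not "true")');
}
```

Treating anything other than `true` as disabled keeps a typo in the environment from silently enabling the stream.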

Change Stream Configuration

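const leadsDataStream = LeadsData.watch(
  [
    {
      $match: {
        operationType: 'insert',
        'fullDocument.metadata.lead_received': true, // Only processed leads
      },
    },
  ],
  {
    fullDocument: 'updateLookup',
    startAtOperationTime: RESUMETIME || undefined,
  },
);

leadsDataStream.on('change', async data => {
  try {
    await processNotification(data);
  } catch (error) {
    // Without this catch, one failing lead becomes an unhandled promise rejection
    logger.error({
      initiator: 'notifications/inbound/stream',
      message: 'Failed to process lead notification',
      error: error,
    });
  }
});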

Stream Filters:

  • Only insert operations (new leads)
  • Only leads with metadata.lead_received: true (processed and validated)
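  • Duplicate/spam leads are excluded only indirectly: the pipeline has no dedicated filter for them, so this presumably relies on such leads never being marked metadata.lead_received: true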
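
The filters above can be mirrored as a plain predicate, which is handy for unit-testing event fixtures without a running replica set. This is a sketch only; the function name `matchesLeadFilter` is hypothetical and the real filtering happens server-side in the `$match` stage:

```javascript
// Mirrors the $match stage for simple boolean values.
// Assumption: events have the standard change-event shape
// ({ operationType, fullDocument, ... }).
function matchesLeadFilter(event) {
  return (
    event.operationType === 'insert' &&
    event.fullDocument?.metadata?.lead_received === true
  );
}

const processedInsert = {
  operationType: 'insert',
  fullDocument: { metadata: { lead_received: true } },
};
const unprocessedInsert = {
  operationType: 'insert',
  fullDocument: { metadata: {} },
};

console.log(matchesLeadFilter(processedInsert)); // true
console.log(matchesLeadFilter(unprocessedInsert)); // false
```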

📧 Notification Templates

1. New Lead Received

Trigger: New lead captured from any source
Type: email, bell
Recipients: Account owner, assigned campaign manager

Email Content Variables:

{
  lead_name: "John Doe",
  lead_email: "john.doe@example.com",
  lead_phone: "+1 (555) 123-4567",
  lead_company: "Acme Corp",
  lead_source: "Google Ads Campaign",
  campaign_name: "Q4 2025 Campaign",
  form_name: "Contact Us",
  landing_page: "https://example.com/contact",
  lead_message: "Interested in your services...",
  lead_date: "2025-10-13 14:30",
  lead_score: 85,
  lead_url: "https://app.dashclicks.com/leads/507f1f77bcf86cd799439011",
  utm_source: "google",
  utm_medium: "cpc",
  utm_campaign: "q4-2025"
}

2. High-Value Lead Alert

Trigger: Lead with a score of 80 or higher (lead_score >= 80, matching the check in processNotification) or specific criteria
Type: email (high priority), bell, SMS (optional)
Recipients: Account owner, sales team

Email Content Variables:

{
  lead_name: "Enterprise Client Inc.",
  lead_value: "$50,000",
  lead_score: 95,
  priority: "high",
  lead_indicators: [
    "High budget",
    "Enterprise company",
    "Immediate need"
  ],
  lead_email: "contact@enterpriseclient.com",
  lead_phone: "+1 (555) 987-6543",
  lead_source: "Referral",
  action_required: "Contact within 1 hour",
  lead_url: "https://app.dashclicks.com/leads/507f1f77bcf86cd799439011"
}
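
The split between standard and high-value handling can be summarised as a small pure function. This is an illustrative sketch, assuming the threshold is inclusive (`>= 80`) as in the processing code. `planNotification` and the `'normal'` priority label are hypothetical names, not part of the module:

```javascript
const HIGH_VALUE_THRESHOLD = 80; // inclusive, as in the `>= 80` check in processNotification

// Hypothetical helper: picks the notification type and channels for a scored lead.
function planNotification(leadScore, { smsEnabled = false } = {}) {
  const highValue = leadScore >= HIGH_VALUE_THRESHOLD;
  const channels = ['email', 'bell'];
  // SMS is optional and only considered for high-value leads
  if (highValue && smsEnabled) channels.push('sms');
  return {
    type: highValue ? 'high_value_lead' : 'new_lead',
    priority: highValue ? 'high' : 'normal', // 'normal' is an assumed label
    channels,
  };
}

console.log(planNotification(95, { smsEnabled: true }));
console.log(planNotification(79, { smsEnabled: true }));
```

A score of exactly 80 lands on the high-value path, which is worth keeping in mind when tuning scoring weights.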

3. Campaign Lead Summary (Batched)

Trigger: Five or more leads from the same campaign within one hour
Type: email (digest)
Recipients: Campaign manager

Email Content Variables:

{
  campaign_name: "Q4 2025 Campaign",
  lead_count: 12,
  time_period: "Last hour",
  total_leads_today: 45,
  leads: [
    {
      name: "John Doe",
      email: "john.doe@example.com",
      score: 75,
      time: "14:30"
    },
    // ... more leads
  ],
  campaign_url: "https://app.dashclicks.com/campaigns/507f1f77bcf86cd799439011",
  performance_summary: {
    conversion_rate: "3.2%",
    cost_per_lead: "$45",
    quality_score: "Good"
  }
}
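
The page calls `checkCampaignBatching` but does not show its body. One way the "5+ leads/hour" trigger could be detected is a per-campaign sliding window. The sketch below is an assumption, not the module's implementation: it keeps state in memory (lost on restart), and the name `recordLead` is hypothetical. A production version would more likely count recent documents in LeadsData.

```javascript
const WINDOW_MS = 60 * 60 * 1000; // one hour
const BATCH_THRESHOLD = 5; // "5+ leads/hour" from the trigger above

// campaignId -> arrival timestamps (ms) that fall inside the current window
const arrivals = new Map();

// Records one lead arrival and reports whether the campaign has reached
// the digest threshold within the last hour.
function recordLead(campaignId, now = Date.now()) {
  const recent = (arrivals.get(campaignId) || []).filter(t => now - t < WINDOW_MS);
  recent.push(now);
  arrivals.set(campaignId, recent);
  return { count: recent.length, shouldBatch: recent.length >= BATCH_THRESHOLD };
}

// Five leads one minute apart: only the fifth reaches the threshold
const start = Date.UTC(2025, 9, 13, 14, 0, 0);
for (let i = 0; i < 5; i++) {
  console.log(recordLead('demo-campaign', start + i * 60 * 1000));
}
```

Passing `now` explicitly keeps the function deterministic in tests.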

4. Call Tracking Lead

Trigger: Inbound call tracked as lead
Type: email, bell
Recipients: Account owner, assigned rep

Email Content Variables:

{
  caller_name: "Jane Smith",
  caller_phone: "+1 (555) 123-4567",
  call_duration: "5:32",
  call_recording_url: "https://recordings.dashclicks.com/call-12345.mp3",
  tracking_number: "+1 (555) 000-1111",
  campaign_name: "Local Service Campaign",
  call_date: "2025-10-13 14:30",
  call_transcript: "Hi, I'm interested in your services...",
  lead_url: "https://app.dashclicks.com/leads/507f1f77bcf86cd799439011"
}

🔍 Processing Logic

Main Notification Processor

// From services/inbound/changeStream.js
async function processNotification(data) {
  const lead = data.fullDocument;

  // 1. Fetch full lead details with populated fields
  const fullLead = await LeadsData.findById(lead._id)
    .populate('account_id')
    .populate('campaign_id')
    .populate('assigned_to')
    .lean();

  if (!fullLead || !fullLead.account_id) {
    logger.warn({
      message: 'Invalid lead data',
      lead_id: lead._id,
    });
    return;
  }

  // 2. Enrich lead with additional data
  const enrichedLead = await enrichLeadData(fullLead);

  // 3. Calculate lead score (if not already set)
  if (!enrichedLead.lead_score) {
    enrichedLead.lead_score = calculateLeadScore(enrichedLead);
  }

  // 4. Determine notification recipients
  const recipients = await getLeadRecipients(enrichedLead);

  if (recipients.length === 0) {
    logger.info({
      message: 'No recipients for lead notification',
      lead_id: lead._id,
    });
    return;
  }

  // 5. Send notifications based on lead priority
  if (enrichedLead.lead_score >= 80) {
    // High-value lead - immediate notification
    await sendHighValueLeadNotification(recipients, enrichedLead);
  } else {
    // Standard lead notification
    await sendStandardLeadNotification(recipients, enrichedLead);
  }

  // 6. Check for campaign batching
  await checkCampaignBatching(enrichedLead);
}

// Enrich lead with additional context
async function enrichLeadData(lead) {
  const enriched = { ...lead };

  // Add campaign details
  if (lead.campaign_id) {
    const campaign = await Campaign.findById(lead.campaign_id).lean();
    enriched.campaign_name = campaign?.name;
    enriched.campaign_type = campaign?.type;
  }

  // Add form details
  if (lead.form_id) {
    const form = await Form.findById(lead.form_id).lean();
    enriched.form_name = form?.name;
  }

  // Add tracking details
  if (lead.tracking_number_id) {
    const tracking = await TrackingNumber.findById(lead.tracking_number_id).lean();
    enriched.tracking_number = tracking?.phone_number;
    enriched.call_recording_url = tracking?.recording_url;
  }

  // Parse UTM parameters
  if (lead.utm_params) {
    enriched.utm_source = lead.utm_params.utm_source;
    enriched.utm_medium = lead.utm_params.utm_medium;
    enriched.utm_campaign = lead.utm_params.utm_campaign;
  }

  return enriched;
}

// Calculate lead quality score
function calculateLeadScore(lead) {
  let score = 50; // Base score

  // Company provided (+10)
  if (lead.company) score += 10;

  // Phone provided (+15)
  if (lead.phone) score += 15;

  // Email provided (+10)
  if (lead.email) score += 10;

  // Message/inquiry provided (+5)
  if (lead.message && lead.message.length > 20) score += 5;

  // High-value keywords in message (+10)
  if (lead.message) {
    const highValueKeywords = ['budget', 'enterprise', 'urgent', 'immediately'];
    const hasHighValueKeyword = highValueKeywords.some(keyword =>
      lead.message.toLowerCase().includes(keyword),
    );
    if (hasHighValueKeyword) score += 10;
  }

  // Referral source (+20)
  if (lead.source === 'referral') score += 20;

  // Direct traffic (+5)
  if (lead.utm_source === 'direct') score += 5;

  return Math.min(score, 100); // Cap at 100
}

// Get notification recipients
async function getLeadRecipients(lead) {
  const recipients = [];

  // Add account owner
  const account = await Account.findById(lead.account_id).populate('owner_id').lean();

  if (account?.owner_id) {
    recipients.push(account.owner_id);
  }

  // Add assigned user (if any)
  if (lead.assigned_to) {
    const assignedUser = await User.findById(lead.assigned_to).lean();
    if (assignedUser) {
      recipients.push(assignedUser);
    }
  }

  // Add campaign manager (if campaign)
  if (lead.campaign_id) {
    const campaign = await Campaign.findById(lead.campaign_id).populate('manager_id').lean();

    if (campaign?.manager_id) {
      recipients.push(campaign.manager_id);
    }
  }

  // Deduplicate recipients by _id (a user may be owner, assignee, and manager at once)
  return Array.from(new Map(recipients.map(r => [r._id.toString(), r])).values());
}

// Send high-value lead notification
async function sendHighValueLeadNotification(recipients, lead) {
  for (const recipient of recipients) {
    // Verify preferences
    const canNotify = await NotificationUtil.verify({
      userID: recipient._id,
      accountID: lead.account_id,
      module: 'inbound',
      type: 'high_value_lead',
      subType: 'email',
    });

    if (!canNotify) continue;

    // Email notification
    await NotificationQueue.create({
      type: 'email',
      origin: 'inbound',
      sender_account: lead.account_id,
      recipient: {
        name: recipient.name,
        email: recipient.email,
      },
      content: {
        template_id: 'd-high-value-lead-template',
        additional_data: {
          lead_name: lead.name,
          lead_email: lead.email,
          lead_phone: lead.phone,
          lead_company: lead.company,
          lead_score: lead.lead_score,
          priority: 'high',
          lead_source: lead.campaign_name || lead.source,
          lead_message: lead.message,
          lead_date: formatDateTime(lead.created_at),
          lead_url: `https://app.dashclicks.com/leads/${lead._id}`,
          action_required: 'Contact within 1 hour',
        },
      },
      priority: 'high', // High priority delivery
      check_credits: false,
    });

    // Bell notification
    await NotificationQueue.create({
      type: 'fcm',
      origin: 'inbound',
      sender_account: lead.account_id,
      recipient: {
        user_id: recipient._id,
      },
      content: {
        title: '🔥 High-Value Lead!',
        body: `${lead.name} - Score: ${lead.lead_score}`,
        click_action: `https://app.dashclicks.com/leads/${lead._id}`,
        icon: 'https://cdn.dashclicks.com/icons/hot-lead.png',
        module: 'inbound',
        type: 'high_value_lead',
        data: {
          subType: 'bell',
          lead_id: lead._id.toString(),
          lead_score: lead.lead_score,
        },
      },
    });

    // Optional SMS for high-value leads
    const sendSMS = await checkSMSEnabled(recipient, lead.account_id);
    if (sendSMS) {
      await NotificationQueue.create({
        type: 'sms',
        origin: 'inbound',
        sender_account: lead.account_id,
        recipient: {
          phone: recipient.phone,
        },
        content: {
          // Link to the lead page (same URL as lead_url) rather than the bare ObjectId
          message: `High-value lead received! ${lead.name} (Score: ${lead.lead_score}). View: https://app.dashclicks.com/leads/${lead._id}`,
        },
        check_credits: true, // SMS uses credits
      });
    }
  }
}

// Send standard lead notification
async function sendStandardLeadNotification(recipients, lead) {
  for (const recipient of recipients) {
    const canNotify = await NotificationUtil.verify({
      userID: recipient._id,
      accountID: lead.account_id,
      module: 'inbound',
      type: 'new_lead',
      subType: 'email',
    });

    if (!canNotify) continue;

    // Email notification
    await NotificationQueue.create({
      type: 'email',
      origin: 'inbound',
      sender_account: lead.account_id,
      recipient: {
        name: recipient.name,
        email: recipient.email,
      },
      content: {
        template_id: 'd-new-lead-template',
        additional_data: {
          lead_name: lead.name,
          lead_email: lead.email,
          lead_phone: lead.phone,
          lead_company: lead.company,
          lead_source: lead.campaign_name || lead.source,
          campaign_name: lead.campaign_name,
          form_name: lead.form_name,
          landing_page: lead.landing_page,
          lead_message: lead.message,
          lead_date: formatDateTime(lead.created_at),
          lead_score: lead.lead_score,
          lead_url: `https://app.dashclicks.com/leads/${lead._id}`,
          utm_source: lead.utm_source,
          utm_medium: lead.utm_medium,
          utm_campaign: lead.utm_campaign,
        },
      },
      check_credits: false,
    });

    // Bell notification
    await NotificationQueue.create({
      type: 'fcm',
      origin: 'inbound',
      sender_account: lead.account_id,
      recipient: {
        user_id: recipient._id,
      },
      content: {
        title: 'New Lead Received',
        body: `${lead.name} - ${lead.campaign_name || lead.source}`,
        click_action: `https://app.dashclicks.com/leads/${lead._id}`,
        module: 'inbound',
        type: 'new_lead',
        data: {
          subType: 'bell',
          lead_id: lead._id.toString(),
        },
      },
    });
  }
}
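
The scoring rules in calculateLeadScore can be exercised in isolation. The sketch below repeats those rules in a self-contained form and applies them to two made-up leads modelled on the examples later on this page. Note that those example leads arrive with lead_score already set (75 and 95), so processNotification would not recompute their scores. The values here show what the rules would produce if the field were missing.

```javascript
// Same rules as calculateLeadScore above, condensed so the snippet runs on its own.
function calculateLeadScore(lead) {
  let score = 50; // Base score
  if (lead.company) score += 10;
  if (lead.phone) score += 15;
  if (lead.email) score += 10;
  if (lead.message && lead.message.length > 20) score += 5;
  if (lead.message) {
    const highValueKeywords = ['budget', 'enterprise', 'urgent', 'immediately'];
    const text = lead.message.toLowerCase();
    if (highValueKeywords.some(keyword => text.includes(keyword))) score += 10;
  }
  if (lead.source === 'referral') score += 20;
  if (lead.utm_source === 'direct') score += 5;
  return Math.min(score, 100); // Cap at 100
}

// Illustrative inputs (not real data)
const webFormLead = {
  company: 'Acme Corp',
  phone: '+1 (555) 123-4567',
  email: 'john.doe@example.com',
  message: 'Interested in your digital marketing services',
  source: 'web_form',
  utm_source: 'google',
};
const referralLead = {
  company: 'Enterprise Client Inc.',
  phone: '+1 (555) 987-6543',
  email: 'contact@enterpriseclient.com',
  message: 'Looking for enterprise solution with $50,000 budget. Need immediately.',
  source: 'referral',
};

// 50 + 10 + 15 + 10 + 5 = 90
console.log(calculateLeadScore(webFormLead)); // 90
// 50 + 10 + 15 + 10 + 5 + 10 + 20 = 120, capped
console.log(calculateLeadScore(referralLead)); // 100
```

Because a lead with company, phone, email, and a message longer than 20 characters already scores 90, most complete form submissions would take the high-value path under these weights.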

🚨 Error Handling

Change Stream Management

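// Resume point management (.starttime holds an operation time in seconds, not a resume token)
const fs = require('fs');
const { Timestamp } = require('mongodb');

const startSeconds = parseInt(fs.readFileSync(__dirname + '/../../.starttime', 'utf-8'), 10);

// Stay undefined when the file is empty or non-numeric so no start time is passed to watch()
const RESUMETIME = Number.isNaN(startSeconds)
  ? undefined
  : new Timestamp({ t: startSeconds, i: 1 });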

// Graceful shutdown
process.on('SIGINT', async () => {
  logger.log({
    initiator: 'notifications/inbound',
    message: 'Closing leads data stream',
  });

  await leadsDataStream.close();
  process.exit(0);
});

// Stream error handling
leadsDataStream.on('error', error => {
  logger.error({
    initiator: 'notifications/inbound/stream',
    message: 'Change stream error',
    error: error,
  });
});
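
The code above reads `.starttime` but this page does not show where the file is written. A minimal sketch of a matching save/load pair, assuming the file holds a single integer number of seconds. The helper names `saveStartTime` and `loadStartTime` are hypothetical:

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: persists the last processed operation time (seconds).
function saveStartTime(filePath, seconds) {
  fs.writeFileSync(filePath, String(Math.floor(seconds)), 'utf-8');
}

// Hypothetical helper: returns the stored seconds, or undefined when the
// file is missing, empty, or not numeric.
function loadStartTime(filePath) {
  if (!fs.existsSync(filePath)) return undefined;
  const seconds = parseInt(fs.readFileSync(filePath, 'utf-8').trim(), 10);
  return Number.isNaN(seconds) ? undefined : seconds;
}

// Demo against a temp file so the sketch never touches a real .starttime
const demoFile = path.join(os.tmpdir(), `starttime-demo-${process.pid}`);
saveStartTime(demoFile, 1760365800);
console.log(loadStartTime(demoFile)); // 1760365800
fs.unlinkSync(demoFile);
```

In the change handler, one option would be to call `saveStartTime` with the seconds component of each event's `clusterTime` after the lead has been processed, so a restart resumes near the last handled event. Handling a missing file explicitly also avoids the exception that `fs.readFileSync` throws on first start.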

💡 Examples

Example 1: Standard Lead

Trigger Event:

// LeadsData insert
{
  operationType: 'insert',
  fullDocument: {
    _id: ObjectId("507f1f77bcf86cd799439011"),
    account_id: ObjectId("507f1f77bcf86cd799439012"),
    campaign_id: ObjectId("507f1f77bcf86cd799439013"),
    name: "John Doe",
    email: "john.doe@example.com",
    phone: "+1 (555) 123-4567",
    company: "Acme Corp",
    message: "Interested in your digital marketing services",
    source: "web_form",
    landing_page: "https://example.com/contact",
    utm_params: {
      utm_source: "google",
      utm_medium: "cpc",
      utm_campaign: "q4-2025"
    },
    metadata: {
      lead_received: true
    },
    lead_score: 75,
    created_at: new Date("2025-10-13T14:30:00Z")
  }
}

Resulting Notifications:

  1. Email - New lead details sent to account owner and campaign manager
  2. Bell - Real-time notification "New Lead Received"

Example 2: High-Value Lead

Trigger Event:

// LeadsData insert with high score
{
  operationType: 'insert',
  fullDocument: {
    _id: ObjectId("507f1f77bcf86cd799439011"),
    name: "Enterprise Client Inc.",
    email: "contact@enterpriseclient.com",
    phone: "+1 (555) 987-6543",
    company: "Enterprise Client Inc.",
    message: "Looking for enterprise solution with $50,000 budget. Need immediately.",
    source: "referral",
    lead_score: 95,
    metadata: {
      lead_received: true
    },
    created_at: new Date("2025-10-13T14:30:00Z")
  }
}

Resulting Notifications:

  1. Email (High Priority) - Urgent lead alert
  2. Bell - "🔥 High-Value Lead!" notification
  3. SMS (Optional) - Text alert for immediate action

📈 Metrics

Key Metrics:

  • Leads captured per day: ~500
  • Average lead score: 68
  • High-value leads (score >= 80): ~15% of total
  • Notification delivery time: less than 30 seconds

Monitoring:

// Leads in the last 24 hours (countDocuments replaces the deprecated count())
db.getCollection('leads.data').countDocuments({
  created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});

// High-value leads in the last 24 hours
db.getCollection('leads.data').countDocuments({
  lead_score: { $gte: 80 },
  created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});

// Notifications queued in the last 24 hours
db.getCollection('notifications.queue').countDocuments({
  origin: 'inbound',
  created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});

Module Type: Change Stream
Environment Flag: INBOUND_ENABLED
Dependencies: MongoDB (replica set), SendGrid, Firebase, Twilio (SMS optional)
Special Feature: Lead scoring and priority routing
Status: Active - critical for lead capture
