Skip to main content

👥 Contact Notifications

📖 Overview

The Contact Notifications module handles notifications related to CRM contact management including contact assignments and bulk import completions.

Environment Flag: CONTACTS_ENABLED=true

Trigger Types:

  • Change Stream on Contact collection (owner/assignee updates)
  • Change Stream on ContactQueue collection (import completions)

Notification Types: bell (FCM), browser push
Location: notifications/services/contacts/

🏗️ Architecture

System Flow

graph TB
subgraph "Triggers"
ASSIGN[Contact Update<br/>Assignee Change]
IMPORT[ContactQueue Insert<br/>Import Complete]
end

subgraph "Processing"
DETECT1[Detect Owner Change]
DETECT2[Detect Import Complete]
VERIFY[Verify User Preferences]
CREATE[Create Notifications]
end

subgraph "Delivery"
BELL[Bell Notification]
BROWSER[Browser Push]
end

ASSIGN --> DETECT1
IMPORT --> DETECT2
DETECT1 --> VERIFY
DETECT2 --> VERIFY
VERIFY --> CREATE
CREATE --> BELL
CREATE --> BROWSER

⚙️ Configuration

Environment Variables

# Module flag
CONTACTS_ENABLED=true # Enable contact notifications

# External dependencies (inherited)
GENERAL_SOCKET=http://localhost:4000 # For bell notifications
FIREBASE_CREDENTIALS=... # For browser push

Change Stream Configuration

Assignee Change Stream

const contactStream = Contact.watch(
[
{
$match: {
operationType: 'update',
},
},
],
{
fullDocument: 'updateLookup',
},
);

contactStream.on('change', async data => {
// Only process if owner field was updated
if (data.updateDescription?.updatedFields?.owner) {
await processAssigneeNotification(data);
}
});

Import Completion Stream

const importStream = ContactQueue.watch(
[
{
$match: {
operationType: 'insert',
'fullDocument.status': 'completed',
},
},
],
{
fullDocument: 'updateLookup',
},
);

importStream.on('change', async data => {
await processImportNotification(data);
});

📧 Notification Templates

1. Contact Assigned (Person)

Trigger: Person contact assigned to user
Type: bell, browser push
Recipients: Assigned user

Notification Content:

{
title: "Assigned to a new person contact",
body: "You have been assigned to a new person contact: John Doe",
click_action: "https://app.dashclicks.com/contacts/?type=people&id=507f1f77bcf86cd799439011&tab=activity",
module: "contacts",
type: "person_assigned",
subType: "bell" // or "browser"
}

2. Contact Assigned (Business)

Trigger: Business contact assigned to user
Type: bell, browser push
Recipients: Assigned user

Notification Content:

{
title: "Assigned to a new business contact",
body: "You have been assigned to a new business contact: Acme Corp",
click_action: "https://app.dashclicks.com/contacts/?type=businesses&id=507f1f77bcf86cd799439011&tab=activity",
module: "contacts",
type: "business_assigned",
subType: "bell" // or "browser"
}

3. Import Completed

Trigger: Bulk contact import finished
Type: bell, browser push
Recipients: User who initiated import

Notification Content:

{
title: "Contact Import Completed",
body: "Your contact import has been completed successfully",
click_action: "https://app.dashclicks.com/contacts/?type=people", // or "businesses"
module: "contacts",
type: "import_complete",
subType: "bell" // or "browser"
}

🔍 Processing Logic

Assignee Change Notification

// From services/contacts/assignee-change-notifications.js
async function processAssigneeNotification(data) {
try {
// Only process if owner was updated
if (!data.updateDescription?.updatedFields?.owner) {
return;
}

const contact = data.fullDocument;

// 1. Fetch assigned user
const user = await User.findById(contact.owner);

if (!user) {
logger.warn({
message: 'User not found for contact assignment',
contact_id: contact._id,
owner_id: contact.owner,
});
return;
}

// 2. Get active domain for the account
const domainName = await getActiveDomain({
accountId: contact.account?.toString(),
proto: true,
});

const baseURL = `${domainName}/contacts`;

// 3. Determine contact type and notification details
const contactType = contact.type === 'business' ? 'business' : 'person';
const notificationType = contactType === 'business' ? 'business_assigned' : 'person_assigned';

const contactDisplayName = contact.name ?? contact.email ?? contact.phone ?? 'Unknown Contact';

const clickAction = `${baseURL}/?type=${
contactType === 'business' ? 'businesses' : 'people'
}&id=${contact._id}&tab=activity`;

// 4. Send bell notification
await processFCMv2({
verification: {
module: 'contacts',
type: notificationType,
subType: 'bell',
},
content: {
title: `Assigned to a new ${contactType} contact`,
body: `You have been assigned to a new ${contactType} contact: ${contactDisplayName}`,
click_action: clickAction,
},
recipient: {
accountID: contact.parent_account ?? contact.account,
users: [user._id],
},
user_check: true, // Verify user preferences
});

// 5. Send browser push notification
await processFCMv2({
verification: {
module: 'contacts',
type: notificationType,
subType: 'browser',
},
content: {
title: `Assigned to a new ${contactType} contact`,
body: `You have been assigned to a new ${contactType} contact: ${contactDisplayName}`,
click_action: clickAction,
},
recipient: {
accountID: contact.parent_account ?? contact.account,
users: [user._id],
},
user_check: true,
});

logger.info({
message: 'Contact assignment notification sent',
contact_id: contact._id,
user_id: user._id,
contact_type: contactType,
});
} catch (err) {
logger.error({
initiator: 'notification/contact/assign_change_notifications',
message: 'Error in sending assignee change notification',
error: err,
});
}
}

Import Completion Notification

// From services/contacts/import-notifications.js
async function processImportNotification(data) {
try {
const importJob = data.fullDocument;

// 1. Fetch user who initiated import
const user = await User.findById(importJob.user_id);

if (!user) {
logger.warn({
message: 'User not found for import notification',
import_job_id: importJob._id,
user_id: importJob.user_id,
});
return;
}

// 2. Get active domain
const domainName = await getActiveDomain({
accountId: importJob.account_id?.toString(),
proto: true,
});

const baseURL = `${domainName}/contacts`;

// 3. Determine contact type from import job
const contactType = importJob.type === 'business' ? 'businesses' : 'people';

const clickAction = `${baseURL}/?type=${contactType}`;

// 4. Send bell notification
await processFCMv2({
verification: {
module: 'contacts',
type: 'import_complete',
subType: 'bell',
},
content: {
title: 'Contact Import Completed',
body: 'Your contact import has been completed successfully',
click_action: clickAction,
},
recipient: {
accountID: importJob.account_id,
users: [user._id],
},
user_check: true,
});

// 5. Send browser push notification
await processFCMv2({
verification: {
module: 'contacts',
type: 'import_complete',
subType: 'browser',
},
content: {
title: 'Contact Import Completed',
body: 'Your contact import is completed.',
click_action: clickAction,
},
recipient: {
accountID: importJob.account_id,
users: [user._id],
},
user_check: true,
});

logger.info({
message: 'Contact import notification sent',
import_job_id: importJob._id,
user_id: user._id,
contact_type: importJob.type,
});
} catch (err) {
logger.error({
initiator: 'notification/contact/import_notification',
message: 'Error in sending contact import notification',
error: err,
});
}
}

🚨 Error Handling

Change Stream Management

// Graceful shutdown
process.on('SIGINT', async () => {
logger.log({
initiator: 'notifications/contacts',
message: 'Closing contact streams',
});

await contactStream.close();
await importStream.close();
process.exit(0);
});

// Stream error handling
contactStream.on('error', error => {
logger.error({
initiator: 'notifications/contacts/assignee-stream',
message: 'Change stream error',
error: error,
});
});

importStream.on('error', error => {
logger.error({
initiator: 'notifications/contacts/import-stream',
message: 'Change stream error',
error: error,
});
});

User Verification

The module uses processFCMv2 which automatically verifies:

  • User notification preferences
  • Account status
  • User permissions
  • DND (Do Not Disturb) settings

💡 Examples

Example 1: Person Contact Assignment

Trigger Event:

// Contact update
{
operationType: 'update',
fullDocument: {
_id: ObjectId("507f1f77bcf86cd799439011"),
type: "person",
name: "John Doe",
email: "john.doe@example.com",
phone: "+1 (555) 123-4567",
account: ObjectId("507f1f77bcf86cd799439012"),
owner: ObjectId("507f1f77bcf86cd799439013"), // Changed
updated_at: new Date("2025-10-13T14:30:00Z")
},
updateDescription: {
updatedFields: {
owner: ObjectId("507f1f77bcf86cd799439013")
}
}
}

Resulting Notifications:

  1. Bell - "Assigned to a new person contact: John Doe"
  2. Browser - Same content, delivered via web push

Example 2: Business Contact Assignment

Trigger Event:

// Contact update
{
operationType: 'update',
fullDocument: {
_id: ObjectId("507f1f77bcf86cd799439011"),
type: "business",
name: "Acme Corp",
email: "contact@acme.com",
account: ObjectId("507f1f77bcf86cd799439012"),
owner: ObjectId("507f1f77bcf86cd799439013"), // Changed
updated_at: new Date("2025-10-13T14:30:00Z")
},
updateDescription: {
updatedFields: {
owner: ObjectId("507f1f77bcf86cd799439013")
}
}
}

Resulting Notifications:

  1. Bell - "Assigned to a new business contact: Acme Corp"
  2. Browser - Same content, delivered via web push

Example 3: Import Completion

Trigger Event:

// ContactQueue insert
{
operationType: 'insert',
fullDocument: {
_id: ObjectId("507f1f77bcf86cd799439011"),
account_id: ObjectId("507f1f77bcf86cd799439012"),
user_id: ObjectId("507f1f77bcf86cd799439013"),
type: "person",
status: "completed",
total_records: 150,
imported_records: 148,
failed_records: 2,
created_at: new Date("2025-10-13T14:30:00Z")
}
}

Resulting Notifications:

  1. Bell - "Contact Import Completed"
  2. Browser - "Your contact import is completed."

📈 Metrics

Key Metrics:

  • Contact assignments per day: ~300
  • Import jobs per day: ~25
  • Average import size: 150 contacts
  • Notification delivery: less than 2 seconds

Monitoring:

// Contact assignments today
db.getCollection('contacts').count({
updated_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
owner: { $exists: true },
});

// Completed imports today
db.getCollection('contacts.queue').count({
status: 'completed',
created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});

// Notifications sent
db.getCollection('notifications.queue').count({
origin: 'contacts',
created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});

🔧 Troubleshooting

Issue: Assignment notifications not received

Symptoms: User not notified when assigned to contact

Diagnosis:

// Check if owner field was actually updated
db.getCollection('contacts').findOne({ _id: ObjectId('contact_id') }, { owner: 1, updated_at: 1 });

// Check user preferences
db.getCollection('users').findOne({ _id: ObjectId('user_id') }, { notification_preferences: 1 });

Solutions:

  • Verify owner field is being updated in change stream
  • Check user notification preferences for contacts module
  • Verify Firebase credentials for push notifications

Issue: Import notifications delayed

Symptoms: Import completes but notification arrives late

Solutions:

  1. Check ContactQueue change stream is running
  2. Verify status field is set to 'completed'
  3. Check Redis connectivity for queue processing

Module Type: Change Stream
Environment Flag: CONTACTS_ENABLED
Dependencies: MongoDB (replica set), Firebase (FCM)
Notification Channels: Bell, Browser Push (no email)
Status: Active - critical for CRM workflow

💬

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM