👥 Contact Notifications
📖 Overview
The Contact Notifications module handles notifications related to CRM contact management including contact assignments and bulk import completions.
Environment Flag: CONTACTS_ENABLED=true
Trigger Types:
- Change Stream on
Contactcollection (owner/assignee updates) - Change Stream on
ContactQueuecollection (import completions)
Notification Types: bell (FCM), browser push
Location: notifications/services/contacts/
🏗️ Architecture
System Flow
graph TB
subgraph "Triggers"
ASSIGN[Contact Update<br/>Assignee Change]
IMPORT[ContactQueue Insert<br/>Import Complete]
end
subgraph "Processing"
DETECT1[Detect Owner Change]
DETECT2[Detect Import Complete]
VERIFY[Verify User Preferences]
CREATE[Create Notifications]
end
subgraph "Delivery"
BELL[Bell Notification]
BROWSER[Browser Push]
end
ASSIGN --> DETECT1
IMPORT --> DETECT2
DETECT1 --> VERIFY
DETECT2 --> VERIFY
VERIFY --> CREATE
CREATE --> BELL
CREATE --> BROWSER
⚙️ Configuration
Environment Variables
# Module flag
CONTACTS_ENABLED=true # Enable contact notifications
# External dependencies (inherited)
GENERAL_SOCKET=http://localhost:4000 # For bell notifications
FIREBASE_CREDENTIALS=... # For browser push
Change Stream Configuration
Assignee Change Stream
const contactStream = Contact.watch(
[
{
$match: {
operationType: 'update',
},
},
],
{
fullDocument: 'updateLookup',
},
);
contactStream.on('change', async data => {
// Only process if owner field was updated
if (data.updateDescription?.updatedFields?.owner) {
await processAssigneeNotification(data);
}
});
Import Completion Stream
const importStream = ContactQueue.watch(
[
{
$match: {
operationType: 'insert',
'fullDocument.status': 'completed',
},
},
],
{
fullDocument: 'updateLookup',
},
);
importStream.on('change', async data => {
await processImportNotification(data);
});
📧 Notification Templates
1. Contact Assigned (Person)
Trigger: Person contact assigned to user
Type: bell, browser push
Recipients: Assigned user
Notification Content:
{
title: "Assigned to a new person contact",
body: "You have been assigned to a new person contact: John Doe",
click_action: "https://app.dashclicks.com/contacts/?type=people&id=507f1f77bcf86cd799439011&tab=activity",
module: "contacts",
type: "person_assigned",
subType: "bell" // or "browser"
}
2. Contact Assigned (Business)
Trigger: Business contact assigned to user
Type: bell, browser push
Recipients: Assigned user
Notification Content:
{
title: "Assigned to a new business contact",
body: "You have been assigned to a new business contact: Acme Corp",
click_action: "https://app.dashclicks.com/contacts/?type=businesses&id=507f1f77bcf86cd799439011&tab=activity",
module: "contacts",
type: "business_assigned",
subType: "bell" // or "browser"
}
3. Import Completed
Trigger: Bulk contact import finished
Type: bell, browser push
Recipients: User who initiated import
Notification Content:
{
title: "Contact Import Completed",
body: "Your contact import has been completed successfully",
click_action: "https://app.dashclicks.com/contacts/?type=people", // or "businesses"
module: "contacts",
type: "import_complete",
subType: "bell" // or "browser"
}
🔍 Processing Logic
Assignee Change Notification
// From services/contacts/assignee-change-notifications.js
async function processAssigneeNotification(data) {
try {
// Only process if owner was updated
if (!data.updateDescription?.updatedFields?.owner) {
return;
}
const contact = data.fullDocument;
// 1. Fetch assigned user
const user = await User.findById(contact.owner);
if (!user) {
logger.warn({
message: 'User not found for contact assignment',
contact_id: contact._id,
owner_id: contact.owner,
});
return;
}
// 2. Get active domain for the account
const domainName = await getActiveDomain({
accountId: contact.account?.toString(),
proto: true,
});
const baseURL = `${domainName}/contacts`;
// 3. Determine contact type and notification details
const contactType = contact.type === 'business' ? 'business' : 'person';
const notificationType = contactType === 'business' ? 'business_assigned' : 'person_assigned';
const contactDisplayName = contact.name ?? contact.email ?? contact.phone ?? 'Unknown Contact';
const clickAction = `${baseURL}/?type=${
contactType === 'business' ? 'businesses' : 'people'
}&id=${contact._id}&tab=activity`;
// 4. Send bell notification
await processFCMv2({
verification: {
module: 'contacts',
type: notificationType,
subType: 'bell',
},
content: {
title: `Assigned to a new ${contactType} contact`,
body: `You have been assigned to a new ${contactType} contact: ${contactDisplayName}`,
click_action: clickAction,
},
recipient: {
accountID: contact.parent_account ?? contact.account,
users: [user._id],
},
user_check: true, // Verify user preferences
});
// 5. Send browser push notification
await processFCMv2({
verification: {
module: 'contacts',
type: notificationType,
subType: 'browser',
},
content: {
title: `Assigned to a new ${contactType} contact`,
body: `You have been assigned to a new ${contactType} contact: ${contactDisplayName}`,
click_action: clickAction,
},
recipient: {
accountID: contact.parent_account ?? contact.account,
users: [user._id],
},
user_check: true,
});
logger.info({
message: 'Contact assignment notification sent',
contact_id: contact._id,
user_id: user._id,
contact_type: contactType,
});
} catch (err) {
logger.error({
initiator: 'notification/contact/assign_change_notifications',
message: 'Error in sending assignee change notification',
error: err,
});
}
}
Import Completion Notification
// From services/contacts/import-notifications.js
async function processImportNotification(data) {
try {
const importJob = data.fullDocument;
// 1. Fetch user who initiated import
const user = await User.findById(importJob.user_id);
if (!user) {
logger.warn({
message: 'User not found for import notification',
import_job_id: importJob._id,
user_id: importJob.user_id,
});
return;
}
// 2. Get active domain
const domainName = await getActiveDomain({
accountId: importJob.account_id?.toString(),
proto: true,
});
const baseURL = `${domainName}/contacts`;
// 3. Determine contact type from import job
const contactType = importJob.type === 'business' ? 'businesses' : 'people';
const clickAction = `${baseURL}/?type=${contactType}`;
// 4. Send bell notification
await processFCMv2({
verification: {
module: 'contacts',
type: 'import_complete',
subType: 'bell',
},
content: {
title: 'Contact Import Completed',
body: 'Your contact import has been completed successfully',
click_action: clickAction,
},
recipient: {
accountID: importJob.account_id,
users: [user._id],
},
user_check: true,
});
// 5. Send browser push notification
await processFCMv2({
verification: {
module: 'contacts',
type: 'import_complete',
subType: 'browser',
},
content: {
title: 'Contact Import Completed',
body: 'Your contact import is completed.',
click_action: clickAction,
},
recipient: {
accountID: importJob.account_id,
users: [user._id],
},
user_check: true,
});
logger.info({
message: 'Contact import notification sent',
import_job_id: importJob._id,
user_id: user._id,
contact_type: importJob.type,
});
} catch (err) {
logger.error({
initiator: 'notification/contact/import_notification',
message: 'Error in sending contact import notification',
error: err,
});
}
}
🚨 Error Handling
Change Stream Management
// Graceful shutdown
process.on('SIGINT', async () => {
logger.log({
initiator: 'notifications/contacts',
message: 'Closing contact streams',
});
await contactStream.close();
await importStream.close();
process.exit(0);
});
// Stream error handling
contactStream.on('error', error => {
logger.error({
initiator: 'notifications/contacts/assignee-stream',
message: 'Change stream error',
error: error,
});
});
importStream.on('error', error => {
logger.error({
initiator: 'notifications/contacts/import-stream',
message: 'Change stream error',
error: error,
});
});
User Verification
The module uses processFCMv2 which automatically verifies:
- User notification preferences
- Account status
- User permissions
- DND (Do Not Disturb) settings
💡 Examples
Example 1: Person Contact Assignment
Trigger Event:
// Contact update
{
operationType: 'update',
fullDocument: {
_id: ObjectId("507f1f77bcf86cd799439011"),
type: "person",
name: "John Doe",
email: "john.doe@example.com",
phone: "+1 (555) 123-4567",
account: ObjectId("507f1f77bcf86cd799439012"),
owner: ObjectId("507f1f77bcf86cd799439013"), // Changed
updated_at: new Date("2025-10-13T14:30:00Z")
},
updateDescription: {
updatedFields: {
owner: ObjectId("507f1f77bcf86cd799439013")
}
}
}
Resulting Notifications:
- Bell - "Assigned to a new person contact: John Doe"
- Browser - Same content, delivered via web push
Example 2: Business Contact Assignment
Trigger Event:
// Contact update
{
operationType: 'update',
fullDocument: {
_id: ObjectId("507f1f77bcf86cd799439011"),
type: "business",
name: "Acme Corp",
email: "contact@acme.com",
account: ObjectId("507f1f77bcf86cd799439012"),
owner: ObjectId("507f1f77bcf86cd799439013"), // Changed
updated_at: new Date("2025-10-13T14:30:00Z")
},
updateDescription: {
updatedFields: {
owner: ObjectId("507f1f77bcf86cd799439013")
}
}
}
Resulting Notifications:
- Bell - "Assigned to a new business contact: Acme Corp"
- Browser - Same content, delivered via web push
Example 3: Import Completion
Trigger Event:
// ContactQueue insert
{
operationType: 'insert',
fullDocument: {
_id: ObjectId("507f1f77bcf86cd799439011"),
account_id: ObjectId("507f1f77bcf86cd799439012"),
user_id: ObjectId("507f1f77bcf86cd799439013"),
type: "person",
status: "completed",
total_records: 150,
imported_records: 148,
failed_records: 2,
created_at: new Date("2025-10-13T14:30:00Z")
}
}
Resulting Notifications:
- Bell - "Contact Import Completed"
- Browser - "Your contact import is completed."
📈 Metrics
Key Metrics:
- Contact assignments per day: ~300
- Import jobs per day: ~25
- Average import size: 150 contacts
- Notification delivery: less than 2 seconds
Monitoring:
// Contact assignments today
db.getCollection('contacts').count({
updated_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
owner: { $exists: true },
});
// Completed imports today
db.getCollection('contacts.queue').count({
status: 'completed',
created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});
// Notifications sent
db.getCollection('notifications.queue').count({
origin: 'contacts',
created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});
🔧 Troubleshooting
Issue: Assignment notifications not received
Symptoms: User not notified when assigned to contact
Diagnosis:
// Check if owner field was actually updated
db.getCollection('contacts').findOne({ _id: ObjectId('contact_id') }, { owner: 1, updated_at: 1 });
// Check user preferences
db.getCollection('users').findOne({ _id: ObjectId('user_id') }, { notification_preferences: 1 });
Solutions:
- Verify owner field is being updated in change stream
- Check user notification preferences for
contactsmodule - Verify Firebase credentials for push notifications
Issue: Import notifications delayed
Symptoms: Import completes but notification arrives late
Solutions:
- Check ContactQueue change stream is running
- Verify status field is set to 'completed'
- Check Redis connectivity for queue processing
Module Type: Change Stream
Environment Flag: CONTACTS_ENABLED
Dependencies: MongoDB (replica set), Firebase (FCM)
Notification Channels: Bell, Browser Push (no email)
Status: Active - critical for CRM workflow