📋 Project Notifications
📖 Overview
The Project Notifications module handles all project-related notifications including task assignments, approval requests, task completions, project communications, and utilization alerts.
Environment Flag: PROJECT_ENABLED=true
Trigger Types:
- Change Stream on Activity collection (project activities)
- Change Stream on ProjectsTasks collection (tasks and approvals)
- Change Stream on UtilizationAlert collection (capacity alerts)
- Cron jobs for auto-approval, auto-completion, approval reminders
Notification Types: email, bell (FCM)
Location: notifications/services/projects/
🏗️ Architecture
System Flow
graph TB
subgraph "Triggers"
ACT[Activity Stream<br/>Project Events]
TASK[Tasks Stream<br/>Insert/Update]
APPROVAL[Approval Stream<br/>Completion]
UTIL[Utilization Stream<br/>Alerts]
CRON1[Auto Approval]
CRON2[Auto Complete]
CRON3[Approval Reminder]
end
subgraph "Processing"
PROC1[processNotifications]
PROC2[processNewMessage]
PROC3[processMessage]
PROC4[approvalComplete]
PROC5[utilizationAlert]
end
subgraph "Notifications"
QUEUE[NotificationQueue]
end
ACT --> PROC1
TASK --> PROC2
TASK --> PROC3
APPROVAL --> PROC4
UTIL --> PROC5
CRON1 --> PROC1
CRON2 --> PROC1
CRON3 --> PROC1
PROC1 --> QUEUE
PROC2 --> QUEUE
PROC3 --> QUEUE
PROC4 --> QUEUE
PROC5 --> QUEUE
⚙️ Configuration
Environment Variables
# Module flag
PROJECT_ENABLED=true # Enable project notifications
# External dependencies (inherited)
SENDGRID_API_KEY=your_key
REDIS_HOST=localhost
REDIS_PORT=6379
GENERAL_SOCKET=http://localhost:4000 # For bell notifications
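How the service reads the module flag is not shown here. As a minimal sketch, assuming the flag is compared case-insensitively against the string `true`, a gate could look like the following. The helper name `isProjectModuleEnabled` is hypothetical and not part of the codebase.

```javascript
// Hypothetical helper (assumption): only a case-insensitive "true" enables the module.
function isProjectModuleEnabled(env = process.env) {
  return String(env.PROJECT_ENABLED ?? '').trim().toLowerCase() === 'true';
}
```

With a gate like this, the change streams and cron jobs would only be registered when the function returns `true`.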
Cron Schedules
// Auto-approval check (tasks awaiting approval)
cron.schedule('0 */1 * * *', async () => {
await checkAutoApproval(); // Every hour
});
// Auto-completion (approved tasks pending completion)
cron.schedule('0 */6 * * *', async () => {
await checkAutoCompletion(); // Every 6 hours
});
// Approval reminders (tasks awaiting approval > 24 hours)
cron.schedule('0 9 * * *', async () => {
await sendApprovalReminders(); // Daily at 9 AM
});
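The reminder job targets approvals that have been pending for more than 24 hours. The selection rule can be sketched as a pure function, which keeps it easy to test. This is an illustration under assumptions: the helper names are hypothetical, and `created_at` is assumed to mark when the approval was submitted.

```javascript
const DAY_MS = 24 * 60 * 60 * 1000;

// Whole days an approval has been waiting (assumes created_at is the submission time).
function pendingDays(task, now = new Date()) {
  return Math.floor((now.getTime() - new Date(task.created_at).getTime()) / DAY_MS);
}

// True for approval tasks that are still pending after more than 24 hours.
function needsApprovalReminder(task, now = new Date()) {
  const ageMs = now.getTime() - new Date(task.created_at).getTime();
  return task.type === 'approval' && task.status === 'pending' && ageMs > DAY_MS;
}
```

The result of `pendingDays` corresponds to the `pending_days` variable used by the approval reminder template.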
🔄 Change Stream Configuration
Activity Stream (Project Events)
Monitors project activity for various events:
const activityStream = Activity.watch(
[
{
$match: {
operationType: 'insert',
'fullDocument.type': 'projects', // Only project activities
},
},
],
{
fullDocument: 'updateLookup',
startAtOperationTime: RESUMETIME || undefined,
},
);
activityStream.on('change', async data => {
await processNotifications(data);
});
Activity Types Handled:
- Task created
- Task assigned
- Task status changed
- Task priority changed
- Project updated
Project Tasks Stream (New Tasks/Messages)
Monitors task creation and communication:
const projectTaskStream = ProjectsTasks.watch(
[
{
$match: {
$or: [
{
// New task or approval
operationType: 'insert',
'fullDocument.type': { $in: ['request', 'approval'] },
},
{
// New message on task
operationType: 'update',
'fullDocument.type': { $in: ['request', 'approval'] },
'updateDescription.updatedFields.last_communication_id': { $exists: true },
},
],
},
},
],
{
fullDocument: 'updateLookup',
startAtOperationTime: RESUMETIME || undefined,
},
);
projectTaskStream.on('change', async data => {
if (data.operationType === 'insert') {
// New task created
await processNewMessageNotification(data);
} else if (data.operationType === 'update') {
// Skip the update fired by task creation, where the first and last
// communication ids are still identical (or not set yet)
const doc = data.fullDocument || {};
const first = doc.first_communication_id;
const last = doc.last_communication_id;
if (!first || !last || first.toString() === last.toString()) {
return;
}
// New message added to existing task
await processMessageNotification(data);
}
});
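The handler above decides whether an update is a new message by comparing the first and last communication ids. Extracting that check into a predicate makes it testable without a live change stream. The sketch below is hypothetical (`isInitialMessage` is not an existing function) and assumes that an update with missing ids should be skipped rather than notified.

```javascript
// Hypothetical predicate: true when the task still points at its first message,
// meaning the update came from task creation rather than a new message.
function isInitialMessage(task) {
  const first = task?.first_communication_id;
  const last = task?.last_communication_id;
  // Assumption: with missing ids there is no message to notify about, so skip.
  if (first == null || last == null) return true;
  return String(first) === String(last);
}
```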
Task Types:
- request - Work request from client
- approval - Approval request for completed work
Approval Completion Stream
Monitors approval task completions:
const approvalCompletionStream = ProjectsTasks.watch(
[
{
$match: {
operationType: 'update',
'fullDocument.type': 'approval',
'updateDescription.updatedFields.status': 'completed',
},
},
],
{
fullDocument: 'updateLookup',
startAtOperationTime: RESUMETIME || undefined,
},
);
approvalCompletionStream.on('change', async data => {
await approvalCompleteNotification(data);
});
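The `$match` stage above can be mirrored as a plain predicate, which is useful for unit-testing handler logic against sample change events. This is only a sketch: `isApprovalCompletion` is a hypothetical name, and the stream filter remains the source of truth.

```javascript
// Hypothetical mirror of the $match stage: an update on an approval task
// whose status field was set to "completed" in this change.
function isApprovalCompletion(change) {
  return (
    change?.operationType === 'update' &&
    change.fullDocument?.type === 'approval' &&
    change.updateDescription?.updatedFields?.status === 'completed'
  );
}
```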
Utilization Alert Stream
Monitors capacity/utilization warnings:
const utilizationStream = UtilizationAlert.watch(
[
{
$match: {
operationType: 'insert',
'fullDocument.status': 'active',
'fullDocument.email_sent': false,
},
},
],
{
fullDocument: 'updateLookup',
startAtOperationTime: RESUMETIME || undefined,
},
);
utilizationStream.on('change', async data => {
await utilizationAlertNotification(data);
});
📧 Notification Templates
1. Task Assigned
Trigger: New task assigned to user
Type: email, bell
Recipients: Assigned user
Email Content Variables:
{
task_title: "Update website header",
task_type: "request",
project_name: "Website Redesign",
assignee_name: "John Doe",
assigner_name: "Jane Smith",
due_date: "2025-10-20",
priority: "high",
description: "Update the main header with new branding...",
task_url: "https://app.dashclicks.com/projects/tasks/507f1f77bcf86cd799439011"
}
2. Approval Request
Trigger: Work submitted for approval
Type: email, bell
Recipients: Client/approver
Email Content Variables:
{
task_title: "Homepage Design",
project_name: "Website Redesign",
submitter_name: "Designer Team",
submission_date: "2025-10-13",
message: "Homepage design is complete and ready for your review.",
attachments: ["design_v1.png", "design_v2.png"],
approval_url: "https://app.dashclicks.com/projects/approvals/507f1f77bcf86cd799439011"
}
3. New Message on Task
Trigger: New comment/message added to task
Type: email, bell
Recipients: Task participants (assignee, creator, client)
Email Content Variables:
{
task_title: "Update website header",
sender_name: "John Doe",
message: "I've completed the header update. Please review.",
message_date: "2025-10-13 14:30",
task_url: "https://app.dashclicks.com/projects/tasks/507f1f77bcf86cd799439011",
project_name: "Website Redesign"
}
4. Approval Completed
Trigger: Approval task marked as completed
Type: email, bell
Recipients: Task assignee, project team
Email Content Variables:
{
task_title: "Homepage Design",
project_name: "Website Redesign",
approver_name: "Jane Smith",
approval_status: "approved",
feedback: "Looks great! Approved to proceed.",
completion_date: "2025-10-13",
task_url: "https://app.dashclicks.com/projects/tasks/507f1f77bcf86cd799439011"
}
5. Utilization Alert
Trigger: Team capacity threshold exceeded
Type: email
Recipients: Account owner, project managers
Email Content Variables:
{
alert_type: "over_capacity",
current_utilization: "92%",
threshold: "85%",
affected_team: "Design Team",
active_tasks: 15,
pending_approvals: 5,
recommendation: "Consider redistributing tasks or adjusting deadlines.",
dashboard_url: "https://app.dashclicks.com/projects/utilization"
}
6. Approval Reminder (24 hours)
Trigger: Approval pending for > 24 hours
Type: email
Recipients: Client/approver
Email Content Variables:
{
task_title: "Homepage Design",
project_name: "Website Redesign",
pending_days: 1,
submission_date: "2025-10-12",
reminder_message: "This task is awaiting your approval. Please review at your earliest convenience.",
approval_url: "https://app.dashclicks.com/projects/approvals/507f1f77bcf86cd799439011"
}
🔍 Processing Logic
New Task Notification
// From services/projects/processNewMessageNotification.js
async function processNewMessageNotification(data) {
const task = data.fullDocument;
// 1. Fetch task with related data
const fullTask = await ProjectsTasks.findById(task._id)
.populate('account_id')
.populate('assigned_to')
.populate('created_by')
.populate('project_id')
.lean();
// 2. Determine recipient based on task type
let recipient;
if (task.type === 'request') {
// Request: notify assignee
recipient = fullTask.assigned_to;
} else if (task.type === 'approval') {
// Approval: notify client (project_id is already populated above)
const project = fullTask.project_id;
recipient = project ? await User.findById(project.client_id).lean() : null;
}
if (!recipient) {
logger.warn({
message: 'No recipient found for task notification',
task_id: task._id,
});
return;
}
// 3. Verify user preferences
const canNotify = await NotificationUtil.verify({
userID: recipient._id,
accountID: fullTask.account_id,
module: 'projects',
type: 'task_assigned',
subType: 'email',
});
if (!canNotify) return;
// 4. Fetch first communication message
const message = await Communication.findById(fullTask.first_communication_id).lean();
// 5. Create email notification
await NotificationQueue.create({
type: 'email',
origin: 'projects',
sender_account: fullTask.account_id,
sender_user: fullTask.created_by,
recipient: {
name: recipient.name,
email: recipient.email,
},
content: {
template_id:
task.type === 'request' ? 'd-task-assigned-template' : 'd-approval-request-template',
additional_data: {
task_title: fullTask.title,
task_type: fullTask.type,
project_name: fullTask.project_id.name,
assignee_name: recipient.name,
assigner_name: fullTask.created_by.name,
due_date: formatDate(fullTask.due_date),
priority: fullTask.priority,
description: message ? message.content : '', // first message may be missing
task_url: `https://app.dashclicks.com/projects/tasks/${task._id}`,
},
},
check_credits: false,
});
// 6. Create bell notification
await NotificationQueue.create({
type: 'fcm',
origin: 'projects',
sender_account: fullTask.account_id,
recipient: {
user_id: recipient._id,
},
content: {
title: task.type === 'request' ? 'New Task Assigned' : 'Approval Required',
body: `${fullTask.created_by.name}: ${fullTask.title}`,
click_action: `https://app.dashclicks.com/projects/tasks/${task._id}`,
module: 'projects',
type: 'task',
data: {
subType: 'bell',
task_id: task._id.toString(),
task_type: task.type,
},
},
});
}
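Because `account_id`, `assigned_to`, `created_by`, and `project_id` are populated, fields such as `fullTask.account_id` hold full documents rather than ids. Where a plain id is needed, a small normalizer can accept either form. The helper below is a hypothetical sketch, not part of the module.

```javascript
// Hypothetical helper: returns the id as a string for either a populated
// document ({ _id, ... }) or a raw id value; null when nothing is referenced.
function toId(ref) {
  if (ref == null) return null;
  const id = typeof ref === 'object' && ref._id != null ? ref._id : ref;
  return String(id);
}
```

For example, `toId(fullTask.account_id)` yields the same string whether or not the field was populated.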
Message Notification
// From services/projects/processMessageNotification.js
async function processMessageNotification(data) {
const task = data.fullDocument;
// 1. Fetch new message
const message = await Communication.findById(task.last_communication_id)
.populate('sender_id')
.lean();
if (!message) return;
// 2. Fetch full task details
const fullTask = await ProjectsTasks.findById(task._id)
.populate('account_id')
.populate('assigned_to')
.populate('created_by')
.populate('project_id')
.lean();
// 3. Determine recipients (exclude sender)
const recipients = [];
// Add assignee
if (
fullTask.assigned_to &&
fullTask.assigned_to._id.toString() !== message.sender_id._id.toString()
) {
recipients.push(fullTask.assigned_to);
}
// Add creator
if (
fullTask.created_by &&
fullTask.created_by._id.toString() !== message.sender_id._id.toString()
) {
recipients.push(fullTask.created_by);
}
// Add client (for approval tasks)
if (task.type === 'approval') {
// project_id is already populated above, so no second project lookup is needed
const project = fullTask.project_id;
const client = project ? await User.findById(project.client_id).lean() : null;
if (client && client._id.toString() !== message.sender_id._id.toString()) {
recipients.push(client);
}
}
// 4. Send to each recipient
for (const recipient of recipients) {
// Verify preferences
const canNotify = await NotificationUtil.verify({
userID: recipient._id,
accountID: fullTask.account_id,
module: 'projects',
type: 'message',
subType: 'email',
});
if (!canNotify) continue;
// Email notification
await NotificationQueue.create({
type: 'email',
origin: 'projects',
sender_account: fullTask.account_id,
sender_user: message.sender_id._id,
recipient: {
name: recipient.name,
email: recipient.email,
},
content: {
template_id: 'd-task-message-template',
additional_data: {
task_title: fullTask.title,
sender_name: message.sender_id.name,
message: message.content,
message_date: formatDateTime(message.created_at),
task_url: `https://app.dashclicks.com/projects/tasks/${task._id}`,
project_name: fullTask.project_id.name,
},
},
check_credits: false,
});
// Bell notification
await NotificationQueue.create({
type: 'fcm',
origin: 'projects',
sender_account: fullTask.account_id,
recipient: {
user_id: recipient._id,
},
content: {
title: 'New Message',
body: `${message.sender_id.name} on ${fullTask.title}`,
click_action: `https://app.dashclicks.com/projects/tasks/${task._id}`,
module: 'projects',
type: 'message',
data: {
subType: 'bell',
task_id: task._id.toString(),
message_id: message._id.toString(),
},
},
});
}
}
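In the recipient logic above, the same user can be added twice, for instance when the assignee also created the task, and would then receive duplicate notifications. One way to guard against that is to de-duplicate by id while excluding the sender. The helper below is a hypothetical sketch of that idea.

```javascript
// Hypothetical helper: keeps the first occurrence of each user, drops empty
// entries, and excludes the message sender.
function collectRecipients(candidates, senderId) {
  const seen = new Set([String(senderId)]);
  const recipients = [];
  for (const user of candidates) {
    if (!user || user._id == null) continue;
    const id = String(user._id);
    if (seen.has(id)) continue;
    seen.add(id);
    recipients.push(user);
  }
  return recipients;
}
```

Called as `collectRecipients([fullTask.assigned_to, fullTask.created_by, client], message.sender_id._id)`, it would replace the three separate `push` checks.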
🚨 Error Handling
Change Stream Reconnection
The module logs disconnects and reconnects on the Mongoose connection and exits the process when no primary can be found:
// Handle disconnected event
mongoose.connection.on('disconnected', () => {
logger.log({
initiator: 'notifications/projects/change-stream',
message: 'Primary MongoDB connection lost - waiting for reconnection',
});
});
// Monitor reconnection
mongoose.connection.on('reconnected', () => {
logger.log({
initiator: 'notifications/projects/change-stream',
message: 'Reconnected to MongoDB',
});
});
// Exit on critical errors
mongoose.connection.on('error', err => {
logger.error({
initiator: 'notifications/projects/change-stream',
message: 'MongoDB connection error',
error: err,
});
if (err.name === 'MongoNetworkError' && err.message.includes('no primary found')) {
process.exit(1);
}
});
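The exit condition in the error handler can be isolated as a predicate so that it can be tested without terminating the process. This is a sketch, and the name `isFatalMongoError` is hypothetical.

```javascript
// Hypothetical predicate mirroring the handler's exit condition:
// a network error whose message reports that no primary was found.
function isFatalMongoError(err) {
  return (
    err?.name === 'MongoNetworkError' &&
    typeof err.message === 'string' &&
    err.message.includes('no primary found')
  );
}
```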
Resume Token Management
// Read the resume time (Unix seconds) from file
const fs = require('fs');
const { Timestamp } = require('mongodb');
let RESUMETIME = fs.readFileSync(__dirname + '/../../.starttime', 'utf-8').trim();
if (RESUMETIME) {
// startAtOperationTime expects a BSON Timestamp: t = seconds, i = ordinal
RESUMETIME = new Timestamp({ t: parseInt(RESUMETIME, 10), i: 1 });
}
// Use in change streams
Activity.watch(pipeline, {
fullDocument: 'updateLookup',
startAtOperationTime: RESUMETIME || undefined,
});
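The contents of `.starttime` may be empty or malformed, in which case `parseInt` yields `NaN`. A defensive parser is sketched below. It assumes the file holds a Unix timestamp in seconds, and the name `parseResumeSeconds` is hypothetical.

```javascript
// Hypothetical parser: returns a positive integer number of seconds,
// or undefined when the input is empty or not a number.
function parseResumeSeconds(raw) {
  const seconds = Number.parseInt(String(raw ?? '').trim(), 10);
  return Number.isInteger(seconds) && seconds > 0 ? seconds : undefined;
}
```

A defined result can be wrapped as `new Timestamp({ t: seconds, i: 1 })`. An `undefined` result leaves `startAtOperationTime` unset, so the stream starts from the current time.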
💡 Examples
Example 1: New Task Assignment
Trigger Event:
// ProjectsTasks insert
{
operationType: 'insert',
fullDocument: {
_id: ObjectId("507f1f77bcf86cd799439011"),
type: "request",
title: "Update website header",
project_id: ObjectId("507f1f77bcf86cd799439012"),
account_id: ObjectId("507f1f77bcf86cd799439013"),
assigned_to: ObjectId("507f1f77bcf86cd799439014"),
created_by: ObjectId("507f1f77bcf86cd799439015"),
priority: "high",
due_date: new Date("2025-10-20"),
status: "pending",
first_communication_id: ObjectId("507f1f77bcf86cd799439016"),
last_communication_id: ObjectId("507f1f77bcf86cd799439016")
}
}
Resulting Notifications:
- Email - Task assignment details with description
- Bell - Real-time notification "New Task Assigned"
Delivery:
- Sent to: Assigned user
- Template: d-task-assigned-template
- Result: Email + bell notification
Example 2: Approval Completion
Trigger Event:
// ProjectsTasks update
{
operationType: 'update',
fullDocument: {
_id: ObjectId("507f1f77bcf86cd799439011"),
type: "approval",
title: "Homepage Design",
status: "completed", // Changed from "pending"
approved_by: ObjectId("507f1f77bcf86cd799439020"),
approved_at: new Date("2025-10-13T15:30:00Z")
},
updateDescription: {
updatedFields: {
status: "completed"
}
}
}
Resulting Notifications:
- Email - Approval confirmation to task assignee
- Bell - "Task approved" notification
Content: Approval status, feedback, next steps
📈 Metrics
Key Metrics:
- Tasks created per day: ~200
- Approval requests per day: ~50
- Average task response time: 4 hours
- Notification success rate: >99%
Monitoring:
// Tasks created in the last 24 hours
db.getCollection('projects.tasks').countDocuments({
created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});
// Pending approvals
db.getCollection('projects.tasks').countDocuments({
type: 'approval',
status: 'pending',
});
// Notifications queued in the last 24 hours
db.getCollection('notifications.queue').countDocuments({
origin: 'projects',
created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});
Module Type: Change Stream + Cron Job
Environment Flag: PROJECT_ENABLED
Dependencies: MongoDB (replica set), SendGrid, Firebase
Status: Active - critical for project management