📋 Project Notifications

📖 Overview

The Project Notifications module handles all project-related notifications including task assignments, approval requests, task completions, project communications, and utilization alerts.

Environment Flag: PROJECT_ENABLED=true

Trigger Types:

  • Change Stream on Activity collection (project activities)
  • Change Stream on ProjectsTasks collection (tasks and approvals)
  • Change Stream on UtilizationAlert collection (capacity alerts)
  • Cron jobs for auto-approval, auto-completion, and approval reminders

Notification Types: email, bell (FCM)
Location: notifications/services/projects/

🏗️ Architecture

System Flow

graph TB
  subgraph "Triggers"
    ACT[Activity Stream<br/>Project Events]
    TASK[Tasks Stream<br/>Insert/Update]
    APPROVAL[Approval Stream<br/>Completion]
    UTIL[Utilization Stream<br/>Alerts]
    CRON1[Auto Approval]
    CRON2[Auto Complete]
    CRON3[Approval Reminder]
  end

  subgraph "Processing"
    PROC1[processNotifications]
    PROC2[processNewMessage]
    PROC3[processMessage]
    PROC4[approvalComplete]
    PROC5[utilizationAlert]
  end

  subgraph "Notifications"
    QUEUE[NotificationQueue]
  end

  ACT --> PROC1
  TASK --> PROC2
  TASK --> PROC3
  APPROVAL --> PROC4
  UTIL --> PROC5
  CRON1 --> PROC1
  CRON2 --> PROC1
  CRON3 --> PROC1

  PROC1 --> QUEUE
  PROC2 --> QUEUE
  PROC3 --> QUEUE
  PROC4 --> QUEUE
  PROC5 --> QUEUE

⚙️ Configuration

Environment Variables

# Module flag
PROJECT_ENABLED=true # Enable project notifications

# External dependencies (inherited)
SENDGRID_API_KEY=your_key
REDIS_HOST=localhost
REDIS_PORT=6379
GENERAL_SOCKET=http://localhost:4000 # For bell notifications
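
As a rough illustration, the variables above could be read at startup along the following lines. The helper name loadProjectConfig and its fallback defaults are assumptions made for this sketch and are not part of the module; only the variable names come from the list above.

```javascript
// Hypothetical helper: reads the variables listed above from an env object.
// The fallback defaults are assumptions intended for local development only.
function loadProjectConfig(env = process.env) {
  return {
    // Anything other than the literal string "true" is treated as disabled.
    enabled: env.PROJECT_ENABLED === 'true',
    sendgridApiKey: env.SENDGRID_API_KEY || null,
    redisHost: env.REDIS_HOST || 'localhost',
    redisPort: Number.parseInt(env.REDIS_PORT || '6379', 10),
    generalSocket: env.GENERAL_SOCKET || 'http://localhost:4000',
  };
}

const config = loadProjectConfig({ PROJECT_ENABLED: 'true', REDIS_PORT: '6380' });
console.log(config.enabled, config.redisPort); // true 6380
```

Comparing against the exact string "true" keeps values such as "false" or "0" from accidentally enabling the module.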

Cron Schedules

// Auto-approval check (tasks awaiting approval)
cron.schedule('0 */1 * * *', async () => {
  await checkAutoApproval(); // Every hour
});

// Auto-completion (approved tasks pending completion)
cron.schedule('0 */6 * * *', async () => {
  await checkAutoCompletion(); // Every 6 hours
});

// Approval reminders (tasks awaiting approval > 24 hours)
cron.schedule('0 9 * * *', async () => {
  await sendApprovalReminders(); // Daily at 9 AM
});
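
To make the three expressions concrete, the sketch below reports which jobs would fire at a given local time. The function jobsDueAt is a hypothetical helper written for this page; it only mirrors the cron expressions shown above and does not use the scheduler itself.

```javascript
// Hypothetical helper: lists the jobs whose cron expression matches the
// given local time (seconds are ignored).
function jobsDueAt(date) {
  const minute = date.getMinutes();
  const hour = date.getHours();
  const due = [];
  if (minute === 0) due.push('checkAutoApproval'); // 0 */1 * * *
  if (minute === 0 && hour % 6 === 0) due.push('checkAutoCompletion'); // 0 */6 * * *
  if (minute === 0 && hour === 9) due.push('sendApprovalReminders'); // 0 9 * * *
  return due;
}

console.log(jobsDueAt(new Date(2025, 9, 13, 9, 0)).join(', '));
// checkAutoApproval, sendApprovalReminders
console.log(jobsDueAt(new Date(2025, 9, 13, 12, 0)).join(', '));
// checkAutoApproval, checkAutoCompletion
```

Note that the 6-hourly job runs at 00:00, 06:00, 12:00, and 18:00, so it never coincides with the 9 AM reminder run.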

🔄 Change Stream Configuration

Activity Stream (Project Events)

Monitors project activity for various events:

const activityStream = Activity.watch(
  [
    {
      $match: {
        operationType: 'insert',
        'fullDocument.type': 'projects', // Only project activities
      },
    },
  ],
  {
    fullDocument: 'updateLookup',
    startAtOperationTime: RESUMETIME || undefined,
  },
);

activityStream.on('change', async data => {
  await processNotifications(data);
});

Activity Types Handled:

  • Task created
  • Task assigned
  • Task status changed
  • Task priority changed
  • Project updated

Project Tasks Stream (New Tasks/Messages)

Monitors task creation and communication:

const projectTaskStream = ProjectsTasks.watch(
  [
    {
      $match: {
        $or: [
          {
            // New task or approval
            operationType: 'insert',
            'fullDocument.type': { $in: ['request', 'approval'] },
          },
          {
            // New message on task
            operationType: 'update',
            'fullDocument.type': { $in: ['request', 'approval'] },
            'updateDescription.updatedFields.last_communication_id': { $exists: true },
          },
        ],
      },
    },
  ],
  {
    fullDocument: 'updateLookup',
    startAtOperationTime: RESUMETIME || undefined,
  },
);

projectTaskStream.on('change', async data => {
  if (data.operationType === 'insert') {
    // New task created
    await processNewMessageNotification(data);
  } else if (data.operationType === 'update') {
    // Check if this is truly a new message or just task creation update
    if (
      data.fullDocument.first_communication_id.toString() ===
      data.fullDocument.last_communication_id.toString()
    ) {
      // Skip - this is the initial message on task creation
      return;
    }
    // New message added to existing task
    await processMessageNotification(data);
  }
});
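
The branching in this handler can be expressed as a small pure function, which makes it easier to test in isolation. The sketch below is an illustration, not the module's code: classifyTaskChange is a hypothetical name, and the extra null checks are an assumption added because a change event using updateLookup can arrive without a fullDocument if the task was deleted before the lookup ran.

```javascript
// Hypothetical pure function mirroring the handler's branching.
// Returns 'new_task', 'new_message', or 'skip'.
function classifyTaskChange(event) {
  if (event.operationType === 'insert') return 'new_task';
  if (event.operationType !== 'update') return 'skip';

  const doc = event.fullDocument;
  // Guard against a missing document or missing communication id.
  if (!doc || !doc.last_communication_id) return 'skip';

  const first = doc.first_communication_id ? String(doc.first_communication_id) : null;
  const last = String(doc.last_communication_id);
  // Identical ids mean the update only recorded the initial message.
  return first === last ? 'skip' : 'new_message';
}

console.log(
  classifyTaskChange({
    operationType: 'update',
    fullDocument: { first_communication_id: 'c1', last_communication_id: 'c2' },
  }),
); // new_message
```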

Task Types:

  • request - Work request from client
  • approval - Approval request for completed work

Approval Completion Stream

Monitors approval task completions:

const approvalCompletionStream = ProjectsTasks.watch(
  [
    {
      $match: {
        operationType: 'update',
        'fullDocument.type': 'approval',
        'updateDescription.updatedFields.status': 'completed',
      },
    },
  ],
  {
    fullDocument: 'updateLookup',
    startAtOperationTime: RESUMETIME || undefined,
  },
);

approvalCompletionStream.on('change', async data => {
  await approvalCompleteNotification(data);
});

Utilization Alert Stream

Monitors capacity/utilization warnings:

const utilizationStream = UtilizationAlert.watch(
  [
    {
      $match: {
        operationType: 'insert',
        'fullDocument.status': 'active',
        'fullDocument.email_sent': false,
      },
    },
  ],
  {
    fullDocument: 'updateLookup',
    startAtOperationTime: RESUMETIME || undefined,
  },
);

utilizationStream.on('change', async data => {
  await utilizationAlertNotification(data);
});

📧 Notification Templates

1. Task Assigned

Trigger: New task assigned to user
Type: email, bell
Recipients: Assigned user

Email Content Variables:

{
  task_title: "Update website header",
  task_type: "request",
  project_name: "Website Redesign",
  assignee_name: "John Doe",
  assigner_name: "Jane Smith",
  due_date: "2025-10-20",
  priority: "high",
  description: "Update the main header with new branding...",
  task_url: "https://app.dashclicks.com/projects/tasks/507f1f77bcf86cd799439011"
}

2. Approval Request

Trigger: Work submitted for approval
Type: email, bell
Recipients: Client/approver

Email Content Variables:

{
  task_title: "Homepage Design",
  project_name: "Website Redesign",
  submitter_name: "Designer Team",
  submission_date: "2025-10-13",
  message: "Homepage design is complete and ready for your review.",
  attachments: ["design_v1.png", "design_v2.png"],
  approval_url: "https://app.dashclicks.com/projects/approvals/507f1f77bcf86cd799439011"
}

3. New Message on Task

Trigger: New comment/message added to task
Type: email, bell
Recipients: Task participants (assignee, creator, client)

Email Content Variables:

{
  task_title: "Update website header",
  sender_name: "John Doe",
  message: "I've completed the header update. Please review.",
  message_date: "2025-10-13 14:30",
  task_url: "https://app.dashclicks.com/projects/tasks/507f1f77bcf86cd799439011",
  project_name: "Website Redesign"
}

4. Approval Completed

Trigger: Approval task marked as completed
Type: email, bell
Recipients: Task assignee, project team

Email Content Variables:

{
  task_title: "Homepage Design",
  project_name: "Website Redesign",
  approver_name: "Jane Smith",
  approval_status: "approved",
  feedback: "Looks great! Approved to proceed.",
  completion_date: "2025-10-13",
  task_url: "https://app.dashclicks.com/projects/tasks/507f1f77bcf86cd799439011"
}

5. Utilization Alert

Trigger: Team capacity threshold exceeded
Type: email
Recipients: Account owner, project managers

Email Content Variables:

{
  alert_type: "over_capacity",
  current_utilization: "92%",
  threshold: "85%",
  affected_team: "Design Team",
  active_tasks: 15,
  pending_approvals: 5,
  recommendation: "Consider redistributing tasks or adjusting deadlines.",
  dashboard_url: "https://app.dashclicks.com/projects/utilization"
}
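
The relationship between the utilization figures and the alert variables might look like the sketch below. The builder buildUtilizationAlert, its parameter names, and the assumption that utilization is stored as a fraction between 0 and 1 are all illustrative; the page does not show how the module derives these values.

```javascript
// Hypothetical builder; output field names follow the example variables above.
// Assumes utilization and threshold are fractions between 0 and 1.
function buildUtilizationAlert({ utilization, threshold, team, activeTasks, pendingApprovals }) {
  if (utilization <= threshold) return null; // threshold not exceeded, no alert
  const pct = value => `${Math.round(value * 100)}%`;
  return {
    alert_type: 'over_capacity',
    current_utilization: pct(utilization),
    threshold: pct(threshold),
    affected_team: team,
    active_tasks: activeTasks,
    pending_approvals: pendingApprovals,
  };
}

const alert = buildUtilizationAlert({
  utilization: 0.92,
  threshold: 0.85,
  team: 'Design Team',
  activeTasks: 15,
  pendingApprovals: 5,
});
console.log(alert.current_utilization, alert.threshold); // 92% 85%
```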

6. Approval Reminder (24 hours)

Trigger: Approval pending for > 24 hours
Type: email
Recipients: Client/approver

Email Content Variables:

{
  task_title: "Homepage Design",
  project_name: "Website Redesign",
  pending_days: 1,
  submission_date: "2025-10-12",
  reminder_message: "This task is awaiting your approval. Please review at your earliest convenience.",
  approval_url: "https://app.dashclicks.com/projects/approvals/507f1f77bcf86cd799439011"
}
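
A minimal sketch of the 24-hour check behind this reminder follows. The helper approvalReminderInfo is hypothetical, and rounding pending_days down to whole days is an assumption; the page only states that reminders go out for approvals pending longer than 24 hours.

```javascript
const DAY_MS = 24 * 60 * 60 * 1000;

// Hypothetical helper: reports whether an approval has waited more than
// 24 hours and how many whole days it has been pending.
function approvalReminderInfo(submittedAt, now = new Date()) {
  const elapsedMs = now.getTime() - submittedAt.getTime();
  return {
    due: elapsedMs > DAY_MS,
    pending_days: Math.max(0, Math.floor(elapsedMs / DAY_MS)),
  };
}

const info = approvalReminderInfo(
  new Date('2025-10-12T08:00:00Z'),
  new Date('2025-10-13T09:00:00Z'),
);
console.log(info.due, info.pending_days); // true 1
```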

🔍 Processing Logic

New Task Notification

// From services/projects/processNewMessageNotification.js
async function processNewMessageNotification(data) {
  const task = data.fullDocument;

  // 1. Fetch task with related data
  const fullTask = await ProjectsTasks.findById(task._id)
    .populate('account_id')
    .populate('assigned_to')
    .populate('created_by')
    .populate('project_id')
    .lean();

  // 2. Determine recipient based on task type
  let recipient;

  if (task.type === 'request') {
    // Request: notify assignee
    recipient = fullTask.assigned_to;
  } else if (task.type === 'approval') {
    // Approval: notify client
    const project = await Projects.findById(fullTask.project_id).lean();
    recipient = await User.findById(project.client_id).lean();
  }

  if (!recipient) {
    logger.warn({
      message: 'No recipient found for task notification',
      task_id: task._id,
    });
    return;
  }

  // 3. Verify user preferences
  const canNotify = await NotificationUtil.verify({
    userID: recipient._id,
    accountID: fullTask.account_id,
    module: 'projects',
    type: 'task_assigned',
    subType: 'email',
  });

  if (!canNotify) return;

  // 4. Fetch first communication message
  const message = await Communication.findById(fullTask.first_communication_id).lean();

  // 5. Create email notification
  await NotificationQueue.create({
    type: 'email',
    origin: 'projects',
    sender_account: fullTask.account_id,
    sender_user: fullTask.created_by,
    recipient: {
      name: recipient.name,
      email: recipient.email,
    },
    content: {
      template_id:
        task.type === 'request' ? 'd-task-assigned-template' : 'd-approval-request-template',
      additional_data: {
        task_title: fullTask.title,
        task_type: fullTask.type,
        project_name: fullTask.project_id.name,
        assignee_name: recipient.name,
        assigner_name: fullTask.created_by.name,
        due_date: formatDate(fullTask.due_date),
        priority: fullTask.priority,
        description: message.content,
        task_url: `https://app.dashclicks.com/projects/tasks/${task._id}`,
      },
    },
    check_credits: false,
  });

  // 6. Create bell notification
  await NotificationQueue.create({
    type: 'fcm',
    origin: 'projects',
    sender_account: fullTask.account_id,
    recipient: {
      user_id: recipient._id,
    },
    content: {
      title: task.type === 'request' ? 'New Task Assigned' : 'Approval Required',
      body: `${fullTask.created_by.name}: ${fullTask.title}`,
      click_action: `https://app.dashclicks.com/projects/tasks/${task._id}`,
      module: 'projects',
      type: 'task',
      data: {
        subType: 'bell',
        task_id: task._id.toString(),
        task_type: task.type,
      },
    },
  });
}
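
The function above picks the recipient, email template, and bell title from the task type in three separate places. One way to keep those choices together is a lookup table, sketched below. The table and the helper taskNotificationMeta are hypothetical; the template IDs and titles are the ones used in the function above.

```javascript
// Hypothetical lookup table built from the values used in the function above.
const TASK_NOTIFICATION_META = {
  request: {
    recipient: 'assignee',
    templateId: 'd-task-assigned-template',
    bellTitle: 'New Task Assigned',
  },
  approval: {
    recipient: 'client',
    templateId: 'd-approval-request-template',
    bellTitle: 'Approval Required',
  },
};

// Returns the metadata for a task type, or null for an unknown type.
function taskNotificationMeta(taskType) {
  return Object.prototype.hasOwnProperty.call(TASK_NOTIFICATION_META, taskType)
    ? TASK_NOTIFICATION_META[taskType]
    : null;
}

console.log(taskNotificationMeta('approval').templateId); // d-approval-request-template
```

Unlike the ternary expressions in the original, this variant returns null for an unrecognized type instead of silently falling back to the approval template, so the caller has to handle that case explicitly.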

Message Notification

// From services/projects/processMessageNotification.js
async function processMessageNotification(data) {
  const task = data.fullDocument;

  // 1. Fetch new message
  const message = await Communication.findById(task.last_communication_id)
    .populate('sender_id')
    .lean();

  if (!message) return;

  // 2. Fetch full task details
  const fullTask = await ProjectsTasks.findById(task._id)
    .populate('account_id')
    .populate('assigned_to')
    .populate('created_by')
    .populate('project_id')
    .lean();

  // 3. Determine recipients (exclude sender)
  const recipients = [];

  // Add assignee
  if (
    fullTask.assigned_to &&
    fullTask.assigned_to._id.toString() !== message.sender_id._id.toString()
  ) {
    recipients.push(fullTask.assigned_to);
  }

  // Add creator
  if (
    fullTask.created_by &&
    fullTask.created_by._id.toString() !== message.sender_id._id.toString()
  ) {
    recipients.push(fullTask.created_by);
  }

  // Add client (for approval tasks)
  if (task.type === 'approval') {
    const project = await Projects.findById(fullTask.project_id).lean();
    const client = await User.findById(project.client_id).lean();

    if (client && client._id.toString() !== message.sender_id._id.toString()) {
      recipients.push(client);
    }
  }

  // 4. Send to each recipient
  for (const recipient of recipients) {
    // Verify preferences
    const canNotify = await NotificationUtil.verify({
      userID: recipient._id,
      accountID: fullTask.account_id,
      module: 'projects',
      type: 'message',
      subType: 'email',
    });

    if (!canNotify) continue;

    // Email notification
    await NotificationQueue.create({
      type: 'email',
      origin: 'projects',
      sender_account: fullTask.account_id,
      sender_user: message.sender_id._id,
      recipient: {
        name: recipient.name,
        email: recipient.email,
      },
      content: {
        template_id: 'd-task-message-template',
        additional_data: {
          task_title: fullTask.title,
          sender_name: message.sender_id.name,
          message: message.content,
          message_date: formatDateTime(message.created_at),
          task_url: `https://app.dashclicks.com/projects/tasks/${task._id}`,
          project_name: fullTask.project_id.name,
        },
      },
      check_credits: false,
    });

    // Bell notification
    await NotificationQueue.create({
      type: 'fcm',
      origin: 'projects',
      sender_account: fullTask.account_id,
      recipient: {
        user_id: recipient._id,
      },
      content: {
        title: 'New Message',
        body: `${message.sender_id.name} on ${fullTask.title}`,
        click_action: `https://app.dashclicks.com/projects/tasks/${task._id}`,
        module: 'projects',
        type: 'message',
        data: {
          subType: 'bell',
          task_id: task._id.toString(),
          message_id: message._id.toString(),
        },
      },
    });
  }
}
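
As written, step 3 can add the same user twice, for example when the assignee also created the task, which would queue duplicate notifications. A possible way to handle this is sketched below. The helper collectRecipients is hypothetical and is not part of the module; it assumes each candidate is a plain object with an _id that can be converted to a string.

```javascript
// Hypothetical helper: collects recipients, skipping the sender, missing
// users, and anyone who has already been added.
function collectRecipients(candidates, senderId) {
  const seen = new Set([String(senderId)]);
  const recipients = [];
  for (const user of candidates) {
    if (!user || !user._id) continue; // unassigned or missing user
    const id = String(user._id);
    if (seen.has(id)) continue; // sender or duplicate
    seen.add(id);
    recipients.push(user);
  }
  return recipients;
}

const assignee = { _id: 'u1', name: 'John Doe' };
const creator = { _id: 'u1', name: 'John Doe' }; // same user as the assignee
const client = { _id: 'u3', name: 'Jane Smith' };
console.log(collectRecipients([assignee, creator, client], 'u2').map(u => u._id).join(', '));
// u1, u3
```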

🚨 Error Handling

Change Stream Reconnection

The module logs MongoDB connection state changes and exits the process when no primary is available:

// Handle disconnected event
mongoose.connection.on('disconnected', () => {
  logger.log({
    initiator: 'notifications/projects/change-stream',
    message: 'Primary MongoDB connection lost - waiting for reconnection',
  });
});

// Monitor reconnection
mongoose.connection.on('reconnected', () => {
  logger.log({
    initiator: 'notifications/projects/change-stream',
    message: 'Reconnected to MongoDB',
  });
});

// Exit on critical errors
mongoose.connection.on('error', err => {
  logger.error({
    initiator: 'notifications/projects/change-stream',
    message: 'MongoDB connection error',
    error: err,
  });

  if (err.name === 'MongoNetworkError' && err.message.includes('no primary found')) {
    process.exit(1);
  }
});
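
The exit condition in the error handler can be pulled out into a predicate so that it can be unit tested without a live connection. The function name isFatalMongoError is an assumption made for this sketch; the condition itself is the one shown above, with added guards for a missing error or message.

```javascript
// Hypothetical predicate extracted from the error handler above.
function isFatalMongoError(err) {
  return Boolean(
    err &&
      err.name === 'MongoNetworkError' &&
      typeof err.message === 'string' &&
      err.message.includes('no primary found'),
  );
}

const fatal = Object.assign(new Error('no primary found in replica set'), {
  name: 'MongoNetworkError',
});
console.log(isFatalMongoError(fatal)); // true
console.log(isFatalMongoError(new Error('connection timed out'))); // false
```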

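Resume Token Management

Despite the section name, the streams do not store a change stream resume token. They resume from an operation time (seconds since the Unix epoch) read from the .starttime file:

const fs = require('fs');
const { Timestamp } = require('mongodb');

// Read resume time from file
let RESUMETIME = fs.readFileSync(__dirname + '/../../.starttime', 'utf-8');

if (RESUMETIME) {
  // Convert to a BSON Timestamp for startAtOperationTime
  RESUMETIME = new Timestamp({ t: parseInt(RESUMETIME, 10), i: 1 });
}

// Use in change streams
Activity.watch(pipeline, {
  fullDocument: 'updateLookup',
  startAtOperationTime: RESUMETIME || undefined,
});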
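
The snippet above passes whatever parseInt returns straight into the Timestamp, so an empty or corrupted file would produce an invalid start time. A more defensive parse could look like the sketch below. The helper parseResumeSeconds is hypothetical and deliberately leaves out the mongodb dependency; the caller would wrap a valid result in a Timestamp as shown above.

```javascript
// Hypothetical parser for the contents of the .starttime file.
// Returns seconds since the Unix epoch, or undefined when the value is unusable.
function parseResumeSeconds(raw) {
  const seconds = Number.parseInt(String(raw).trim(), 10);
  return Number.isInteger(seconds) && seconds > 0 ? seconds : undefined;
}

console.log(parseResumeSeconds('1760349600\n')); // 1760349600
console.log(parseResumeSeconds('')); // undefined
```

Returning undefined for bad input lines up with the existing `RESUMETIME || undefined` pattern, so the stream would simply start from the current time.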

💡 Examples

Example 1: New Task Assignment

Trigger Event:

// ProjectsTasks insert
{
  operationType: 'insert',
  fullDocument: {
    _id: ObjectId("507f1f77bcf86cd799439011"),
    type: "request",
    title: "Update website header",
    project_id: ObjectId("507f1f77bcf86cd799439012"),
    account_id: ObjectId("507f1f77bcf86cd799439013"),
    assigned_to: ObjectId("507f1f77bcf86cd799439014"),
    created_by: ObjectId("507f1f77bcf86cd799439015"),
    priority: "high",
    due_date: new Date("2025-10-20"),
    status: "pending",
    first_communication_id: ObjectId("507f1f77bcf86cd799439016"),
    last_communication_id: ObjectId("507f1f77bcf86cd799439016")
  }
}

Resulting Notifications:

  1. Email - Task assignment details with description
  2. Bell - Real-time notification "New Task Assigned"

Delivery:

  • Sent to: Assigned user
  • Template: d-task-assigned-template
  • Result: Email + bell notification

Example 2: Approval Completion

Trigger Event:

// ProjectsTasks update
{
  operationType: 'update',
  fullDocument: {
    _id: ObjectId("507f1f77bcf86cd799439011"),
    type: "approval",
    title: "Homepage Design",
    status: "completed", // Changed from "pending"
    approved_by: ObjectId("507f1f77bcf86cd799439020"),
    approved_at: new Date("2025-10-13T15:30:00Z")
  },
  updateDescription: {
    updatedFields: {
      status: "completed"
    }
  }
}

Resulting Notifications:

  1. Email - Approval confirmation to task assignee
  2. Bell - "Task approved" notification

Content: Approval status, feedback, next steps

📈 Metrics

Key Metrics:

  • Tasks created per day: ~200
  • Approval requests per day: ~50
  • Average task response time: 4 hours
  • Notification success rate: >99%

Monitoring:

// Tasks created in the last 24 hours
db.getCollection('projects.tasks').countDocuments({
  created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});

// Pending approvals
db.getCollection('projects.tasks').countDocuments({
  type: 'approval',
  status: 'pending',
});

// Notifications queued in the last 24 hours
db.getCollection('notifications.queue').countDocuments({
  origin: 'projects',
  created_at: { $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
});

Module Type: Change Stream + Cron Job
Environment Flag: PROJECT_ENABLED
Dependencies: MongoDB (replica set), SendGrid, Firebase
Status: Active - critical for project management
