Support Notification Processing
Overview
The Support Notification job sends real-time notifications to support agents when new messages arrive from customers via SMS or email. It runs every 5 seconds to detect unnotified messages and emits socket events to trigger desktop/mobile notifications for support team members.
Complete Flow:
- Cron Initialization: queue-manager/crons/conversations/support.notification.js
- Service Processing: queue-manager/services/conversations/support.notification.js
- Queue Definition: queue-manager/queues/conversations/support.notification.js
Execution Pattern: Cron-based (every 5 seconds)
Queue Name: support_message_status_queue (shared with message status)
Environment Flag: QM_CONVERSATIONS=true (in index.js)
Complete Processing Flow
sequenceDiagram
participant CRON as Cron Schedule<br/>(every 5s)
participant SERVICE as Notification Service
participant DB as SupportMessage<br/>Collection
participant QUEUE as Bull Queue
participant ROOM as SupportRoom<br/>Collection
participant SOCKET as Conversation Socket
participant AGENT as Support Agent<br/>Dashboard
CRON->>SERVICE: Check for new messages
SERVICE->>DB: Query messages<br/>(sender_type='support',<br/>source=sms/email,<br/>user_notified=false)
DB-->>SERVICE: Return unnotified messages
SERVICE->>DB: Mark user_notification_in_progress=true
loop For each message
SERVICE->>QUEUE: Add notification job
QUEUE->>DB: Update user_notified=true
QUEUE->>ROOM: Fetch room details<br/>(for account_id)
ROOM-->>QUEUE: Return room data
QUEUE->>SOCKET: Emit 'newMessage' event
SOCKET-->>AGENT: Trigger desktop notification
SOCKET-->>AGENT: Play notification sound
SOCKET-->>AGENT: Update UI with new message
QUEUE->>DB: Reset user_notification_in_progress=false
end
Source Files
1. Cron Initialization
File: queue-manager/crons/conversations/support.notification.js
Purpose: Schedule notification checks every 5 seconds
Cron Pattern: */5 * * * * * (every 5 seconds)
Initialization:
const supportNotification = require('../../services/conversations/support.notification');
const cron = require('node-cron');
const logger = require('../../utilities/logger');
let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/5 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
try {
await supportNotification();
} finally {
// Release the lock even if the service throws, so later runs are not blocked
inProgress = false;
}
}
});
} catch (err) {
logger.error({ initiator: 'QM/conversations/support-notification', error: err });
}
};
In-Progress Lock: Prevents overlapping executions.
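The lock pattern can be isolated into a small wrapper, which makes the skip-on-overlap behavior easy to see and test. This is a minimal sketch: `createGuardedTask` is a hypothetical helper, not part of the queue-manager source, and the timer-based task only stands in for the real service call.

```javascript
// Hypothetical helper (not in the original source): wraps an async task so
// that a call made while a previous call is still running is skipped.
function createGuardedTask(task) {
  let inProgress = false;
  return async function run() {
    if (inProgress) return 'skipped';
    inProgress = true;
    try {
      await task();
      return 'ran';
    } finally {
      inProgress = false; // released even when task() throws
    }
  };
}

// Stand-in for the service call: resolves after 50 ms.
const guarded = createGuardedTask(
  () => new Promise(resolve => setTimeout(resolve, 50)),
);

// The second call starts while the first is still running, so it is skipped.
Promise.all([guarded(), guarded()]).then(results => console.log(results));
// logs [ 'ran', 'skipped' ]
```

Releasing the flag in `finally` matters because a lock that is only reset on the success path would stay set forever after a single rejected run.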
2. Service Processing (THE CORE LOGIC)
File: queue-manager/services/conversations/support.notification.js
Purpose: Query unnotified support messages and add to notification queue
Matching Criteria:
- sender_type: 'support' - Messages received from customers (not sent by agents)
- source: ['sms', 'email'] - SMS or email messages only
- user_notified: false - Not yet notified to agents
- user_notification_in_progress: { $ne: true } - Not already processing
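To make the criteria concrete, the same conditions can be written as a plain JavaScript predicate. This is an illustrative sketch only: `isPendingNotification` and the sample documents are hypothetical (including the `'chat'` source value), and the real filtering happens in MongoDB. Note that `{ $ne: true }` also matches documents where the flag is missing, which the predicate reproduces with `!== true`.

```javascript
// Illustrative mirror of the $match criteria; not part of the original source.
function isPendingNotification(message) {
  return (
    message.sender_type === 'support' &&
    ['sms', 'email'].includes(message.source) &&
    message.user_notified === false &&
    message.user_notification_in_progress !== true // false or unset both pass
  );
}

// Hypothetical sample documents.
const samples = [
  { _id: 1, sender_type: 'support', source: 'sms', user_notified: false },
  { _id: 2, sender_type: 'support', source: 'email', user_notified: false, user_notification_in_progress: true },
  { _id: 3, sender_type: 'support', source: 'chat', user_notified: false },
  { _id: 4, sender_type: 'support', source: 'email', user_notified: true },
];

const pendingIds = samples.filter(isPendingNotification).map(m => m._id);
console.log(pendingIds); // [ 1 ]
```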
Processing Logic:
const { SupportMessage } = require('../../models');
const Queue = require('../../queues/conversations/support.notification');
module.exports = async () => {
try {
// 1. Query unnotified support messages
const messages = await SupportMessage.aggregate([
{
$match: {
sender_type: 'support', // Customer messages
source: {
$in: ['sms', 'email'], // SMS and email only
},
user_notified: false, // Not yet notified
user_notification_in_progress: {
$ne: true, // Not already processing
},
},
},
{
$sort: {
created: -1, // Most recent first
},
},
]);
if (messages.length) {
// 2. Collect message IDs
const ids = messages.map(message => message._id);
// 3. Mark as processing to prevent duplicate notifications
await SupportMessage.updateMany(
{ _id: { $in: ids } },
{ user_notification_in_progress: true },
);
// 4. Add each message to notification queue
await Promise.all(
messages.map(async message => {
try {
const queue = await Queue.start();
await queue.add(
{ id: message._id },
{
attempts: 10,
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s, 8s, ...
},
},
);
} catch (err) {
// Reset processing flag on error
await SupportMessage.updateOne(
{ _id: message._id },
{ user_notification_in_progress: false },
);
console.log(
`Error occured while sending notification for messages`,
err.message,
err.stack,
);
}
}),
);
console.log('Processed messages notifications.');
}
} catch (err) {
console.log(`Error occured while sending notification for messages`, err.message, err.stack);
}
};
3. Queue Definition
File: queue-manager/queues/conversations/support.notification.js
Purpose: Mark message as notified and emit socket event for agent notification
Queue Options:
{
attempts: 10, // 10 retry attempts
backoff: {
type: "exponential", // Exponential backoff
delay: 1000 // 1 second base delay
},
removeOnComplete: true, // Auto-cleanup
}
Job Processor:
queue.process(async (job, done) => {
try {
const { id } = job.data;
// Update message and emit socket event in transaction
await withTransaction(async session => {
// 1. Mark message as notified
const message = await SupportMessage.findByIdAndUpdate(
id,
{ user_notified: true },
{ new: true, session },
);
// 2. Fetch room details for account_id
const room = await SupportRoom.findById(message.room_id);
// 3. Generate JWT token for socket authentication
const jwt_token = jwt.sign(
{
conversation_id: message.sender_id.toString(),
account_id: room.account_id.toString(),
type: 'support',
},
process.env.APP_SECRET,
{ expiresIn: '2m' },
);
// 4. Emit socket event to notify agents
await axios.post(
`${process.env.CONVERSATION_SOCKET}/emit`,
{
event: 'newMessage', // Triggers agent notification
data: message,
},
{
headers: {
Authorization: `Bearer ${jwt_token}`,
'x-account-id': room.account_id?.toString(),
},
},
);
});
return done();
} catch (err) {
done(err);
}
});
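The request assembled in steps 2 to 4 of the processor can be separated from the I/O so it can be checked in isolation. The sketch below is an assumption-laden illustration: `buildEmitRequest` and its `socketBaseUrl` parameter are hypothetical names, and the explicit room check is an addition that the processor shown above does not perform.

```javascript
// Hypothetical helper (not in the original source): builds the JWT claims and
// the HTTP request parts for the /emit call without performing any I/O.
function buildEmitRequest(message, room, socketBaseUrl) {
  if (!room) {
    // Fail with a clear message instead of a TypeError on room.account_id
    throw new Error(`Room not found for message ${message._id}`);
  }
  const accountId = String(room.account_id);
  return {
    claims: {
      conversation_id: String(message.sender_id),
      account_id: accountId,
      type: 'support',
    },
    url: `${socketBaseUrl}/emit`,
    body: { event: 'newMessage', data: message },
    headers: { 'x-account-id': accountId },
  };
}

const request = buildEmitRequest(
  { _id: 'm1', sender_id: 'c1', room_id: 'r1' }, // placeholder message
  { account_id: 'a1' },                          // placeholder room
  'http://localhost:4000',                       // placeholder base URL
);
console.log(request.url); // http://localhost:4000/emit
```

In the processor, `claims` would be passed to `jwt.sign` with `process.env.APP_SECRET` and the 2-minute expiry, and the resulting token added as the `Authorization: Bearer` header before the POST.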
Event Handlers:
// Error handler
queue.on('error', err => {
logger.error({
initiator: 'QM/conversation/support-notification',
error: err,
message: 'Error in support notification queue',
});
});
// Failed job handler
queue.on('failed', async (job, err) => {
logger.error({
initiator: 'QM/conversation/support-notification',
error: err,
job_id: job.id,
job_data: job.data,
});
// Reset flag after max retries
if (job.attemptsMade >= job.opts.attempts) {
await SupportMessage.updateOne({ _id: job.data.id }, { user_notification_in_progress: false });
}
});
// Completed job handler
queue.on('completed', async job => {
try {
await SupportMessage.updateOne({ _id: job.data.id }, { user_notification_in_progress: false });
} catch (error) {
logger.error({
initiator: 'QM/conversation/support-notification',
error: error,
message: 'Error while finalizing support notification queue',
});
}
});
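The rule the handlers follow for releasing `user_notification_in_progress` can be summarized as a small decision function. This is a sketch for illustration: `shouldReleaseFlag` is a hypothetical name, and the job objects are plain stand-ins carrying only the `attemptsMade` and `opts.attempts` fields that the handlers above read.

```javascript
// Hypothetical summary of the handlers' flag-release rule.
function shouldReleaseFlag(eventName, job) {
  if (eventName === 'completed') return true; // always release on success
  if (eventName === 'failed') {
    // Release only once retries are exhausted; earlier failures will retry
    return job.attemptsMade >= job.opts.attempts;
  }
  return false;
}

console.log(shouldReleaseFlag('failed', { attemptsMade: 3, opts: { attempts: 10 } }));  // false
console.log(shouldReleaseFlag('failed', { attemptsMade: 10, opts: { attempts: 10 } })); // true
```

Keeping the flag set between retries prevents the cron from enqueueing a second job for the same message while the first is still being retried.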
Collections Used
support_messages
- Operations: Read, Update
- Model: shared/models/support-message.js
- Usage Context:
  - Query messages without user notification
  - Update user_notified field to true
  - Manage user_notification_in_progress flag
  - Get message details for notification
Key Fields:
- sender_type: 'support' (messages from customers)
- source: 'sms' | 'email' (communication channel)
- user_notified: Boolean indicating whether agents have been notified
- user_notification_in_progress: Processing lock flag
- room_id: Reference to SupportRoom
- sender_id: Conversation ID for socket routing
support_rooms
- Operations: Read
- Model: shared/models/support-room.js
- Usage Context:
  - Lookup room by message.room_id
  - Get account_id for socket authentication
  - Identify assigned support agents
Key Fields:
- account_id: Owner account for authentication
- assigned_to: Support agent(s) to notify
- status: Room status (open/snoozed/closed)
Job Configuration
Queue Options
{
attempts: 10, // Maximum retry attempts
backoff: {
type: "exponential", // Exponential backoff strategy
delay: 1000 // 1 second base delay
},
removeOnComplete: true, // Auto-cleanup on success
}
Cron Schedule
'*/5 * * * * *'; // Every 5 seconds
Frequency Rationale: 5-second intervals provide near-instant agent notifications while minimizing database load.
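The trade-off can be put in numbers with a short calculation. The 5-second interval comes from the cron pattern; the 500 messages per day figure is the upper end of the typical volume quoted in this document, and the assumption that each message lands in its own poll is a deliberate worst case.

```javascript
const SECONDS_PER_DAY = 24 * 60 * 60;
const intervalSeconds = 5;

// Number of queries the cron issues per day at this interval
const pollsPerDay = SECONDS_PER_DAY / intervalSeconds;
console.log(pollsPerDay); // 17280

// Upper bound on the share of polls that find work, assuming 500 messages
// per day and at most one message per poll
const maxBusyShare = ((500 / pollsPerDay) * 100).toFixed(1);
console.log(`${maxBusyShare}%`); // 2.9%
```

Most polls therefore return nothing, so the cost of the schedule is dominated by how cheaply the `$match` query can return an empty result.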
Processing Logic - Detailed Flow
Service Layer Processing
Service Function: module.exports (anonymous async function)
Purpose: Query unnotified support messages and add to notification queue
Processing Steps:
1. Query Unnotified Support Messages
const messages = await SupportMessage.aggregate([
{
$match: {
sender_type: 'support', // Customer messages
source: { $in: ['sms', 'email'] },
user_notified: false,
user_notification_in_progress: { $ne: true },
},
},
{ $sort: { created: -1 } },
]);
Matching Criteria:
- Only customer-sent messages (sender_type='support')
- SMS and email channels only
- Not yet notified to agents
- Not already being processed
2. Mark as Processing
const ids = messages.map(message => message._id);
await SupportMessage.updateMany({ _id: { $in: ids } }, { user_notification_in_progress: true });
Purpose: Prevent duplicate notifications to agents
3. Add Notification Jobs
await Promise.all(
messages.map(async message => {
const queue = await Queue.start();
await queue.add(
{ id: message._id },
{
attempts: 10,
backoff: { type: 'exponential', delay: 1000 },
},
);
}),
);
4. Error Recovery
catch (err) {
// Reset flag to allow retry on next cron run
await SupportMessage.updateOne(
{ _id: message._id },
{ user_notification_in_progress: false }
);
}
Queue Processing
Queue Processor: Inside queues/conversations/support.notification.js
Purpose: Mark message as notified and emit socket event for agent notification
Job Data Structure:
{
id: ObjectId; // SupportMessage document ID
}
Processing Steps:
1. Mark Message as Notified
const message = await SupportMessage.findByIdAndUpdate(
id,
{ user_notified: true },
{ new: true, session },
);
Purpose: Prevent duplicate notifications
2. Fetch Room Details
const room = await SupportRoom.findById(message.room_id);
Needed: room.account_id for socket authentication and routing
3. Generate JWT Token
const jwt_token = jwt.sign(
{
conversation_id: message.sender_id.toString(),
account_id: room.account_id.toString(),
type: 'support',
},
process.env.APP_SECRET,
{ expiresIn: '2m' },
);
Purpose: Authenticate with Conversation Socket service
4. Emit Socket Event
await axios.post(
`${process.env.CONVERSATION_SOCKET}/emit`,
{
event: 'newMessage', // Triggers notification
data: message,
},
{
headers: {
Authorization: `Bearer ${jwt_token}`,
'x-account-id': room.account_id?.toString(),
},
},
);
Agent Notification: The event triggers:
- Desktop notification
- Browser notification sound
- Message indicator in UI
- Badge count update
5. Cleanup Processing Flag
queue.on('completed', async job => {
await SupportMessage.updateOne({ _id: job.data.id }, { user_notification_in_progress: false });
});
Error Handling in Flow
Service Layer Errors:
try {
const queue = await Queue.start();
await queue.add({ id: message._id }, { attempts: 10, backoff: {...} });
} catch (err) {
// Reset flag to allow retry
await SupportMessage.updateOne(
{ _id: message._id },
{ user_notification_in_progress: false }
);
console.log(`Error occured while sending notification for messages`, err.message);
}
Queue Processor Errors:
// Bull automatically retries based on attempts config (10 attempts)
queue.on('failed', async (job, err) => {
logger.error({
initiator: 'QM/conversation/support-notification',
error: err,
job_id: job.id,
});
// Reset flag after max retries
if (job.attemptsMade >= job.opts.attempts) {
await SupportMessage.updateOne({ _id: job.data.id }, { user_notification_in_progress: false });
}
});
Retry Strategy: 10 attempts with exponential backoff starting at 1 second
Error Handling
Common Error Scenarios
Room Not Found
const room = await SupportRoom.findById(message.room_id);
if (!room) {
logger.error({
initiator: 'QM/conversation/support-notification',
error: 'Room not found',
message_id: message._id,
room_id: message.room_id,
});
throw new Error('Room not found'); // Trigger retry
}
Reason: Race condition - room may have been deleted
Socket Emit Failure
try {
await axios.post(`${process.env.CONVERSATION_SOCKET}/emit`, {...});
} catch (error) {
logger.error({
initiator: 'QM/conversation/support-notification',
error: 'Socket emit failed',
details: error.message
});
throw error; // Trigger retry - notification must be sent
}
Reason: Conversation Socket service may be temporarily unavailable
Critical: Notifications MUST be sent to agents, so failures trigger retries
Transaction Failure
await withTransaction(async session => {
// Update message
const message = await SupportMessage.findByIdAndUpdate(id, { user_notified: true }, { session });
// Emit socket event
await axios.post(`${process.env.CONVERSATION_SOCKET}/emit`, {...});
});
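// If the update or the emit throws, the database update rolls back
Protection: Keeps user_notified from being set when the socket emit fails. The HTTP emit itself cannot be rolled back, so if the commit fails after a successful emit, the retry sends the same notification again.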
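The ordering guarantee can be illustrated with an in-memory stand-in. This sketch does not use MongoDB or the project's `withTransaction` helper: `notifyWithRollback` is a hypothetical function, a `Map` plays the role of the collection, and "commit" is simply the write that happens after the emit succeeds.

```javascript
// In-memory illustration only; `store` stands in for the collection and
// `emit` for the HTTP call to the Conversation Socket service.
async function notifyWithRollback(store, id, emit) {
  const previous = store.get(id);
  const staged = { ...previous, user_notified: true }; // staged, not yet visible
  await emit(staged);    // if this throws, the store is never touched
  store.set(id, staged); // "commit" happens only after a successful emit
  return staged;
}

const store = new Map([['m1', { user_notified: false }]]);

notifyWithRollback(store, 'm1', async () => {
  throw new Error('socket service unavailable');
}).catch(() => {
  // The failed emit left the flag untouched, so a retry can pick it up
  console.log(store.get('m1').user_notified); // false
});
```

The reverse case is not covered by this pattern: an emit that succeeds followed by a failed commit leaves the flag unset, and the next attempt notifies agents a second time.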
Retry Strategy
{
attempts: 10, // Maximum 10 attempts
backoff: {
type: "exponential", // Exponential backoff
delay: 1000 // 1 second base delay
}
}
Backoff Schedule:
- Attempt 1: Immediate
- Attempt 2: 1 second
- Attempt 3: 2 seconds
- Attempt 4: 4 seconds
- Attempt 5-10: Progressive exponential backoff (8s, 16s, 32s, 64s, 128s, 256s)
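The schedule above follows a simple doubling rule, which the sketch below reproduces along with the total time a message can spend in retries. `nominalBackoffDelays` is a hypothetical helper that encodes the schedule as documented here; the queue library's built-in exponential strategy may use a slightly different formula depending on its version, so the exact delays should be checked against the installed release.

```javascript
// Delay before each retry under the documented doubling rule:
// base, 2x base, 4x base, ... (one delay per retry, attempts - 1 in total)
function nominalBackoffDelays(attempts, baseDelayMs) {
  const delays = [];
  for (let retry = 1; retry < attempts; retry += 1) {
    delays.push(baseDelayMs * 2 ** (retry - 1));
  }
  return delays;
}

const delays = nominalBackoffDelays(10, 1000);
console.log(delays.map(ms => ms / 1000));
// [ 1, 2, 4, 8, 16, 32, 64, 128, 256 ]

// Total waiting time across all retries, excluding processing time
const totalSeconds = delays.reduce((sum, ms) => sum + ms, 0) / 1000;
console.log(totalSeconds); // 511
```

Under these assumptions a job that keeps failing holds its `user_notification_in_progress` flag for roughly eight and a half minutes before the final failure releases it.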
Monitoring & Logging
Success Logging
console.log('Processed messages notifications.');
console.log(`Notified agents of ${messages.length} new messages`);
Error Logging
logger.error({
initiator: 'QM/conversation/support-notification',
error: err,
message: 'Error in support notification queue',
});
Performance Metrics
- Average Processing Time: ~300-500ms per message (includes socket emit)
- Success Rate: ~99%
- Retry Rate: ~1% (mostly temporary socket unavailability)
- Typical Volume: 100-500 messages per day
Integration Points
Triggers This Job
- Cron Schedule: Every 5 seconds automatically
- Customer Message: When customer replies via SMS/email to support conversation
- Manual Trigger: Via API endpoint (if QM_HOOKS=true)
Data Dependencies
- SupportMessage collection: Must have messages with user_notified=false
- SupportRoom collection: Room details must exist
- Conversation Socket Service: For real-time agent notifications
Related Jobs
- Support Message Status: Tracks delivery status of agent-sent messages
- Support Communication Check: Health monitoring for support channels
Downstream Effects
- Agent Dashboard: Real-time notification sound and visual indicator
- Desktop Notifications: Browser push notifications
- Mobile App: Push notifications to agent mobile devices
- Badge Counts: Unread message counts updated
Important Notes
Side Effects
- Database Updates: Sets user_notified=true on messages
- Socket Events: Emits real-time notifications to connected agents
- Desktop Notifications: Triggers browser notification API
- Sound Effects: Plays notification sound in agent browser
Performance Considerations
- 5-Second Intervals: Balance between instant notifications and system load
- Transaction Overhead: MongoDB transactions add ~50-100ms latency
- Socket Latency: HTTP POST to socket service adds ~50-100ms
- Agent Availability: Socket only notifies currently connected agents
Maintenance Notes
- Notification Timing: Agents see notifications within 5-10 seconds of message arrival
- JWT Expiration: 2-minute tokens are sufficient for immediate socket emit
- Stuck Processing Flags: Monitor for messages with user_notification_in_progress=true for extended periods
- Socket Connection: Requires Conversation Socket service to be running
- Browser Permissions: Agents must grant notification permission in browser
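For the stuck-flag check in the list above, one possible approach is a filter that selects messages whose flag is still set long after they were created. This is a sketch under stated assumptions: `buildStuckFlagFilter` is a hypothetical helper, the 15-minute threshold is an arbitrary example, and it assumes the `created` field used in the service's sort is a date.

```javascript
// Hypothetical helper: builds a MongoDB filter for messages that have held
// the processing flag for longer than maxAgeMs since creation.
function buildStuckFlagFilter(now, maxAgeMs) {
  return {
    user_notification_in_progress: true,
    created: { $lt: new Date(now.getTime() - maxAgeMs) }, // assumes a Date field
  };
}

// Example threshold: 15 minutes (an assumption, not a documented value)
const filter = buildStuckFlagFilter(new Date(), 15 * 60 * 1000);
console.log(filter.user_notification_in_progress); // true
```

The resulting object could be passed to `SupportMessage.countDocuments(filter)` in a monitoring script; the threshold should exceed the longest expected retry window so that jobs still being retried are not reported.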
Testing
Manual Trigger
# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/conversations/support-notification
Create Test Message
// Simulate customer message arriving
const message = await SupportMessage.create({
sender_type: 'support', // From customer
sender_id: testConversationId,
room_id: testRoomId,
source: 'sms',
content: 'Customer reply message',
from: '+1234567890',
user_notified: false, // Will be picked up by job
});
// Wait 6 seconds (one 5-second cron cycle plus a margin) for the notification to be sent
setTimeout(async () => {
const updated = await SupportMessage.findById(message._id);
console.log('Agent notified:', updated.user_notified); // true
}, 6000);
Monitor Notification Queue
// Count messages pending notification
const pending = await SupportMessage.countDocuments({
sender_type: 'support',
source: { $in: ['sms', 'email'] },
user_notified: false,
user_notification_in_progress: { $ne: true }, // matches false and unset, like the service query
});
console.log('Messages pending notification:', pending);
// Count messages currently processing
const inProgress = await SupportMessage.countDocuments({
user_notification_in_progress: true,
});
console.log('Messages currently notifying:', inProgress);
Test Socket Emission
// Monitor socket events in Conversation Socket logs
// Look for 'newMessage' events
// Example event payload:
{
event: 'newMessage',
data: {
_id: '...',
sender_type: 'support',
source: 'sms',
content: 'Customer message',
user_notified: true,
// ... other message fields
}
}
Test Agent Notification
<!-- In agent dashboard, monitor browser notifications -->
<script>
// Check if browser notifications work
if ('Notification' in window && Notification.permission === 'granted') {
console.log('Notifications enabled');
}
// Listen for socket events
socket.on('newMessage', message => {
console.log('New message notification received:', message);
// Triggers: desktop notification + sound + UI update
});
</script>
Job Type: Scheduled
Execution Frequency: Every 5 seconds
Average Duration: 300-500ms per message
Status: Active