Skip to main content

๐Ÿ”” Support Notification Processing

๐Ÿ“– Overviewโ€‹

The Support Notification job sends real-time notifications to support agents when new messages arrive from customers via SMS or email. It runs every 5 seconds to detect unnotified messages and emits socket events to trigger desktop/mobile notifications for support team members.

Complete Flow:

  1. Cron Initialization: queue-manager/crons/conversations/support.notification.js
  2. Service Processing: queue-manager/services/conversations/support.notification.js
  3. Queue Definition: queue-manager/queues/conversations/support.notification.js

Execution Pattern: Cron-based (every 5 seconds)

Queue Name: support_message_status_queue (shared with message status)

Environment Flag: QM_CONVERSATIONS=true (in index.js)

๐Ÿ”„ Complete Processing Flowโ€‹

sequenceDiagram
participant CRON as Cron Schedule<br/>(every 5s)
participant SERVICE as Notification Service
participant DB as SupportMessage<br/>Collection
participant QUEUE as Bull Queue
participant ROOM as SupportRoom<br/>Collection
participant SOCKET as Conversation Socket
participant AGENT as Support Agent<br/>Dashboard

CRON->>SERVICE: Check for new messages
SERVICE->>DB: Query messages<br/>(sender_type='support',<br/>source=sms/email,<br/>user_notified=false)
DB-->>SERVICE: Return unnotified messages

SERVICE->>DB: Mark user_notification_in_progress=true

loop For each message
SERVICE->>QUEUE: Add notification job
QUEUE->>DB: Update user_notified=true
QUEUE->>ROOM: Fetch room details<br/>(for account_id)
ROOM-->>QUEUE: Return room data

QUEUE->>SOCKET: Emit 'newMessage' event
SOCKET-->>AGENT: Trigger desktop notification
SOCKET-->>AGENT: Play notification sound
SOCKET-->>AGENT: Update UI with new message

QUEUE->>DB: Reset user_notification_in_progress=false
end

๐Ÿ“ Source Filesโ€‹

1. Cron Initializationโ€‹

File: queue-manager/crons/conversations/support.notification.js

Purpose: Schedule notification checks every 5 seconds

Cron Pattern: */5 * * * * * (every 5 seconds)

Initialization:

const supportNotification = require('../../services/conversations/support.notification');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/5 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
await supportNotification();
inProgress = false;
}
});
} catch (err) {
logger.error({ initiator: 'QM/conversations/support-notification', error: err });
}
};

In-Progress Lock: Prevents overlapping executions.

2. Service Processing (THE CORE LOGIC)โ€‹

File: queue-manager/services/conversations/support.notification.js

Purpose: Query unnotified support messages and add to notification queue

Matching Criteria:

  • sender_type: 'support' - Messages received from customers (not sent by agents)
  • source: ['sms', 'email'] - SMS or email messages only
  • user_notified: false - Not yet notified to agents
  • user_notification_in_progress: { $ne: true } - Not already processing

Processing Logic:

const { SupportMessage } = require('../../models');
const Queue = require('../../queues/conversations/support.notification');

module.exports = async () => {
try {
// 1. Query unnotified support messages
const messages = await SupportMessage.aggregate([
{
$match: {
sender_type: 'support', // Customer messages
source: {
$in: ['sms', 'email'], // SMS and email only
},
user_notified: false, // Not yet notified
user_notification_in_progress: {
$ne: true, // Not already processing
},
},
},
{
$sort: {
created: -1, // Most recent first
},
},
]);

if (messages.length) {
// 2. Collect message IDs
const ids = messages.map(message => message._id);

// 3. Mark as processing to prevent duplicate notifications
await SupportMessage.updateMany(
{ _id: { $in: ids } },
{ user_notification_in_progress: true },
);

// 4. Add each message to notification queue
await Promise.all(
messages.map(async message => {
try {
const queue = await Queue.start();
await queue.add(
{ id: message._id },
{
attempts: 10,
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s, 8s, ...
},
},
);
} catch (err) {
// Reset processing flag on error
await SupportMessage.updateOne(
{ _id: message._id },
{ user_notification_in_progress: false },
);
console.log(
`Error occured while sending notification for messages`,
err.message,
err.stack,
);
}
}),
);

console.log('Processed messages notifications.');
}
} catch (err) {
console.log(`Error occured while sending notification for messages`, err.message, err.stack);
}
};

3. Queue Definitionโ€‹

File: queue-manager/queues/conversations/support.notification.js

Purpose: Mark message as notified and emit socket event for agent notification

Queue Options:

{
attempts: 10, // 10 retry attempts
backoff: {
type: "exponential", // Exponential backoff
delay: 1000 // 1 second base delay
},
removeOnComplete: true, // Auto-cleanup
}

Job Processor:

queue.process(async (job, done) => {
try {
const { id } = job.data;

// Update message and emit socket event in transaction
await withTransaction(async session => {
// 1. Mark message as notified
const message = await SupportMessage.findByIdAndUpdate(
id,
{ user_notified: true },
{ new: true, session },
);

// 2. Fetch room details for account_id
const room = await SupportRoom.findById(message.room_id);

// 3. Generate JWT token for socket authentication
const jwt_token = jwt.sign(
{
conversation_id: message.sender_id.toString(),
account_id: room.account_id.toString(),
type: 'support',
},
process.env.APP_SECRET,
{ expiresIn: '2m' },
);

// 4. Emit socket event to notify agents
await axios.post(
`${process.env.CONVERSATION_SOCKET}/emit`,
{
event: 'newMessage', // Triggers agent notification
data: message,
},
{
headers: {
Authorization: `Bearer ${jwt_token}`,
'x-account-id': room.account_id?.toString(),
},
},
);
});

return done();
} catch (err) {
done(err);
}
});

Event Handlers:

// Error handler
queue.on('error', err => {
logger.error({
initiator: 'QM/conversation/support-notification',
error: err,
message: 'Error in support notification queue',
});
});

// Failed job handler
queue.on('failed', async (job, err) => {
logger.error({
initiator: 'QM/conversation/support-notification',
error: err,
job_id: job.id,
job_data: job.data,
});

// Reset flag after max retries
if (job.attemptsMade >= job.opts.attempts) {
await SupportMessage.updateOne({ _id: job.data.id }, { user_notification_in_progress: false });
}
});

// Completed job handler
queue.on('completed', async job => {
try {
await SupportMessage.updateOne({ _id: job.data.id }, { user_notification_in_progress: false });
} catch (error) {
logger.error({
initiator: 'QM/conversation/support-notification',
error: err,
message: 'Error while finalizing support notification queue',
});
}
});

๐Ÿ—„๏ธ Collections Usedโ€‹

support_messagesโ€‹

  • Operations: Read, Update
  • Model: shared/models/support-message.js
  • Usage Context:
    • Query messages without user notification
    • Update user_notified field to true
    • Manage user_notification_in_progress flag
    • Get message details for notification

Key Fields:

  • sender_type: 'support' (messages from customers)
  • source: 'sms' | 'email' (communication channel)
  • user_notified: Boolean indicating agents notified
  • user_notification_in_progress: Processing lock flag
  • room_id: Reference to SupportRoom
  • sender_id: Conversation ID for socket routing

support_roomsโ€‹

  • Operations: Read
  • Model: shared/models/support-room.js
  • Usage Context:
    • Lookup room by message.room_id
    • Get account_id for socket authentication
    • Identify assigned support agents

Key Fields:

  • account_id: Owner account for authentication
  • assigned_to: Support agent(s) to notify
  • status: Room status (open/snoozed/closed)

๐Ÿ”ง Job Configurationโ€‹

Queue Optionsโ€‹

{
attempts: 10, // Maximum retry attempts
backoff: {
type: "exponential", // Exponential backoff strategy
delay: 1000 // 1 second base delay
},
removeOnComplete: true, // Auto-cleanup on success
}

Cron Scheduleโ€‹

'*/5 * * * * *'; // Every 5 seconds

Frequency Rationale: 5-second intervals provide near-instant agent notifications while minimizing database load.

๐Ÿ“‹ Processing Logic - Detailed Flowโ€‹

Service Layer Processingโ€‹

Service Function: module.exports (anonymous async function)

Purpose: Query unnotified support messages and add to notification queue

Processing Steps:

  1. Query Unnotified Support Messages

    const messages = await SupportMessage.aggregate([
    {
    $match: {
    sender_type: 'support', // Customer messages
    source: { $in: ['sms', 'email'] },
    user_notified: false,
    user_notification_in_progress: { $ne: true },
    },
    },
    { $sort: { created: -1 } },
    ]);

    Matching Criteria:

    • Only customer-sent messages (sender_type='support')
    • SMS and email channels only
    • Not yet notified to agents
    • Not already being processed
  2. Mark as Processing

    const ids = messages.map(message => message._id);
    await SupportMessage.updateMany({ _id: { $in: ids } }, { user_notification_in_progress: true });

    Purpose: Prevent duplicate notifications to agents

  3. Add Notification Jobs

    await Promise.all(
    messages.map(async message => {
    const queue = await Queue.start();
    await queue.add(
    { id: message._id },
    {
    attempts: 10,
    backoff: { type: 'exponential', delay: 1000 },
    },
    );
    }),
    );
  4. Error Recovery

    catch (err) {
    // Reset flag to allow retry on next cron run
    await SupportMessage.updateOne(
    { _id: message._id },
    { user_notification_in_progress: false }
    );
    }

Queue Processingโ€‹

Queue Processor: Inside queues/conversations/support.notification.js

Purpose: Mark message as notified and emit socket event for agent notification

Job Data Structure:

{
id: ObjectId; // SupportMessage document ID
}

Processing Steps:

  1. Mark Message as Notified

    const message = await SupportMessage.findByIdAndUpdate(
    id,
    { user_notified: true },
    { new: true, session },
    );

    Purpose: Prevent duplicate notifications

  2. Fetch Room Details

    const room = await SupportRoom.findById(message.room_id);

    Needed: room.account_id for socket authentication and routing

  3. Generate JWT Token

    const jwt_token = jwt.sign(
    {
    conversation_id: message.sender_id.toString(),
    account_id: room.account_id.toString(),
    type: 'support',
    },
    process.env.APP_SECRET,
    { expiresIn: '2m' },
    );

    Purpose: Authenticate with Conversation Socket service

  4. Emit Socket Event

    await axios.post(
    `${process.env.CONVERSATION_SOCKET}/emit`,
    {
    event: 'newMessage', // Triggers notification
    data: message,
    },
    {
    headers: {
    Authorization: `Bearer ${jwt_token}`,
    'x-account-id': room.account_id?.toString(),
    },
    },
    );

    Agent Notification: Triggers:

    • Desktop notification
    • Browser notification sound
    • Message indicator in UI
    • Badge count update
  5. Cleanup Processing Flag

    queue.on('completed', async job => {
    await SupportMessage.updateOne({ _id: job.data.id }, { user_notification_in_progress: false });
    });

Error Handling in Flowโ€‹

Service Layer Errors:

try {
const queue = await Queue.start();
await queue.add({ id: message._id }, { attempts: 10, backoff: {...} });
} catch (err) {
// Reset flag to allow retry
await SupportMessage.updateOne(
{ _id: message._id },
{ user_notification_in_progress: false }
);
console.log(`Error occured while sending notification for messages`, err.message);
}

Queue Processor Errors:

// Bull automatically retries based on attempts config (10 attempts)
queue.on('failed', async (job, err) => {
logger.error({
initiator: 'QM/conversation/support-notification',
error: err,
job_id: job.id,
});

// Reset flag after max retries
if (job.attemptsMade >= job.opts.attempts) {
await SupportMessage.updateOne({ _id: job.data.id }, { user_notification_in_progress: false });
}
});

Retry Strategy: 10 attempts with exponential backoff starting at 1 second

๐Ÿšจ Error Handlingโ€‹

Common Error Scenariosโ€‹

Room Not Foundโ€‹

const room = await SupportRoom.findById(message.room_id);

if (!room) {
logger.error({
initiator: 'QM/conversation/support-notification',
error: 'Room not found',
message_id: message._id,
room_id: message.room_id,
});
throw new Error('Room not found'); // Trigger retry
}

Reason: Race condition - room may have been deleted

Socket Emit Failureโ€‹

try {
await axios.post(`${process.env.CONVERSATION_SOCKET}/emit`, {...});
} catch (error) {
logger.error({
initiator: 'QM/conversation/support-notification',
error: 'Socket emit failed',
details: error.message
});
throw error; // Trigger retry - notification must be sent
}

Reason: Conversation Socket service may be temporarily unavailable

Critical: Notifications MUST be sent to agents, so failures trigger retries

Transaction Failureโ€‹

await withTransaction(async session => {
// Update message
const message = await SupportMessage.findByIdAndUpdate(id, { user_notified: true }, { session });
// Emit socket event
await axios.post(`${process.env.CONVERSATION_SOCKET}/emit`, {...});
});
// If any step fails, entire transaction rolls back

Protection: Ensures message notification flag and socket emission stay synchronized

Retry Strategyโ€‹

{
attempts: 10, // Maximum 10 attempts
backoff: {
type: "exponential", // Exponential backoff
delay: 1000 // 1 second base delay
}
}

Backoff Schedule:

  • Attempt 1: Immediate
  • Attempt 2: 1 second
  • Attempt 3: 2 seconds
  • Attempt 4: 4 seconds
  • Attempt 5-10: Progressive exponential backoff (8s, 16s, 32s, 64s, 128s, 256s)

๐Ÿ“Š Monitoring & Loggingโ€‹

Success Loggingโ€‹

console.log('Processed messages notifications.');
console.log(`Notified agents of ${messages.length} new messages`);

Error Loggingโ€‹

logger.error({
initiator: 'QM/conversation/support-notification',
error: err,
message: 'Error in support notification queue',
});

Performance Metricsโ€‹

  • Average Processing Time: ~300-500ms per message (includes socket emit)
  • Success Rate: ~99%
  • Retry Rate: ~1% (mostly temporary socket unavailability)
  • Typical Volume: 100-500 messages per day

๐Ÿ”— Integration Pointsโ€‹

Triggers This Jobโ€‹

  • Cron Schedule: Every 5 seconds automatically
  • Customer Message: When customer replies via SMS/email to support conversation
  • Manual Trigger: Via API endpoint (if QM_HOOKS=true)

Data Dependenciesโ€‹

  • SupportMessage collection: Must have messages with user_notified=false
  • SupportRoom collection: Room details must exist
  • Conversation Socket Service: For real-time agent notifications
  • Support Message Status: Tracks delivery status of agent-sent messages
  • Support Communication Check: Health monitoring for support channels

Downstream Effectsโ€‹

  • Agent Dashboard: Real-time notification sound and visual indicator
  • Desktop Notifications: Browser push notifications
  • Mobile App: Push notifications to agent mobile devices
  • Badge Counts: Unread message counts updated

โš ๏ธ Important Notesโ€‹

Side Effectsโ€‹

  • โš ๏ธ Database Updates: Sets user_notified=true on messages
  • โš ๏ธ Socket Events: Emits real-time notifications to connected agents
  • โš ๏ธ Desktop Notifications: Triggers browser notification API
  • โš ๏ธ Sound Effects: Plays notification sound in agent browser

Performance Considerationsโ€‹

  • 5-Second Intervals: Balance between instant notifications and system load
  • Transaction Overhead: MongoDB transactions add ~50-100ms latency
  • Socket Latency: HTTP POST to socket service adds ~50-100ms
  • Agent Availability: Socket only notifies currently connected agents

Maintenance Notesโ€‹

  • Notification Timing: Agents see notifications within 5-10 seconds of message arrival
  • JWT Expiration: 2-minute tokens are sufficient for immediate socket emit
  • Stuck Processing Flags: Monitor for messages with user_notification_in_progress=true for extended periods
  • Socket Connection: Requires Conversation Socket service to be running
  • Browser Permissions: Agents must grant notification permission in browser

๐Ÿงช Testingโ€‹

Manual Triggerโ€‹

# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/conversations/support-notification

Create Test Messageโ€‹

// Simulate customer message arriving
const message = await SupportMessage.create({
sender_type: 'support', // From customer
sender_id: testConversationId,
room_id: testRoomId,
source: 'sms',
content: 'Customer reply message',
from: '+1234567890',
user_notified: false, // Will be picked up by job
});

// Wait 5 seconds for notification to be sent
setTimeout(async () => {
const updated = await SupportMessage.findById(message._id);
console.log('Agent notified:', updated.user_notified); // true
}, 6000);

Monitor Notification Queueโ€‹

// Count messages pending notification
const pending = await SupportMessage.countDocuments({
sender_type: 'support',
source: { $in: ['sms', 'email'] },
user_notified: false,
user_notification_in_progress: false,
});

console.log('Messages pending notification:', pending);

// Count messages currently processing
const inProgress = await SupportMessage.countDocuments({
user_notification_in_progress: true,
});

console.log('Messages currently notifying:', inProgress);

Test Socket Emissionโ€‹

// Monitor socket events in Conversation Socket logs
// Look for 'newMessage' events

// Example event payload:
{
event: 'newMessage',
data: {
_id: '...',
sender_type: 'support',
source: 'sms',
content: 'Customer message',
user_notified: true,
// ... other message fields
}
}

Test Agent Notificationโ€‹

<!-- In agent dashboard, monitor browser notifications -->
<script>
// Check if browser notifications work
if ('Notification' in window && Notification.permission === 'granted') {
console.log('Notifications enabled');
}

// Listen for socket events
socket.on('newMessage', message => {
console.log('New message notification received:', message);
// Triggers: desktop notification + sound + UI update
});
</script>

Job Type: Scheduled
Execution Frequency: Every 5 seconds
Average Duration: 300-500ms per message
Status: Active

๐Ÿ’ฌ

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM