Support Message Status Processing
Overview
The Support Message Status job tracks delivery status for SMS and email messages sent through support conversations. It runs every 5 seconds to check the communication logs and update message delivery status in real-time, then emits socket events to update the dashboard UI.
Complete Flow:
- Cron Initialization: queue-manager/crons/conversations/support.message-status.js
- Service Processing: queue-manager/services/conversations/support.message-status.js
- Queue Definition: queue-manager/queues/conversations/support.message-status.js
Execution Pattern: Cron-based (every 5 seconds)
Queue Name: support_message_status_queue
Environment Flag: QM_CONVERSATIONS=true (in index.js)
Complete Processing Flow
sequenceDiagram
participant CRON as Cron Schedule<br/>(every 5s)
participant SERVICE as Message Status Service
participant DB as SupportMessage<br/>Collection
participant QUEUE as Bull Queue
participant COMM as Communication<br/>Collection
participant SOCKET as Conversation Socket
participant DASH as Dashboard UI
CRON->>SERVICE: Check message status
SERVICE->>DB: Query messages<br/>(sender_type='conversation',<br/>source=sms/email,<br/>delivered not exists)
DB-->>SERVICE: Return untracked messages
SERVICE->>DB: Mark status_queue_in_progress=true
loop For each message
SERVICE->>QUEUE: Add status check job
QUEUE->>COMM: Find communication log<br/>(conv_message_id)
COMM-->>QUEUE: Return delivery status
alt Communication found & successful
QUEUE->>DB: Update delivered=true<br/>+ communication_id
else Communication failed
QUEUE->>DB: Update delivered=false
end
QUEUE->>SOCKET: Emit 'updateMessage' event
SOCKET-->>DASH: Update message in UI
QUEUE->>DB: Reset status_queue_in_progress=false
end
Source Files
1. Cron Initialization
File: queue-manager/crons/conversations/support.message-status.js
Purpose: Schedule message status checks every 5 seconds
Cron Pattern: */5 * * * * * (every 5 seconds)
Initialization:
const supportMessageStatus = require('../../services/conversations/support.message-status');
const cron = require('node-cron');
const logger = require('../../utilities/logger');
let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/5 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
await supportMessageStatus();
inProgress = false;
}
});
} catch (err) {
logger.error({ initiator: 'QM/conversation/support-msg-status', error: err });
}
};
In-Progress Lock: Prevents overlapping executions.
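The service shown below catches its own errors, so the flag is normally reset after every run. If the awaited call ever rejected, though, a plain boolean would stay `true` and block all later runs. A minimal sketch of a guard that always releases the lock follows; `createGuardedTask` is a hypothetical helper name, not part of the queue-manager codebase.

```javascript
// Hypothetical helper (not from the queue-manager codebase): wraps an async
// task so that a new run is skipped while a previous run is still pending.
function createGuardedTask(task) {
  let inProgress = false;

  return async function run() {
    if (inProgress) {
      return false; // previous run still pending: skip this tick
    }
    inProgress = true;
    try {
      await task();
      return true;
    } finally {
      // Always release the lock, even when the task throws.
      inProgress = false;
    }
  };
}
```

A guarded function like this could be passed to `cron.schedule` in place of the inline callback. The caller would still need to catch rejections, since the guard releases the lock but rethrows the error.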
2. Service Processing (THE CORE LOGIC)
File: queue-manager/services/conversations/support.message-status.js
Purpose: Query untracked messages and add to status check queue
Matching Criteria:
- sender_type: 'conversation' - Messages sent from the conversation system
- source: ['sms', 'email'] - SMS or email messages only
- delivered: { $exists: false } - Not yet tracked
- status_queue_in_progress: { $ne: true } - Not already processing
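The criteria above can be read as a single predicate over a message document. The following is a minimal sketch for illustration, not code from the service; `isPendingStatusCheck` and `TRACKED_SOURCES` are hypothetical names. It makes one subtlety visible: a message with `delivered: false` already has a status and is not picked up again.

```javascript
// Sketch only: mirrors the $match criteria as a plain predicate over a
// message object. Names here are illustrative, not from the codebase.
const TRACKED_SOURCES = ['sms', 'email'];

function isPendingStatusCheck(message) {
  return (
    message.sender_type === 'conversation' &&
    TRACKED_SOURCES.includes(message.source) &&
    // delivered: { $exists: false } -> the field must be absent entirely
    !('delivered' in message) &&
    // status_queue_in_progress: { $ne: true } -> absent or false both pass
    message.status_queue_in_progress !== true
  );
}
```

For example, `{ sender_type: 'conversation', source: 'sms' }` qualifies, while the same message with `delivered: false` or `status_queue_in_progress: true` does not.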
Processing Logic:
const { SupportMessage } = require('../../models');
const Queue = require('../../queues/conversations/support.message-status');
module.exports = async () => {
try {
// 1. Query untracked messages
const messages = await SupportMessage.aggregate([
{
$match: {
sender_type: 'conversation',
source: {
$in: ['sms', 'email'], // SMS and email only
},
delivered: {
$exists: false, // Not yet tracked
},
status_queue_in_progress: {
$ne: true, // Not already processing
},
},
},
{
$sort: {
created: -1, // Most recent first
},
},
]);
if (messages.length) {
// 2. Collect message IDs
const ids = messages.map(message => message._id);
// 3. Mark as processing to prevent duplicate jobs
await SupportMessage.updateMany({ _id: { $in: ids } }, { status_queue_in_progress: true });
// 4. Add each message to status check queue
await Promise.all(
messages.map(async message => {
try {
const queue = await Queue.start();
await queue.add(
{ id: message._id },
{
attempts: 10,
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s, 8s, ...
},
},
);
} catch (err) {
// Reset processing flag on error
await SupportMessage.updateOne(
{ _id: message._id },
{ status_queue_in_progress: false },
);
console.log(
`Error occured while updating delivery status for messages`,
err.message,
err.stack,
);
}
}),
);
console.log('Processed messages status.');
}
} catch (err) {
console.log(
`Error occured while updating delivery status for messages`,
err.message,
err.stack,
);
}
};
3. Queue Definition
File: queue-manager/queues/conversations/support.message-status.js
Purpose: Check communication logs and update message delivery status
Queue Options:
{
attempts: 10, // 10 retry attempts
backoff: {
type: "exponential", // Exponential backoff
delay: 1000 // 1 second base delay
},
removeOnComplete: true, // Auto-cleanup
}
Job Processor:
queue.process(async (job, done) => {
try {
const { id } = job.data;
// 1. Find communication log for this message
const communication = await Communication.findOne({
conv_message_id: new mongoose.Types.ObjectId(id),
});
if (communication) {
let data = {};
// 2. Determine delivery status from communication log
if (communication._doc.success) {
data = {
metadata: { communication_id: communication._id },
delivered: true,
};
} else {
data = { delivered: false };
}
// 3. Update message and emit socket event in transaction
await withTransaction(async session => {
const message = await SupportMessage.findByIdAndUpdate(id, data, { new: true, session });
// 4. Generate JWT token for socket authentication
const jwt_token = jwt.sign(
{
conversation_id: message.sender_id.toString(),
account_id: communication.account_id.toString(),
type: 'team',
},
process.env.APP_SECRET,
{ expiresIn: '2m' },
);
// 5. Emit socket event to update UI
await axios.post(
`${process.env.CONVERSATION_SOCKET}/emit`,
{
event: 'updateMessage',
data: message,
},
{
headers: {
Authorization: `Bearer ${jwt_token}`,
'x-account-id': communication.account_id?.toString(),
},
},
);
});
}
return done();
} catch (err) {
done(err);
}
});
Event Handlers:
// Error handler
queue.on('error', err => {
logger.error({
initiator: 'QM/conversation/support-message-status',
error: err,
message: `Error in support message status queue`,
});
});
// Failed job handler
queue.on('failed', async (job, err) => {
logger.error({
initiator: 'QM/conversation/support-message-status',
error: err,
jobId: job.id,
});
// Reset flag after max retries
if (job.attemptsMade >= job.opts.attempts) {
await SupportMessage.updateOne({ _id: job.data.id }, { status_queue_in_progress: false });
}
});
// Completed job handler
queue.on('completed', async job => {
try {
await SupportMessage.updateOne({ _id: job.data.id }, { status_queue_in_progress: false });
} catch (error) {
logger.error({
initiator: 'QM/conversation/support-message-status',
error: error,
message: 'Error while finalizing support message status queue',
});
}
});
Collections Used
support_messages
- Operations: Read, Update
- Model: shared/models/support-message.js
- Usage Context:
  - Query messages without delivery status
  - Update delivered field (true/false)
  - Store communication_id in metadata
  - Manage status_queue_in_progress flag
Key Fields:
- sender_type: 'conversation' (messages from conversation system)
- source: 'sms' | 'email' (communication channel)
- delivered: Boolean indicating delivery success
- status_queue_in_progress: Processing lock flag
- metadata.communication_id: Reference to communication log
- sender_id: Conversation ID for socket routing
communications
- Operations: Read
- Model: shared/models/communication.js
- Usage Context:
  - Lookup delivery status by conv_message_id
  - Check success field for delivery confirmation
  - Get account_id for socket authentication
Key Fields:
- conv_message_id: Reference to SupportMessage ID
- success: Boolean indicating delivery success
- account_id: Owner account for authentication
- provider: SMS/email provider (Twilio, SendGrid, etc.)
Job Configuration
Queue Options
{
attempts: 10, // Maximum retry attempts
backoff: {
type: "exponential", // Exponential backoff strategy
delay: 1000 // 1 second base delay
},
removeOnComplete: true, // Auto-cleanup on success
}
Cron Schedule
'*/5 * * * * *'; // Every 5 seconds
Frequency Rationale: 5-second intervals provide near-real-time status updates while minimizing database load.
Processing Logic - Detailed Flow
Service Layer Processing
Service Function: module.exports (anonymous async function)
Purpose: Query untracked messages and add to status check queue
Processing Steps:
- Query Untracked Messages
const messages = await SupportMessage.aggregate([
{
$match: {
sender_type: 'conversation',
source: { $in: ['sms', 'email'] },
delivered: { $exists: false },
status_queue_in_progress: { $ne: true },
},
},
{ $sort: { created: -1 } },
]);
Matching Criteria:
- Only conversation-sent messages
- SMS and email channels only
- No delivery status yet
- Not already being processed
- Mark as Processing
const ids = messages.map(message => message._id);
await SupportMessage.updateMany({ _id: { $in: ids } }, { status_queue_in_progress: true });
Purpose: Prevent duplicate job creation
- Add Status Check Jobs
await Promise.all(
messages.map(async message => {
const queue = await Queue.start();
await queue.add(
{ id: message._id },
{
attempts: 10,
backoff: { type: 'exponential', delay: 1000 },
},
);
}),
);
- Error Recovery
catch (err) {
// Reset flag to allow retry on next cron run
await SupportMessage.updateOne(
{ _id: message._id },
{ status_queue_in_progress: false }
);
}
Queue Processing
Queue Processor: Inside queues/conversations/support.message-status.js
Purpose: Check communication logs and update delivery status
Job Data Structure:
{
id: ObjectId; // SupportMessage document ID
}
Processing Steps:
- Find Communication Log
const communication = await Communication.findOne({
conv_message_id: new mongoose.Types.ObjectId(id),
});
Lookup: Communication logs store the actual delivery status from providers (Twilio, SendGrid, etc.)
- Determine Delivery Status
let data = {};
if (communication._doc.success) {
data = {
metadata: { communication_id: communication._id },
delivered: true,
};
} else {
data = { delivered: false };
}
Logic:
- success: true → Message delivered successfully
- success: false → Delivery failed
- No communication log → Wait for provider callback
- Update Message in Transaction
await withTransaction(async session => {
const message = await SupportMessage.findByIdAndUpdate(id, data, { new: true, session });
// Return updated message for socket emission
});
Transaction: Ensures consistency between message update and socket emission
- Generate JWT Token
const jwt_token = jwt.sign(
{
conversation_id: message.sender_id.toString(),
account_id: communication.account_id.toString(),
type: 'team',
},
process.env.APP_SECRET,
{ expiresIn: '2m' },
);
Purpose: Authenticate with Conversation Socket service
- Emit Socket Event
await axios.post(
`${process.env.CONVERSATION_SOCKET}/emit`,
{
event: 'updateMessage',
data: message,
},
{
headers: {
Authorization: `Bearer ${jwt_token}`,
'x-account-id': communication.account_id?.toString(),
},
},
);
Real-time Update: Dashboard UI receives message status update immediately
- Cleanup Processing Flag
queue.on('completed', async job => {
await SupportMessage.updateOne({ _id: job.data.id }, { status_queue_in_progress: false });
});
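The three outcomes of the "Determine Delivery Status" step (delivered, failed, no log yet) can be condensed into one pure function, which makes the decision easy to unit-test in isolation. This is a minimal sketch under stated assumptions: `resolveDeliveryUpdate` is a hypothetical name, and it takes a plain object with `_id` and `success` rather than the Mongoose document (the processor reads `communication._doc.success`).

```javascript
// Sketch: returns the update to apply to the SupportMessage, or null when
// no communication log exists yet and the message should be left untouched.
// Assumes a plain object, not a Mongoose document.
function resolveDeliveryUpdate(communication) {
  if (!communication) {
    return null; // provider callback has not been recorded yet
  }
  if (communication.success) {
    return {
      metadata: { communication_id: communication._id },
      delivered: true,
    };
  }
  return { delivered: false };
}
```

Returning `null` for the missing-log case keeps the "do nothing and let a later run check again" path explicit instead of leaving it as an implicit fall-through.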
Error Handling in Flow
Service Layer Errors:
try {
const queue = await Queue.start();
await queue.add({ id: message._id }, { attempts: 10, backoff: {...} });
} catch (err) {
// Reset flag to allow retry
await SupportMessage.updateOne(
{ _id: message._id },
{ status_queue_in_progress: false }
);
console.log(`Error occured while updating delivery status for messages`, err.message);
}
Queue Processor Errors:
// Bull automatically retries based on attempts config (10 attempts)
queue.on('failed', async (job, err) => {
logger.error({
initiator: 'QM/conversation/support-message-status',
error: err,
jobId: job.id,
});
// Reset flag after max retries
if (job.attemptsMade >= job.opts.attempts) {
await SupportMessage.updateOne({ _id: job.data.id }, { status_queue_in_progress: false });
}
});
Retry Strategy: 10 attempts with exponential backoff starting at 1 second
Error Handling
Common Error Scenarios
Communication Log Not Found
const communication = await Communication.findOne({
conv_message_id: new mongoose.Types.ObjectId(id),
});
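if (!communication) {
// Communication log not yet created - provider callback pending.
// The job completes without error, so the queue does not retry it.
// The 'completed' handler clears status_queue_in_progress, and the
// next cron run picks the message up again.
return done();
}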
Reason: Provider callbacks (Twilio, SendGrid) may arrive after message is sent
Socket Emit Failure
try {
await axios.post(`${process.env.CONVERSATION_SOCKET}/emit`, {...});
} catch (error) {
logger.error({
initiator: 'QM/conversation/support-message-status',
error: 'Socket emit failed',
details: error.message
});
throw error; // Trigger retry - socket must be updated
}
Reason: Conversation Socket service may be temporarily unavailable
Transaction Failure
await withTransaction(async session => {
// Update message
const message = await SupportMessage.findByIdAndUpdate(id, data, { session });
// Emit socket event
await axios.post(`${process.env.CONVERSATION_SOCKET}/emit`, {...});
});
// If the update or the emit throws, the message update is rolled back;
// an event that was already emitted cannot be recalled
Protection: Ensures message status and UI stay synchronized
Database Connection Error
try {
await SupportMessage.updateOne({ _id: message._id }, { ... });
} catch (error) {
logger.error({
initiator: 'QM/conversation/support-message-status',
error: 'Database update failed',
details: error.message
});
throw error; // Trigger retry
}
Retry Strategy
{
attempts: 10, // Maximum 10 attempts
backoff: {
type: "exponential", // Exponential backoff
delay: 1000 // 1 second base delay
}
}
Backoff Schedule:
- Attempt 1: Immediate
- Attempt 2: 1 second
- Attempt 3: 2 seconds
- Attempt 4: 4 seconds
- Attempt 5-10: Progressive exponential backoff (8s, 16s, 32s, 64s, 128s, 256s)
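To make the schedule above concrete, the delays and their total can be computed directly. This is an illustrative sketch: `backoffSchedule` is a hypothetical function that assumes the doubling pattern listed above (delay before attempt n is the base delay times 2^(n-2)). The exact delays in production depend on the queue library's exponential strategy and version.

```javascript
// Sketch: delay before each retry, assuming the doubling schedule listed
// above (delay before attempt n = baseDelayMs * 2^(n - 2), for n >= 2).
function backoffSchedule(attempts, baseDelayMs) {
  const delays = [];
  for (let attempt = 2; attempt <= attempts; attempt += 1) {
    delays.push(baseDelayMs * 2 ** (attempt - 2));
  }
  return delays;
}

const delays = backoffSchedule(10, 1000);
const totalMs = delays.reduce((sum, delayMs) => sum + delayMs, 0);

// prints "1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s"
console.log(delays.map(delayMs => `${delayMs / 1000}s`).join(', '));
// prints "Total wait across all retries: 511s"
console.log(`Total wait across all retries: ${totalMs / 1000}s`);
```

Under this assumption a job that fails every attempt holds its processing flag for roughly eight and a half minutes before the failed handler resets it.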
Monitoring & Logging
Success Logging
console.log('Processed messages status.');
console.log(`Updated ${messages.length} message statuses`);
Error Logging
logger.error({
initiator: 'QM/conversation/support-message-status',
error: err,
message: `Error in support message status queue`,
});
Performance Metrics
- Average Processing Time: ~500-1000ms per message (includes socket emit)
- Success Rate: ~98%
- Retry Rate: ~2% (mostly waiting for communication logs)
- Typical Volume: 50-200 messages per day
Integration Points
Triggers This Job
- Cron Schedule: Every 5 seconds automatically
- Message Sent: When support agent sends SMS/email through conversation
- Manual Trigger: Via API endpoint (if QM_HOOKS=true)
Data Dependencies
- SupportMessage collection: Must have messages with delivered not set
- Communication collection: Provider delivery logs must exist
- Conversation Socket Service: For real-time UI updates
Related Jobs
- Support Notification: May trigger agent notifications based on delivery status
- Communication Providers: Twilio, SendGrid callbacks create communication logs
Downstream Effects
- Dashboard UI: Real-time message status indicators (✓✓ for delivered)
- Analytics: Delivery success rates tracked
- Alerts: Failed deliveries may trigger notifications
Important Notes
Side Effects
- ⚠️ Database Updates: Sets delivered field on messages
- ⚠️ Socket Events: Emits real-time updates to connected dashboards
- ⚠️ JWT Generation: Creates short-lived tokens for each message
- ⚠️ HTTP Requests: Calls Conversation Socket service
Performance Considerations
- 5-Second Intervals: Balance between real-time updates and system load
- Transaction Overhead: MongoDB transactions add ~100ms latency
- Socket Latency: HTTP POST to socket service adds ~50-100ms
- Exponential Backoff: Handles temporary communication log delays
Maintenance Notes
- Communication Log Timing: Logs may arrive 1-5 seconds after message sent
- JWT Expiration: 2-minute tokens are sufficient for immediate socket emit
- Stuck Processing Flags: Monitor for messages with status_queue_in_progress=true for extended periods
- Socket Connection: Requires Conversation Socket service to be running
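One way to watch for stuck processing flags is a periodic query for messages that still carry the flag long after they were created. The sketch below only builds the filter, so it runs without a database. `buildStuckFlagFilter` is a hypothetical name, the 10-minute default is an arbitrary example threshold, and using `created` (the field the service sorts on) as a proxy for when the flag was set is an assumption.

```javascript
// Sketch: builds a MongoDB filter for messages whose processing flag is
// still set long after creation. Assumes `created` is stored as a Date;
// the default threshold is an example value, not a documented setting.
function buildStuckFlagFilter(now = new Date(), maxAgeMs = 10 * 60 * 1000) {
  return {
    status_queue_in_progress: true,
    created: { $lt: new Date(now.getTime() - maxAgeMs) },
  };
}

// Possible usage with the model from this page (not executed here):
// const stuck = await SupportMessage.countDocuments(buildStuckFlagFilter());
```

The threshold should sit above the longest time a job can legitimately hold the flag while retrying, otherwise healthy in-flight messages would be reported as stuck.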
Testing
Manual Trigger
# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/conversations/support-message-status
Create Test Message
// Send test SMS message
const message = await SupportMessage.create({
sender_type: 'conversation',
sender_id: testConversationId,
source: 'sms',
content: 'Test message',
to: '+1234567890',
// delivered field not set - will be picked up by job
});
// Create communication log (simulates Twilio callback)
await Communication.create({
conv_message_id: message._id,
account_id: testAccountId,
success: true,
provider: 'twilio',
response: {
/* provider response */
},
});
// Wait 5 seconds for job to process
setTimeout(async () => {
const updated = await SupportMessage.findById(message._id);
console.log('Delivery status updated:', updated.delivered); // true
console.log('Communication ID stored:', updated.metadata.communication_id);
}, 6000);
Monitor Status Queue
// Count messages pending status check
const pending = await SupportMessage.countDocuments({
sender_type: 'conversation',
source: { $in: ['sms', 'email'] },
delivered: { $exists: false },
status_queue_in_progress: { $ne: true }, // same condition the service uses; also matches documents without the flag
});
console.log('Messages pending status check:', pending);
// Count messages currently processing
const inProgress = await SupportMessage.countDocuments({
status_queue_in_progress: true,
});
console.log('Messages currently processing:', inProgress);
Test Socket Emission
// Monitor socket events in Conversation Socket logs
// Look for 'updateMessage' events with delivered status
// Example event payload:
{
event: 'updateMessage',
data: {
_id: '...',
delivered: true,
metadata: { communication_id: '...' },
// ... other message fields
}
}
Job Type: Scheduled
Execution Frequency: Every 5 seconds
Average Duration: 500-1000ms per message
Status: Active