Support Message Status Processing

Overview

The Support Message Status job tracks delivery status for SMS and email messages sent through support conversations. It runs every 5 seconds to check the communication logs and update message delivery status in real-time, then emits socket events to update the dashboard UI.

Complete Flow:

  1. Cron Initialization: queue-manager/crons/conversations/support.message-status.js
  2. Service Processing: queue-manager/services/conversations/support.message-status.js
  3. Queue Definition: queue-manager/queues/conversations/support.message-status.js

Execution Pattern: Cron-based (every 5 seconds)

Queue Name: support_message_status_queue

Environment Flag: QM_CONVERSATIONS=true (in index.js)

Complete Processing Flow

sequenceDiagram
    participant CRON as Cron Schedule<br/>(every 5s)
    participant SERVICE as Message Status Service
    participant DB as SupportMessage<br/>Collection
    participant QUEUE as Bull Queue
    participant COMM as Communication<br/>Collection
    participant SOCKET as Conversation Socket
    participant DASH as Dashboard UI

    CRON->>SERVICE: Check message status
    SERVICE->>DB: Query messages<br/>(sender_type='conversation',<br/>source=sms/email,<br/>delivered not exists)
    DB-->>SERVICE: Return untracked messages

    SERVICE->>DB: Mark status_queue_in_progress=true

    loop For each message
        SERVICE->>QUEUE: Add status check job
        QUEUE->>COMM: Find communication log<br/>(conv_message_id)
        COMM-->>QUEUE: Return delivery status

        alt Communication found & successful
            QUEUE->>DB: Update delivered=true<br/>+ communication_id
        else Communication failed
            QUEUE->>DB: Update delivered=false
        end

        QUEUE->>SOCKET: Emit 'updateMessage' event
        SOCKET-->>DASH: Update message in UI
        QUEUE->>DB: Reset status_queue_in_progress=false
    end

Source Files

1. Cron Initialization

File: queue-manager/crons/conversations/support.message-status.js

Purpose: Schedule message status checks every 5 seconds

Cron Pattern: */5 * * * * * (every 5 seconds)

Initialization:

const supportMessageStatus = require('../../services/conversations/support.message-status');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

let inProgress = false;
exports.start = async () => {
  try {
    cron.schedule('*/5 * * * * *', async () => {
      if (!inProgress) {
        inProgress = true;
        await supportMessageStatus();
        inProgress = false;
      }
    });
  } catch (err) {
    logger.error({ initiator: 'QM/conversation/support-msg-status', error: err });
  }
};

In-Progress Lock: The inProgress flag turns a tick into a no-op while the previous run is still executing, which prevents overlapping executions.
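The lock in the listing is released only when the service call returns normally. The following is a minimal sketch of the same guard written with try/finally, so that a rejected run could not leave the flag set. `createGuardedTask` is a hypothetical helper introduced here for illustration and is not part of the queue manager.

```javascript
// Hypothetical helper (not in the source): wraps an async task so that
// a new call is skipped while the previous one is still running.
function createGuardedTask(task) {
  let inProgress = false;
  return async function run() {
    if (inProgress) return 'skipped'; // previous tick has not finished
    inProgress = true;
    try {
      await task();
      return 'ran';
    } finally {
      inProgress = false; // released even when task() rejects
    }
  };
}

// With node-cron, the wrapper would be passed as the scheduled callback:
//   cron.schedule('*/5 * * * * *', createGuardedTask(supportMessageStatus));
```

The return values 'ran' and 'skipped' exist only to make the behavior observable; a scheduler ignores them.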

2. Service Processing (Core Logic)

File: queue-manager/services/conversations/support.message-status.js

Purpose: Query untracked messages and add to status check queue

Matching Criteria:

  • sender_type: 'conversation' - Messages sent from conversation system
  • source: ['sms', 'email'] - SMS or email messages only
  • delivered: { $exists: false } - Not yet tracked
  • status_queue_in_progress: { $ne: true } - Not already processing
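To make the semantics of the four criteria concrete, here is a plain-JavaScript mirror of the filter. It is only an illustration: the real filtering happens in MongoDB, `isPendingStatusCheck` is a hypothetical name, and the 'chat' source value in the sample data is invented for the example.

```javascript
// Plain-JavaScript mirror of the matching criteria, for illustration only.
function isPendingStatusCheck(message) {
  return (
    message.sender_type === 'conversation' &&
    ['sms', 'email'].includes(message.source) &&
    !('delivered' in message) && // { $exists: false }
    message.status_queue_in_progress !== true // { $ne: true }
  );
}

const samples = [
  { sender_type: 'conversation', source: 'sms' }, // never tracked
  { sender_type: 'conversation', source: 'sms', status_queue_in_progress: false }, // lock released
  { sender_type: 'conversation', source: 'email', delivered: false }, // already tracked
  { sender_type: 'conversation', source: 'sms', status_queue_in_progress: true }, // in flight
  { sender_type: 'conversation', source: 'chat' }, // hypothetical non-SMS/email source
];

console.log(samples.map(isPendingStatusCheck));
// [ true, true, false, false, false ]
```

Note that a message with delivered: false is treated as tracked. Only the absence of the field marks a message as pending.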

Processing Logic:

const { SupportMessage } = require('../../models');
const Queue = require('../../queues/conversations/support.message-status');

module.exports = async () => {
  try {
    // 1. Query untracked messages
    const messages = await SupportMessage.aggregate([
      {
        $match: {
          sender_type: 'conversation',
          source: {
            $in: ['sms', 'email'], // SMS and email only
          },
          delivered: {
            $exists: false, // Not yet tracked
          },
          status_queue_in_progress: {
            $ne: true, // Not already processing
          },
        },
      },
      {
        $sort: {
          created: -1, // Most recent first
        },
      },
    ]);

    if (messages.length) {
      // 2. Collect message IDs
      const ids = messages.map(message => message._id);

      // 3. Mark as processing to prevent duplicate jobs
      await SupportMessage.updateMany({ _id: { $in: ids } }, { status_queue_in_progress: true });

      // 4. Add each message to status check queue
      await Promise.all(
        messages.map(async message => {
          try {
            const queue = await Queue.start();
            await queue.add(
              { id: message._id },
              {
                attempts: 10,
                backoff: {
                  type: 'exponential',
                  delay: 1000, // 1s, 2s, 4s, 8s, ...
                },
              },
            );
          } catch (err) {
            // Reset processing flag on error
            await SupportMessage.updateOne(
              { _id: message._id },
              { status_queue_in_progress: false },
            );
            console.log(
              `Error occurred while updating delivery status for messages`,
              err.message,
              err.stack,
            );
          }
        }),
      );

      console.log('Processed messages status.');
    }
  } catch (err) {
    console.log(
      `Error occurred while updating delivery status for messages`,
      err.message,
      err.stack,
    );
  }
};
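The enqueue step isolates failures per message: one failed queue.add releases only that message's lock and does not abort the others. The sketch below reproduces that pattern with in-memory stand-ins so it can be run without MongoDB or Redis. `enqueueStatusChecks`, `addJob`, and `resetFlag` are hypothetical names standing in for the service body, queue.add(...), and SupportMessage.updateOne(...).

```javascript
// Sketch with injected stand-ins; not the service's actual signature.
async function enqueueStatusChecks(messages, addJob, resetFlag) {
  return Promise.all(
    messages.map(async message => {
      try {
        await addJob({ id: message._id });
        return { id: message._id, queued: true };
      } catch (err) {
        // Release the lock so the next cron run can pick the message up again.
        await resetFlag(message._id);
        return { id: message._id, queued: false, error: err.message };
      }
    }),
  );
}

// Example run: the job for message 'b' fails, the others are queued.
const released = [];
const demo = enqueueStatusChecks(
  [{ _id: 'a' }, { _id: 'b' }, { _id: 'c' }],
  async job => {
    if (job.id === 'b') throw new Error('queue unavailable');
  },
  async id => {
    released.push(id);
  },
);
demo.then(results => console.log(results, released));
```

Returning a per-message result is a design choice of the sketch; the service itself only logs the error.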

3. Queue Definition

File: queue-manager/queues/conversations/support.message-status.js

Purpose: Check communication logs and update message delivery status

Queue Options:

{
  attempts: 10, // 10 retry attempts
  backoff: {
    type: "exponential", // Exponential backoff
    delay: 1000 // 1 second base delay
  },
  removeOnComplete: true, // Auto-cleanup
}

Job Processor:

queue.process(async (job, done) => {
  try {
    const { id } = job.data;

    // 1. Find communication log for this message
    const communication = await Communication.findOne({
      conv_message_id: new mongoose.Types.ObjectId(id),
    });

    if (communication) {
      let data = {};

      // 2. Determine delivery status from communication log
      if (communication._doc.success) {
        data = {
          metadata: { communication_id: communication._id },
          delivered: true,
        };
      } else {
        data = { delivered: false };
      }

      // 3. Update message and emit socket event in transaction
      await withTransaction(async session => {
        const message = await SupportMessage.findByIdAndUpdate(id, data, { new: true, session });

        // 4. Generate JWT token for socket authentication
        const jwt_token = jwt.sign(
          {
            conversation_id: message.sender_id.toString(),
            account_id: communication.account_id.toString(),
            type: 'team',
          },
          process.env.APP_SECRET,
          { expiresIn: '2m' },
        );

        // 5. Emit socket event to update UI
        await axios.post(
          `${process.env.CONVERSATION_SOCKET}/emit`,
          {
            event: 'updateMessage',
            data: message,
          },
          {
            headers: {
              Authorization: `Bearer ${jwt_token}`,
              'x-account-id': communication.account_id?.toString(),
            },
          },
        );
      });
    }

    return done();
  } catch (err) {
    done(err);
  }
});
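To show what the short-lived token from step 4 contains, the sketch below builds an equivalent token by hand with Node's built-in crypto module. It assumes HS256 signing, which this document does not state, and it is not how the processor creates the token: the processor calls jwt.sign. The IDs and the secret are placeholders, and the 'base64url' encoding requires a reasonably recent Node.js release.

```javascript
const crypto = require('crypto');

// Base64url-encode a JSON-serializable value (no padding), as JWT segments require.
function encodeSegment(value) {
  return Buffer.from(JSON.stringify(value)).toString('base64url');
}

// Hand-rolled HS256 token with a 2-minute lifetime, for illustration only.
function signShortLivedToken(claims, secret, nowSeconds = Math.floor(Date.now() / 1000)) {
  const header = encodeSegment({ alg: 'HS256', typ: 'JWT' });
  const payload = encodeSegment({ ...claims, iat: nowSeconds, exp: nowSeconds + 120 });
  const signature = crypto
    .createHmac('sha256', secret)
    .update(`${header}.${payload}`)
    .digest('base64url');
  return `${header}.${payload}.${signature}`;
}

const token = signShortLivedToken(
  { conversation_id: 'conv-1', account_id: 'acct-1', type: 'team' }, // placeholder IDs
  'example-secret', // stands in for process.env.APP_SECRET
  1700000000, // fixed clock so the example is reproducible
);

// Decode the middle segment to inspect the claims.
const decoded = JSON.parse(Buffer.from(token.split('.')[1], 'base64url').toString());
console.log(decoded.type, decoded.exp - decoded.iat); // team 120
```

The two-minute lifetime only needs to cover the single HTTP call that follows, which is why such a short expiry is workable here.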

Event Handlers:

// Error handler
queue.on('error', err => {
  logger.error({
    initiator: 'QM/conversation/support-message-status',
    error: err,
    message: `Error in support message status queue`,
  });
});

// Failed job handler
queue.on('failed', async (job, err) => {
  logger.error({
    initiator: 'QM/conversation/support-message-status',
    error: err,
    jobId: job.id,
  });

  // Reset flag after max retries
  if (job.attemptsMade >= job.opts.attempts) {
    await SupportMessage.updateOne({ _id: job.data.id }, { status_queue_in_progress: false });
  }
});

// Completed job handler
queue.on('completed', async job => {
  try {
    await SupportMessage.updateOne({ _id: job.data.id }, { status_queue_in_progress: false });
  } catch (error) {
    logger.error({
      initiator: 'QM/conversation/support-message-status',
      error, // the caught variable is `error`; referencing `err` here would throw a ReferenceError
      message: 'Error while finalizing support message status queue',
    });
  }
});

Collections Used

support_messages

  • Operations: Read, Update
  • Model: shared/models/support-message.js
  • Usage Context:
    • Query messages without delivery status
    • Update delivered field (true/false)
    • Store communication_id in metadata
    • Manage status_queue_in_progress flag

Key Fields:

  • sender_type: 'conversation' (messages from conversation system)
  • source: 'sms' | 'email' (communication channel)
  • delivered: Boolean indicating delivery success
  • status_queue_in_progress: Processing lock flag
  • metadata.communication_id: Reference to communication log
  • sender_id: Conversation ID for socket routing

communications

  • Operations: Read
  • Model: shared/models/communication.js
  • Usage Context:
    • Lookup delivery status by conv_message_id
    • Check success field for delivery confirmation
    • Get account_id for socket authentication

Key Fields:

  • conv_message_id: Reference to SupportMessage ID
  • success: Boolean indicating delivery success
  • account_id: Owner account for authentication
  • provider: SMS/email provider (Twilio, SendGrid, etc.)

Job Configuration

Queue Options

{
  attempts: 10, // Maximum retry attempts
  backoff: {
    type: "exponential", // Exponential backoff strategy
    delay: 1000 // 1 second base delay
  },
  removeOnComplete: true, // Auto-cleanup on success
}

Cron Schedule

'*/5 * * * * *'; // Every 5 seconds

Frequency Rationale: 5-second intervals provide near-real-time status updates while bounding the polling load on the database to one aggregate query per run.
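The polling cost of the schedule follows directly from the interval. A short worked calculation, assuming every tick actually runs (ticks skipped by the in-progress lock would lower these numbers):

```javascript
const INTERVAL_SECONDS = 5; // from the '*/5 * * * * *' pattern

const runsPerMinute = 60 / INTERVAL_SECONDS;
const runsPerHour = 3600 / INTERVAL_SECONDS;
const runsPerDay = 86400 / INTERVAL_SECONDS;

console.log({ runsPerMinute, runsPerHour, runsPerDay });
// { runsPerMinute: 12, runsPerHour: 720, runsPerDay: 17280 }
```

Since each run issues one aggregate query, the schedule produces at most 17,280 polling queries per day regardless of message volume.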

Processing Logic - Detailed Flow

Service Layer Processing

Service Function: module.exports (anonymous async function)

Purpose: Query untracked messages and add to status check queue

Processing Steps:

  1. Query Untracked Messages

    const messages = await SupportMessage.aggregate([
      {
        $match: {
          sender_type: 'conversation',
          source: { $in: ['sms', 'email'] },
          delivered: { $exists: false },
          status_queue_in_progress: { $ne: true },
        },
      },
      { $sort: { created: -1 } },
    ]);

    Matching Criteria:

    • Only conversation-sent messages
    • SMS and email channels only
    • No delivery status yet
    • Not already being processed

  2. Mark as Processing

    const ids = messages.map(message => message._id);
    await SupportMessage.updateMany({ _id: { $in: ids } }, { status_queue_in_progress: true });

    Purpose: Prevent duplicate job creation

  3. Add Status Check Jobs

    await Promise.all(
      messages.map(async message => {
        const queue = await Queue.start();
        await queue.add(
          { id: message._id },
          {
            attempts: 10,
            backoff: { type: 'exponential', delay: 1000 },
          },
        );
      }),
    );

  4. Error Recovery

    // Inside the per-message try/catch around queue.add
    catch (err) {
      // Reset flag to allow retry on next cron run
      await SupportMessage.updateOne(
        { _id: message._id },
        { status_queue_in_progress: false }
      );
    }

Queue Processing

Queue Processor: Inside queues/conversations/support.message-status.js

Purpose: Check communication logs and update delivery status

Job Data Structure:

{
id: ObjectId; // SupportMessage document ID
}

Processing Steps:

  1. Find Communication Log

    const communication = await Communication.findOne({
      conv_message_id: new mongoose.Types.ObjectId(id),
    });

    Lookup: Communication logs store the actual delivery status from providers (Twilio, SendGrid, etc.)

  2. Determine Delivery Status

    let data = {};
    if (communication._doc.success) {
      data = {
        metadata: { communication_id: communication._id },
        delivered: true,
      };
    } else {
      data = { delivered: false };
    }

    Logic:

    • success: true → Message delivered successfully
    • success: false → Delivery failed
    • No communication log → Job completes without changes; the next cron run queues the message again

  3. Update Message in Transaction

    await withTransaction(async session => {
      const message = await SupportMessage.findByIdAndUpdate(id, data, { new: true, session });

      // Steps 4 and 5 (token generation and socket emit) run inside this callback
    });

    Transaction: The update and the socket emit run in the same callback, so a failed emit is expected to abort the update

  4. Generate JWT Token

    const jwt_token = jwt.sign(
      {
        conversation_id: message.sender_id.toString(),
        account_id: communication.account_id.toString(),
        type: 'team',
      },
      process.env.APP_SECRET,
      { expiresIn: '2m' },
    );

    Purpose: Authenticate with Conversation Socket service

  5. Emit Socket Event

    await axios.post(
      `${process.env.CONVERSATION_SOCKET}/emit`,
      {
        event: 'updateMessage',
        data: message,
      },
      {
        headers: {
          Authorization: `Bearer ${jwt_token}`,
          'x-account-id': communication.account_id?.toString(),
        },
      },
    );

    Real-time Update: Dashboard UI receives message status update immediately

  6. Cleanup Processing Flag

    queue.on('completed', async job => {
      await SupportMessage.updateOne({ _id: job.data.id }, { status_queue_in_progress: false });
    });
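The decision in step 2 can be read as a small pure function from a communication log to an update document. The sketch below isolates it for clarity. `buildStatusUpdate` is a hypothetical name (the processor builds the object inline), and the sketch reads `success` directly from a plain object rather than through `_doc`.

```javascript
// Pure-function sketch of step 2; not part of the source.
function buildStatusUpdate(communication) {
  if (!communication) return null; // no log yet: leave the message untouched
  if (communication.success) {
    return {
      metadata: { communication_id: communication._id },
      delivered: true,
    };
  }
  return { delivered: false };
}

console.log(buildStatusUpdate({ _id: 'comm-1', success: true }));
// { metadata: { communication_id: 'comm-1' }, delivered: true }
console.log(buildStatusUpdate({ _id: 'comm-2', success: false }));
// { delivered: false }
console.log(buildStatusUpdate(null));
// null
```

Keeping the mapping separate from the database and HTTP calls would make the three outcomes easy to unit test.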

Error Handling in Flow

Service Layer Errors:

try {
  const queue = await Queue.start();
  await queue.add({ id: message._id }, { attempts: 10, backoff: {...} });
} catch (err) {
  // Reset flag to allow retry
  await SupportMessage.updateOne(
    { _id: message._id },
    { status_queue_in_progress: false }
  );
  console.log(`Error occurred while updating delivery status for messages`, err.message);
}

Queue Processor Errors:

// Bull automatically retries based on attempts config (10 attempts)
queue.on('failed', async (job, err) => {
  logger.error({
    initiator: 'QM/conversation/support-message-status',
    error: err,
    jobId: job.id,
  });

  // Reset flag after max retries
  if (job.attemptsMade >= job.opts.attempts) {
    await SupportMessage.updateOne({ _id: job.data.id }, { status_queue_in_progress: false });
  }
});

Retry Strategy: 10 attempts with exponential backoff starting at 1 second

Error Handling

Common Error Scenarios

Communication Log Not Found

const communication = await Communication.findOne({
  conv_message_id: new mongoose.Types.ObjectId(id),
});

if (!communication) {
  // Communication log not yet created - provider callback pending.
  // Completing the job here does not trigger a Bull retry. The completed
  // handler resets status_queue_in_progress, and because delivered is
  // still unset, the next cron run queues the message again.
  return done();
}

Reason: Provider callbacks (Twilio, SendGrid) may arrive after the message is sent

Socket Emit Failure

try {
  await axios.post(`${process.env.CONVERSATION_SOCKET}/emit`, {...});
} catch (error) {
  logger.error({
    initiator: 'QM/conversation/support-message-status',
    error: 'Socket emit failed',
    details: error.message
  });
  throw error; // Trigger retry - socket must be updated
}

Reason: Conversation Socket service may be temporarily unavailable

Transaction Failure

await withTransaction(async session => {
  // Update message
  const message = await SupportMessage.findByIdAndUpdate(id, data, { session });
  // Emit socket event
  await axios.post(`${process.env.CONVERSATION_SOCKET}/emit`, {...});
});
// If either step throws, the message update is rolled back.
// A socket event that was already delivered cannot be recalled.

Protection: Prevents a delivery status from being committed when the dashboard could not be notified

Database Connection Error

try {
  await SupportMessage.updateOne({ _id: message._id }, { ... });
} catch (error) {
  logger.error({
    initiator: 'QM/conversation/support-message-status',
    error: 'Database update failed',
    details: error.message
  });
  throw error; // Trigger retry
}

Retry Strategy

{
  attempts: 10, // Maximum 10 attempts
  backoff: {
    type: "exponential", // Exponential backoff
    delay: 1000 // 1 second base delay
  }
}

Backoff Schedule (wait before each attempt, assuming the delay doubles from the 1-second base; the exact values depend on the queue library's exponential formula):

  • Attempt 1: Immediate
  • Attempt 2: after 1 second
  • Attempt 3: after 2 seconds
  • Attempt 4: after 4 seconds
  • Attempts 5-10: after 8s, 16s, 32s, 64s, 128s, and 256s respectively
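The schedule can be reproduced with a few lines of arithmetic. This sketch assumes, as the list above does, that the wait doubles on each retry starting from the base delay. Queue libraries differ in their exact exponential formula, so treat the numbers as illustrative; `retryDelays` is a hypothetical helper.

```javascript
// Delay before each retry, ASSUMING the wait is base * 2^n for retry n.
function retryDelays(attempts, baseDelayMs) {
  const delays = [];
  for (let retry = 0; retry < attempts - 1; retry += 1) {
    delays.push(baseDelayMs * 2 ** retry);
  }
  return delays;
}

const delays = retryDelays(10, 1000);
const totalMs = delays.reduce((sum, d) => sum + d, 0);

console.log(delays.map(d => `${d / 1000}s`).join(', '));
// 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s
console.log(`${totalMs / 1000}s in total`);
// 511s in total
```

Under this assumption, a job that fails on every attempt keeps status_queue_in_progress set for roughly eight and a half minutes (plus processing time) before the failed handler releases it.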

Monitoring & Logging

Success Logging

console.log('Processed messages status.');
console.log(`Updated ${messages.length} message statuses`);

Error Logging

logger.error({
  initiator: 'QM/conversation/support-message-status',
  error: err,
  message: `Error in support message status queue`,
});

Performance Metrics

  • Average Processing Time: ~500-1000ms per message (includes socket emit)
  • Success Rate: ~98%
  • Retry Rate: ~2% (mostly waiting for communication logs)
  • Typical Volume: 50-200 messages per day

Integration Points

Triggers This Job

  • Cron Schedule: Every 5 seconds automatically
  • Message Sent: When support agent sends SMS/email through conversation
  • Manual Trigger: Via API endpoint (if QM_HOOKS=true)

Data Dependencies

  • SupportMessage collection: Must have messages with delivered not set
  • Communication collection: Provider delivery logs must exist
  • Conversation Socket Service: For real-time UI updates
  • Support Notification: May trigger agent notifications based on delivery status
  • Communication Providers: Twilio, SendGrid callbacks create communication logs

Downstream Effects

  • Dashboard UI: Real-time message status indicators (✓✓ for delivered)
  • Analytics: Delivery success rates tracked
  • Alerts: Failed deliveries may trigger notifications

Important Notes

Side Effects

  • Database Updates: Sets delivered field on messages
  • Socket Events: Emits real-time updates to connected dashboards
  • JWT Generation: Creates short-lived tokens for each message
  • HTTP Requests: Calls Conversation Socket service

Performance Considerations

  • 5-Second Intervals: Balance between real-time updates and system load
  • Transaction Overhead: MongoDB transactions add ~100ms latency
  • Socket Latency: HTTP POST to socket service adds ~50-100ms
  • Exponential Backoff: Handles temporary communication log delays

Maintenance Notes

  • Communication Log Timing: Logs may arrive 1-5 seconds after message sent
  • JWT Expiration: 2-minute tokens are sufficient for immediate socket emit
  • Stuck Processing Flags: Monitor for messages with status_queue_in_progress=true for extended periods
  • Socket Connection: Requires Conversation Socket service to be running
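A check for stuck processing flags could look like the sketch below. It relies on a loud assumption: each record carries a `locked_at` timestamp in milliseconds, which is not among the fields listed in this document. The 15-minute threshold is an arbitrary example value chosen to exceed the retry window, and `findStuckMessages` is a hypothetical helper operating on already-loaded records.

```javascript
// Sketch: report messages whose lock has been held longer than a threshold.
// ASSUMPTION: `locked_at` (ms since epoch) exists on each record.
function findStuckMessages(messages, nowMs, maxLockMs) {
  return messages.filter(
    m => m.status_queue_in_progress === true && nowMs - m.locked_at > maxLockMs,
  );
}

const FIFTEEN_MINUTES_MS = 15 * 60 * 1000;
const now = 1700000000000; // fixed clock for a reproducible example

const stuck = findStuckMessages(
  [
    { _id: 'm1', status_queue_in_progress: true, locked_at: now - 20 * 60 * 1000 }, // held 20 min
    { _id: 'm2', status_queue_in_progress: true, locked_at: now - 30 * 1000 }, // held 30 s
    { _id: 'm3', status_queue_in_progress: false, locked_at: now - 60 * 60 * 1000 }, // released
  ],
  now,
  FIFTEEN_MINUTES_MS,
);

console.log(stuck.map(m => m._id)); // [ 'm1' ]
```

Without such a timestamp, the only available signal is the raw count of flagged messages staying above zero across several checks.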

Testing

Manual Trigger

# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/conversations/support-message-status

Create Test Message

// Send test SMS message
const message = await SupportMessage.create({
  sender_type: 'conversation',
  sender_id: testConversationId,
  source: 'sms',
  content: 'Test message',
  to: '+1234567890',
  // delivered field not set - will be picked up by job
});

// Create communication log (simulates Twilio callback)
await Communication.create({
  conv_message_id: message._id,
  account_id: testAccountId,
  success: true,
  provider: 'twilio',
  response: {
    /* provider response */
  },
});

// Wait 6 seconds (one 5-second cron interval plus a margin) for the job to process
setTimeout(async () => {
  const updated = await SupportMessage.findById(message._id);
  console.log('Delivery status updated:', updated.delivered); // true
  console.log('Communication ID stored:', updated.metadata.communication_id);
}, 6000);

Monitor Status Queue

// Count messages pending status check
const pending = await SupportMessage.countDocuments({
  sender_type: 'conversation',
  source: { $in: ['sms', 'email'] },
  delivered: { $exists: false },
  // Use $ne: true, as the service does; matching on false would miss
  // new messages where the flag has never been set
  status_queue_in_progress: { $ne: true },
});

console.log('Messages pending status check:', pending);

// Count messages currently processing
const inProgress = await SupportMessage.countDocuments({
  status_queue_in_progress: true,
});

console.log('Messages currently processing:', inProgress);

Test Socket Emission

// Monitor socket events in Conversation Socket logs
// Look for 'updateMessage' events with delivered status

// Example event payload:
{
  event: 'updateMessage',
  data: {
    _id: '...',
    delivered: true,
    metadata: { communication_id: '...' },
    // ... other message fields
  }
}

Job Type: Scheduled
Execution Frequency: Every 5 seconds
Average Duration: 500-1000ms per message
Status: Active
