๐Ÿฅ Support Communication Check Processing

📖 Overview

The Support Communication Check job acts as a health monitoring and recovery system for support conversations. It runs every 15 seconds to detect incoming communications (SMS/email) that have a room assignment but no corresponding message record, then creates the missing messages and notifies agents. This ensures no customer messages are lost due to system errors or race conditions.

Complete Flow:

  1. Cron Initialization: queue-manager/crons/conversations/support.communication-check.js
  2. Service Processing: queue-manager/services/conversations/support.communication-check.js
  3. Queue Definition: queue-manager/queues/conversations/support.communication-check.js

Execution Pattern: Cron-based (every 15 seconds)

Queue Name: support_communication_check_queue

Environment Flag: QM_CONVERSATIONS=true (in index.js)

🔄 Complete Processing Flow

sequenceDiagram
participant CRON as Cron Schedule<br/>(every 15s)
participant SERVICE as Comm Check Service
participant COMM as Communication<br/>Collection
participant QUEUE as Bull Queue
participant CONV as SupportConversation<br/>Collection
participant MSG as SupportMessage<br/>Collection
participant SOCKET as Conversation Socket
participant AGENT as Support Agent

CRON->>SERVICE: Check for orphaned communications
SERVICE->>COMM: Query communications<br/>(has conv_room_id,<br/>no conv_message_id,<br/>INCOMING type,<br/>last 15 mins)
COMM-->>SERVICE: Return orphaned comms

SERVICE->>COMM: Mark conversation_communication_in_progress=true

loop For each orphaned communication
SERVICE->>QUEUE: Add recovery job
QUEUE->>COMM: Fetch communication details

alt Has conv_id
QUEUE->>QUEUE: Use existing conv_id
else No conv_id
QUEUE->>CONV: Find by contact_id
CONV-->>QUEUE: Return conversation
end

QUEUE->>MSG: Create SupportMessage<br/>(content, source, metadata)
MSG-->>QUEUE: Message created

QUEUE->>SOCKET: Emit 'newMessage' event
SOCKET-->>AGENT: Notify agent of recovered message

QUEUE->>COMM: Update conversation_communication_in_progress=false
end

๐Ÿ“ Source Filesโ€‹

1. Cron Initializationโ€‹

File: queue-manager/crons/conversations/support.communication-check.js

Purpose: Schedule communication health checks every 15 seconds

Cron Pattern: */15 * * * * * (every 15 seconds)

Initialization:

const supportCommunicationCheck = require('../../services/conversations/support.communication-check');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

let inProgress = false;
exports.start = async () => {
  try {
    cron.schedule('*/15 * * * * *', async () => {
      if (!inProgress) {
        inProgress = true;
        await supportCommunicationCheck();
        inProgress = false;
      }
    });
  } catch (err) {
    logger.error({ initiator: 'QM/conversations/comm-check', error: err });
  }
};

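In-Progress Lock: The module-level inProgress flag makes a tick skip its run while the previous run is still awaiting, which prevents overlapping executions within a single process.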
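One caveat with a boolean lock of this kind is that it stays set if the awaited task ever rejects. The sketch below is a minimal, hypothetical variant (the helper name `createNonOverlappingRunner` is not from the codebase) that releases the flag in a `finally` block. Like the original, it only guards a single Node.js process, not several queue-manager instances.

```javascript
// Hypothetical helper: wraps an async task so that overlapping calls are skipped.
function createNonOverlappingRunner(task) {
  let inProgress = false;
  return async function run() {
    if (inProgress) return false; // a previous tick is still running: skip this one
    inProgress = true;
    try {
      await task();
      return true;
    } finally {
      inProgress = false; // released even if task() throws
    }
  };
}

// Usage sketch: two ticks fire while the first run is still pending.
let runs = 0;
const tick = createNonOverlappingRunner(async () => {
  runs += 1;
  await new Promise(resolve => setTimeout(resolve, 20));
});

const overlapResult = Promise.all([tick(), tick()]);
overlapResult.then(([first, second]) => {
  console.log(first, second, runs); // true false 1
});
```

The second call returns `false` immediately instead of starting a second run, which mirrors the `if (!inProgress)` check in the cron callback.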

2. Service Processing (THE CORE LOGIC)

File: queue-manager/services/conversations/support.communication-check.js

Purpose: Query orphaned communications and add to recovery queue

Orphaned Communication Criteria:

  • conv_room_id: { $exists: true } - Has room assignment
  • conv_message_id: { $exists: false } - Missing message record
  • type: 'INCOMING' - Customer message (not outgoing)
  • conversation_communication_in_progress: { $ne: true } - Not already processing
  • createdAt: { $gte: last_15_mins } - Created in last 15 minutes
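The same criteria can be expressed as a plain predicate, which is handy for checking test fixtures without a database. This is an illustrative sketch only: `isOrphaned` and `LOOKBACK_MS` are hypothetical names, not part of the service. It mirrors the MongoDB rule that `$exists` tests for the presence of the field, so a field explicitly set to `null` still counts as existing.

```javascript
// Hypothetical in-memory mirror of the orphan criteria (not part of the service).
const LOOKBACK_MS = 15 * 60 * 1000;

function isOrphaned(doc, now = Date.now()) {
  const has = key => Object.prototype.hasOwnProperty.call(doc, key);
  return (
    has('conv_room_id') && // { $exists: true }
    !has('conv_message_id') && // { $exists: false }: an explicit null does NOT match
    doc.type === 'INCOMING' &&
    doc.conversation_communication_in_progress !== true && // { $ne: true }
    doc.createdAt instanceof Date &&
    doc.createdAt.getTime() >= now - LOOKBACK_MS // { $gte: last_15_mins }
  );
}

const now = Date.now();
const recent = new Date(now - 60 * 1000);

// Field absent: treated as orphaned.
console.log(isOrphaned({ conv_room_id: 'r1', type: 'INCOMING', createdAt: recent }, now)); // true

// Field present but null: not matched by { $exists: false }.
console.log(
  isOrphaned({ conv_room_id: 'r1', conv_message_id: null, type: 'INCOMING', createdAt: recent }, now),
); // false
```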

Processing Logic:

const Communication = require('../../models/communication');
const Queue = require('../../queues/conversations/support.communication-check');

module.exports = async () => {
  try {
    // 1. Define 15-minute lookback window
    const fif_mins = 1000 * 60 * 15;
    // Date - Date coerces both operands to milliseconds, so this equals now minus 15 minutes
    const last_15_mins = new Date(new Date() - new Date(fif_mins));

    // 2. Query orphaned communications
    const communications = await Communication.aggregate([
      {
        $match: {
          conv_room_id: {
            $exists: true, // Has room assignment
          },
          conv_message_id: {
            $exists: false, // Missing message record (ORPHANED)
          },
          type: 'INCOMING', // Customer message
          conversation_communication_in_progress: {
            $ne: true, // Not already processing
          },
          createdAt: {
            $gte: last_15_mins, // Last 15 minutes only
          },
        },
      },
      {
        // NOTE: sorts on "created", while the $match above filters on "createdAt"
        $sort: {
          created: -1, // Most recent first
        },
      },
    ]);

    if (communications.length) {
      // 3. Collect communication IDs
      const ids = communications.map(message => message._id);

      // 4. Mark as processing to prevent duplicate recovery
      await Communication.updateMany(
        { _id: { $in: ids } },
        { conversation_communication_in_progress: true },
      );

      // 5. Add each orphaned communication to recovery queue
      await Promise.all(
        communications.map(async communication => {
          try {
            const queue = await Queue.start();
            await queue.add(
              { id: communication._id },
              {
                attempts: 10,
                backoff: {
                  type: 'exponential',
                  delay: 1000, // 1s, 2s, 4s, 8s, ...
                },
              },
            );
          } catch (err) {
            // Reset processing flag on error
            await Communication.updateOne(
              { _id: communication._id },
              { conversation_communication_in_progress: false },
            );
            console.log(
              `Error occured while processing support communication check`,
              err.message,
              err.stack,
            );
          }
        }),
      );

      console.log('Processed support communication check.');
    }
  } catch (err) {
    console.log(
      `Error occured while processing support communication check`,
      err.message,
      err.stack,
    );
  }
};

15-Minute Window: Only checks recent communications to avoid reprocessing old orphaned records.

3. Queue Definition

File: queue-manager/queues/conversations/support.communication-check.js

Purpose: Create missing SupportMessage from Communication log and notify agents

Queue Options:

{
  attempts: 10, // 10 retry attempts
  backoff: {
    type: "exponential", // Exponential backoff
    delay: 1000 // 1 second base delay
  },
  removeOnComplete: true, // Auto-cleanup
}

Job Processor:

queue.process(async (job, done) => {
  try {
    const { id } = job.data;

    // 1. Fetch communication log
    const communication = await Communication.findOne({ _id: id });

    // 2. Determine conversation ID
    let conversation_id = communication._doc?.conv_id;

    if (!conversation_id) {
      // Fallback: lookup by contact_id
      const conv = await SupportConversation.findOne({
        contact_id: communication._doc.contact_id,
      });

      if (!conv) {
        // Interpolate the id into the message: Error() does not append a second string argument
        throw new Error(
          `Support Conversation does not exist for the contact id: ${communication._doc.contact_id?.toString()}`,
        );
      }

      conversation_id = conv._id;
    }

    // 3. Create missing message and emit socket event in transaction
    await withTransaction(async session => {
      // Create SupportMessage from Communication log
      const message = await new SupportMessage({
        room_id: communication._doc.conv_room_id,
        content:
          communication._doc.body?.content ??
          communication._doc.text ??
          communication._doc.html ??
          communication._doc?.body,
        subject: communication._doc.body?.subject ?? communication._doc.subject ?? '',
        source: communication._doc.message_type?.toLowerCase(),
        message_type: 'text',
        sender_id: conversation_id,
        sender_type: communication._doc.conv_type,
        metadata: { communication_id: communication._id },
        user_notified: true, // Already notified via communication
        created: communication._doc.createdAt,
      }).save({ session });

      // 4. Generate JWT token for socket authentication
      const jwt_token = jwt.sign(
        {
          conversation_id: message.sender_id.toString(),
          account_id: communication.account_id.toString(),
          type: message.sender_type === 'conversation' ? 'team' : 'support',
        },
        process.env.APP_SECRET,
        { expiresIn: '2m' },
      );

      // 5. Emit socket event to update UI with recovered message
      await axios.post(
        `${process.env.CONVERSATION_SOCKET}/emit`,
        {
          event: 'newMessage',
          data: message,
        },
        {
          headers: {
            Authorization: `Bearer ${jwt_token}`,
            'x-account-id': communication.account_id?.toString(),
          },
        },
      );
    });

    return done();
  } catch (err) {
    done(err);
  }
});

Event Handlers:

// Error handler
queue.on('error', err => {
  logger.error({
    initiator: 'QM/conversation/support-comm-check',
    error: err,
    message: `Error in support communication check queue`,
  });
});

// Failed job handler
queue.on('failed', async (job, err) => {
  logger.error({
    initiator: 'QM/conversation/support-comm-check',
    error: err,
    message: `Support Communication Check Queue job with id ${job.id}`,
  });

  // Reset flag after max retries
  if (job.attemptsMade >= job.opts.attempts) {
    await Communication.updateOne(
      { _id: job.data.id },
      { conversation_communication_in_progress: false },
    );
  }
});

// Completed job handler
queue.on('completed', async job => {
  try {
    await Communication.updateOne(
      { _id: job.data.id },
      { conversation_communication_in_progress: false },
    );
  } catch (error) {
    logger.error({
      initiator: 'QM/conversation/support-comm-check',
      error,
      message: `Error while finalizing support communication check queue`,
    });
  }
});

๐Ÿ—„๏ธ Collections Usedโ€‹

communicationsโ€‹

  • Operations: Read, Update
  • Model: shared/models/communication.js
  • Usage Context:
    • Query orphaned communications (has room, no message)
    • Extract message content and metadata
    • Manage conversation_communication_in_progress flag
    • Source of truth for provider delivery logs

Key Fields:

  • conv_room_id: Support room assignment
  • conv_message_id: Reference to SupportMessage (field absent = orphaned; the job matches on $exists: false, which an explicit null does not satisfy)
  • conv_id: Conversation ID (may be null)
  • contact_id: Customer contact ID (fallback lookup)
  • type: 'INCOMING' | 'OUTGOING'
  • message_type: 'SMS' | 'EMAIL'
  • body, text, html: Message content (various formats)
  • subject: Email subject (if applicable)
  • conv_type: 'conversation' | 'support'
  • account_id: Owner account
  • createdAt: Communication timestamp

support_messages

  • Operations: Create
  • Model: shared/models/support-message.js
  • Usage Context:
    • Create missing message from communication log
    • Link to communication via metadata
    • Display in support conversation UI

Key Fields:

  • room_id: Support room
  • content: Message text content
  • subject: Email subject
  • source: 'sms' | 'email'
  • message_type: 'text' (always for recovered messages)
  • sender_id: Conversation ID
  • sender_type: 'conversation' | 'support'
  • metadata.communication_id: Link to Communication log
  • user_notified: Set to true (already notified)
  • created: Original communication timestamp

support_conversations

  • Operations: Read
  • Model: shared/models/support-conversation.js
  • Usage Context:
    • Fallback lookup when conv_id missing
    • Find by contact_id

Key Fields:

  • contact_id: Customer contact reference
  • account_id: Owner account

🔧 Job Configuration

Queue Options

{
  attempts: 10, // Maximum retry attempts
  backoff: {
    type: "exponential", // Exponential backoff strategy
    delay: 1000 // 1 second base delay
  },
  removeOnComplete: true, // Auto-cleanup on success
}

Cron Schedule

'*/15 * * * * *'; // Every 15 seconds

Frequency Rationale: A 15-second interval recovers missed messages quickly while keeping database load low. A longer interval would also be acceptable, since this job is a backup recovery mechanism rather than the primary message path.

Lookback Window

const fif_mins = 1000 * 60 * 15; // 15 minutes
const last_15_mins = new Date(new Date() - new Date(fif_mins));

Purpose: Only process recent orphaned communications to avoid reprocessing old unrecoverable records.
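The double `new Date(...)` expression works because subtracting two `Date` objects coerces both to millisecond timestamps. A more direct equivalent, shown here only as a sketch (`lookbackCutoff` is a hypothetical name, not something the service defines), subtracts the window from a numeric timestamp:

```javascript
// Hypothetical helper: start of the lookback window as a Date.
function lookbackCutoff(nowMs, windowMinutes = 15) {
  return new Date(nowMs - windowMinutes * 60 * 1000);
}

// Fixed instant so the result is reproducible.
const nowMs = Date.UTC(2024, 0, 1, 12, 0, 0);
const fif_mins = 1000 * 60 * 15;

// Form used by the service: Date - Date coerces both operands to milliseconds.
const original = new Date(new Date(nowMs) - new Date(fif_mins));
const direct = lookbackCutoff(nowMs);

console.log(direct.toISOString()); // 2024-01-01T11:45:00.000Z
console.log(original.getTime() === direct.getTime()); // true
```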

📋 Processing Logic - Detailed Flow

Why This Job Exists

Problem: Race conditions or system errors can cause communications to be logged without corresponding SupportMessage records:

  1. Provider webhook arrives (creates Communication)
  2. System attempts to create SupportMessage
  3. Database connection fails / service crashes
  4. Communication log exists, but SupportMessage doesn't
  5. Message is "lost" from agent's view

Solution: This job periodically scans for orphaned communications and recovers them.

Service Layer Processing

Service Function: module.exports (anonymous async function)

Purpose: Query orphaned communications and add to recovery queue

Processing Steps:

  1. Calculate Lookback Window

    const fif_mins = 1000 * 60 * 15; // 15 minutes in milliseconds
    const last_15_mins = new Date(new Date() - new Date(fif_mins));

    Purpose: Only check recent communications to avoid endless reprocessing

  2. Query Orphaned Communications

    const communications = await Communication.aggregate([
    {
    $match: {
    conv_room_id: { $exists: true },
    conv_message_id: { $exists: false }, // ORPHANED
    type: 'INCOMING',
    conversation_communication_in_progress: { $ne: true },
    createdAt: { $gte: last_15_mins },
    },
    },
    { $sort: { created: -1 } },
    ]);

    Matching Criteria:

    • Has room assignment (conv_room_id exists)
    • Missing message record (conv_message_id doesn't exist) ⚠️ ORPHANED
    • Incoming customer message (not outgoing agent message)
    • Not already being recovered
    • Created in last 15 minutes
  3. Mark as Processing

    const ids = communications.map(message => message._id);
    await Communication.updateMany(
    { _id: { $in: ids } },
    { conversation_communication_in_progress: true },
    );

    Purpose: Prevent duplicate recovery attempts

  4. Add Recovery Jobs

    await Promise.all(
    communications.map(async communication => {
    const queue = await Queue.start();
    await queue.add(
    { id: communication._id },
    {
    attempts: 10,
    backoff: { type: 'exponential', delay: 1000 },
    },
    );
    }),
    );
  5. Error Recovery

    catch (err) {
    // Reset flag to allow retry on next cron run
    await Communication.updateOne(
    { _id: communication._id },
    { conversation_communication_in_progress: false }
    );
    }

Queue Processing

Queue Processor: Inside queues/conversations/support.communication-check.js

Purpose: Create missing SupportMessage from Communication log

Job Data Structure:

{
id: ObjectId; // Communication document ID
}

Processing Steps:

  1. Fetch Communication Log

    const communication = await Communication.findOne({ _id: id });
  2. Determine Conversation ID

    let conversation_id = communication._doc?.conv_id;

    if (!conversation_id) {
    // Fallback lookup by contact_id
    const conv = await SupportConversation.findOne({
    contact_id: communication._doc.contact_id,
    });

    if (!conv) {
    throw new Error('Support Conversation does not exist');
    }

    conversation_id = conv._id;
    }

    Two-Step Lookup: Try conv_id first, fallback to contact_id lookup

  3. Create SupportMessage

    const message = await new SupportMessage({
    room_id: communication._doc.conv_room_id,
    content:
    communication._doc.body?.content ??
    communication._doc.text ??
    communication._doc.html ??
    communication._doc?.body,
    subject: communication._doc.body?.subject ?? communication._doc.subject ?? '',
    source: communication._doc.message_type?.toLowerCase(),
    message_type: 'text',
    sender_id: conversation_id,
    sender_type: communication._doc.conv_type,
    metadata: { communication_id: communication._id },
    user_notified: true, // Already notified via communication
    created: communication._doc.createdAt, // Use original timestamp
    }).save({ session });

    Content Fallback Chain: Try multiple fields to extract message content

  4. Generate JWT Token

    const jwt_token = jwt.sign(
    {
    conversation_id: message.sender_id.toString(),
    account_id: communication.account_id.toString(),
    type: message.sender_type === 'conversation' ? 'team' : 'support',
    },
    process.env.APP_SECRET,
    { expiresIn: '2m' },
    );
  5. Emit Socket Event

    await axios.post(
    `${process.env.CONVERSATION_SOCKET}/emit`,
    {
    event: 'newMessage',
    data: message,
    },
    {
    headers: {
    Authorization: `Bearer ${jwt_token}`,
    'x-account-id': communication.account_id?.toString(),
    },
    },
    );

    UI Update: Recovered message appears in agent's dashboard

  6. Cleanup Processing Flag

    queue.on('completed', async job => {
    await Communication.updateOne(
    { _id: job.data.id },
    { conversation_communication_in_progress: false },
    );
    });
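Step 2 above, the two-step conversation lookup, can be isolated into a small function so it is testable without a database. The sketch below is an illustration under assumptions: `resolveConversationId` and the injected `findByContactId` callback are hypothetical names standing in for the `SupportConversation.findOne` call, and the documents are plain objects rather than Mongoose documents.

```javascript
// Hypothetical helper: prefer conv_id, otherwise look the conversation up by contact_id.
async function resolveConversationId(communication, findByContactId) {
  if (communication.conv_id) return communication.conv_id;

  const conv = await findByContactId(communication.contact_id);
  if (!conv) {
    // Throwing fails the job so the queue can retry it later.
    throw new Error(
      `Support Conversation does not exist for the contact id: ${communication.contact_id}`,
    );
  }
  return conv._id;
}

// Usage sketch with an in-memory stand-in for the database lookup.
const conversationsByContact = new Map([['contact-1', { _id: 'conv-9' }]]);
const finder = async contactId => conversationsByContact.get(contactId) ?? null;

const lookups = Promise.all([
  resolveConversationId({ conv_id: 'conv-1', contact_id: 'contact-1' }, finder),
  resolveConversationId({ contact_id: 'contact-1' }, finder),
]);
lookups.then(ids => console.log(ids)); // [ 'conv-1', 'conv-9' ]
```

Injecting the lookup keeps the fallback order explicit: the stored `conv_id` always wins, and the `contact_id` query only runs when it is missing.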

Error Handling in Flow

Service Layer Errors:

try {
const queue = await Queue.start();
await queue.add({ id: communication._id }, { attempts: 10, backoff: {...} });
} catch (err) {
// Reset flag to allow retry
await Communication.updateOne(
{ _id: communication._id },
{ conversation_communication_in_progress: false }
);
console.log(`Error occured while processing support communication check`, err.message);
}

Queue Processor Errors:

// Bull automatically retries based on attempts config (10 attempts)
queue.on('failed', async (job, err) => {
logger.error({
initiator: 'QM/conversation/support-comm-check',
error: err,
message: `Support Communication Check Queue job with id ${job.id}`,
});

// Reset flag after max retries
if (job.attemptsMade >= job.opts.attempts) {
await Communication.updateOne(
{ _id: job.data.id },
{ conversation_communication_in_progress: false },
);
}
});

Retry Strategy: 10 attempts with exponential backoff starting at 1 second
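The `failed` handler guards the flag reset with `job.attemptsMade >= job.opts.attempts`, so the in-progress flag is only cleared once retries are exhausted and the cron cannot re-queue a communication that is still being retried. A minimal sketch of that check, using plain objects in place of real queue jobs (`isFinalFailure` is a hypothetical helper and the values are illustrative):

```javascript
// Hypothetical helper mirroring the guard in the 'failed' handler.
function isFinalFailure(job) {
  return job.attemptsMade >= job.opts.attempts;
}

// Plain objects standing in for queue jobs (illustrative values only).
const midRetry = { attemptsMade: 3, opts: { attempts: 10 } };
const exhausted = { attemptsMade: 10, opts: { attempts: 10 } };

console.log(isFinalFailure(midRetry)); // false: flag stays set while retries remain
console.log(isFinalFailure(exhausted)); // true: safe to reset the in-progress flag
```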

🚨 Error Handling

Common Error Scenarios

Conversation Not Found

const conv = await SupportConversation.findOne({
  contact_id: communication._doc.contact_id,
});

if (!conv) {
  // Throwing fails the job and triggers a retry - the conversation may be created later.
  // The id is interpolated because Error() does not append a second string argument.
  throw new Error(
    `Support Conversation does not exist for the contact id: ${communication._doc.contact_id?.toString()}`,
  );
}

Reason: Conversation may not yet be created when communication arrives

Content Extraction Failure

const content =
communication._doc.body?.content ??
communication._doc.text ??
communication._doc.html ??
communication._doc?.body;

if (!content) {
logger.warn({
initiator: 'QM/conversation/support-comm-check',
message: 'No content found in communication',
communication_id: communication._id,
});
// Still create message with empty content
}

Fallback Chain: Tries multiple content fields to handle different provider formats
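The behaviour of the chain is easiest to see on plain objects. The sketch below wraps the same expression in a hypothetical `extractContent` helper (not part of the queue code). Note that `??` only falls through on `null` or `undefined`, so an empty string in an earlier field is used as-is rather than skipped.

```javascript
// Hypothetical helper reproducing the content fallback chain on a plain object.
function extractContent(doc) {
  return doc.body?.content ?? doc.text ?? doc.html ?? doc?.body;
}

// body.content wins when present.
console.log(extractContent({ body: { content: 'hi' }, text: 'ignored' })); // hi

// Falls back to text when there is no body.
console.log(extractContent({ text: 'sms text' })); // sms text

// A string body is used only as the last resort.
console.log(extractContent({ body: 'raw body' })); // raw body

// An empty string is not null/undefined, so the chain stops there.
console.log(extractContent({ body: { content: '' }, text: 'fallback' }) === ''); // true
```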

Socket Emit Failure

try {
await axios.post(`${process.env.CONVERSATION_SOCKET}/emit`, {...});
} catch (error) {
logger.error({
initiator: 'QM/conversation/support-comm-check',
error: 'Socket emit failed for recovered message',
details: error.message
});
throw error; // Trigger retry - agent must see recovered message
}

Critical: Recovered messages MUST appear in UI, so failures trigger retries

Retry Strategy

{
  attempts: 10, // Maximum 10 attempts
  backoff: {
    type: "exponential", // Exponential backoff
    delay: 1000 // 1 second base delay
  }
}

Backoff Schedule:

  • Attempt 1: Immediate
  • Attempt 2: 1 second
  • Attempt 3: 2 seconds
  • Attempt 4: 4 seconds
  • Attempt 5-10: Progressive exponential backoff (8s, 16s, 32s, 64s, 128s, 256s)
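The schedule above can be reproduced with a few lines of arithmetic. This is a sketch that assumes each retry simply doubles the previous delay, as the list describes; the exact formula depends on the queue library and version, so treat the numbers as illustrative. `doublingBackoffSchedule` is a hypothetical helper.

```javascript
// Sketch: delays between attempts, assuming each retry doubles the previous delay.
function doublingBackoffSchedule(attempts, baseDelayMs) {
  const delays = [];
  // N attempts means N - 1 waits: the first attempt runs immediately.
  for (let retry = 0; retry < attempts - 1; retry += 1) {
    delays.push(baseDelayMs * 2 ** retry);
  }
  return delays;
}

const delays = doublingBackoffSchedule(10, 1000);
const totalSeconds = delays.reduce((sum, ms) => sum + ms, 0) / 1000;

// Delays in seconds: 1, 2, 4, 8, 16, 32, 64, 128, 256
console.log(delays.map(ms => ms / 1000).join(', '));
console.log(totalSeconds); // 511
```

Under this assumption, a job that fails all 10 attempts spends about 511 seconds (roughly 8.5 minutes) waiting between attempts before the `failed` handler resets the flag.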

📊 Monitoring & Logging

Success Logging

console.log('Processed support communication check.');
console.log(`Recovered ${communications.length} orphaned messages`);

Error Logging

logger.error({
initiator: 'QM/conversation/support-comm-check',
error: err,
message: `Error in support communication check queue`,
});

Performance Metrics

  • Average Processing Time: ~500-1000ms per recovery
  • Success Rate: ~95% (5% may be permanently unrecoverable)
  • Typical Volume: 0-10 orphaned communications per day (healthy system)
  • Alert Threshold: >50 orphaned communications per day indicates a system issue

🔗 Integration Points

Triggers This Job

  • Cron Schedule: Every 15 seconds automatically
  • Manual Trigger: Via API endpoint (if QM_HOOKS=true)
  • Alert System: May be manually triggered after system recovery

Data Dependencies

  • Communication collection: Must have orphaned records
  • SupportConversation collection: For conversation lookup
  • Conversation Socket Service: For UI updates
  • Support Message Status: Tracks delivery status
  • Support Notification: Notifies agents of messages
  • Primary Message Creation: This job is backup when primary path fails

Downstream Effects

  • Agent Dashboard: Recovered messages appear in conversation history
  • Message Continuity: Fills gaps in conversation thread
  • Analytics: Corrects message count metrics

โš ๏ธ Important Notesโ€‹

Side Effectsโ€‹

  • โš ๏ธ Database Writes: Creates SupportMessage documents
  • โš ๏ธ Socket Events: Emits recovered messages to agents
  • โš ๏ธ Timestamp Preservation: Uses original communication timestamp
  • โš ๏ธ Notification Flag: Sets user_notified=true (assumes already notified)

Performance Considerations

  • 15-Second Intervals: Acceptable since this is recovery, not primary path
  • 15-Minute Lookback: Prevents reprocessing old unrecoverable records
  • Transaction Overhead: MongoDB transactions ensure consistency
  • Low Volume: Healthy systems have few orphaned communications

Maintenance Notes

  • High Orphan Rate: >50 orphaned comms/day indicates system issue
  • Root Cause: Investigate why primary message creation is failing
  • Monitoring: Track orphan rate as health metric
  • Cleanup: Old orphaned communications (>15min) ignored intentionally

Recovery Limitations

Cannot Recover:

  • Communications without conv_room_id (no room assignment)
  • Communications older than 15 minutes
  • Communications with permanently deleted conversations/contacts

🧪 Testing

Manual Trigger

# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/conversations/support-communication-check

Create Test Orphaned Communication

// Simulate orphaned communication (has room, no message)
const communication = await Communication.create({
  type: 'INCOMING',
  message_type: 'SMS',
  conv_room_id: testRoomId, // Has room
  // conv_message_id is omitted on purpose (ORPHANED): the job matches
  // { $exists: false }, which a field explicitly set to null would not satisfy
  conv_id: testConversationId,
  contact_id: testContactId,
  account_id: testAccountId,
  body: 'Test orphaned message',
  success: true,
  createdAt: new Date(), // Recent
  conversation_communication_in_progress: false,
});

// Wait 15 seconds for recovery
setTimeout(async () => {
const message = await SupportMessage.findOne({
'metadata.communication_id': communication._id,
});
console.log('Message recovered:', !!message);
}, 16000);

Monitor Orphaned Communications

// Count orphaned communications
const orphaned = await Communication.countDocuments({
  conv_room_id: { $exists: true },
  conv_message_id: { $exists: false },
  type: 'INCOMING',
  // Same condition as the job; unlike `false`, it also matches documents that lack the flag
  conversation_communication_in_progress: { $ne: true },
  createdAt: { $gte: new Date(Date.now() - 15 * 60 * 1000) },
});

console.log('Orphaned communications pending recovery:', orphaned);

// Alert if high volume
if (orphaned > 50) {
  console.error('⚠️ HIGH ORPHAN RATE - INVESTIGATE SYSTEM ISSUE');
}

Test Recovery Process

// Monitor recovery in logs
// Look for "Processed support communication check"

// Verify message creation
const communication = await Communication.findById(testCommunicationId);
const message = await SupportMessage.findOne({
'metadata.communication_id': communication._id,
});

console.log('Communication found:', !!communication);
console.log('Message created:', !!message);
console.log('Content matches:', message.content === communication.body);
console.log(
'Timestamp preserved:',
message.created.getTime() === communication.createdAt.getTime(),
);

Job Type: Scheduled (Health Check / Recovery)
Execution Frequency: Every 15 seconds
Average Duration: 500-1000ms per recovery
Status: Active
Health Metric: less than 10 orphaned communications per day is healthy
