Twilio Socket Provider
🎯 Overview
The Socket Provider (Providers/socket.js) handles real-time event emission for Twilio messages and conversations, enabling live updates to the DashClicks conversation system via WebSocket connections.
Source: external/Integrations/Twilio/Providers/socket.js
Key Capabilities:
- Emit message events to conversation socket service
- Emit conversation events (new/updated conversations)
- Graceful error handling (always resolves)
- HTTP-based event emission (not direct WebSocket)
Architecture:
This provider communicates with the DashClicks Conversation Socket Service (port 6001), which then broadcasts events to connected WebSocket clients (web app, mobile app).
graph LR
A[Twilio Webhook] --> B[Twilio Integration]
B --> C[Socket Provider]
C --> D[Conversation Socket Service :6001]
D --> E[WebSocket Clients]
E --> F[Web App]
E --> G[Mobile App]
style C fill:#e3f2fd
style D fill:#fff4e6
🔌 Provider Functions
emitMessage()
Emit a message event to trigger real-time updates.
Signature:
exports.emitMessage = async (messageID) => { /* ... */ };
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| messageID | String | ✅ | MongoDB message ID (conversation-message) |
Returns:
Promise<true>; // Always resolves to true (graceful error handling)
Business Logic:
1. Convert Message ID to String
   - Ensures messageID is in string format
2. Construct Emit URL
   - Uses the CONVERSATION_EMIT_URL environment variable
   - Endpoint: /emit/message/{messageID}
3. Send HTTP GET Request
   - Uses axios to trigger the conversation socket service
   - The conversation socket service broadcasts to WebSocket clients
4. Graceful Error Handling
   - On error: logs but still resolves true
   - Prevents emit failures from failing the Twilio webhook and breaking message flow
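The steps above can be sketched as follows. This is a minimal illustration, not the provider's actual source: the names buildMessageEmitUrl and createEmitMessage are hypothetical, and the HTTP client is injected (anything with an axios-style get(url) method) so the example runs without a live socket service.

```javascript
// Hypothetical sketch of the emitMessage flow described above.
// These names are illustrative and are not exported by Providers/socket.js.

function buildMessageEmitUrl(baseUrl, messageID) {
  // Step 1: coerce the ID (for example a MongoDB ObjectId) to a string.
  const id = String(messageID);
  // Step 2: append the emit endpoint to the configured base URL,
  // dropping any trailing slash on the base first.
  return `${baseUrl.replace(/\/+$/, '')}/emit/message/${id}`;
}

function createEmitMessage(httpClient, baseUrl, logger = console) {
  return async function emitMessage(messageID) {
    try {
      // Step 3: the HTTP GET asks the socket service to broadcast.
      await httpClient.get(buildMessageEmitUrl(baseUrl, messageID));
    } catch (error) {
      // Step 4: log and swallow the error so callers never see a rejection.
      logger.error('emitMessage failed:', error.message);
    }
    return true;
  };
}
```

With axios installed, createEmitMessage(require('axios'), process.env.CONVERSATION_EMIT_URL) would produce a function with the same contract as the documented emitMessage.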
Example Usage:
const socketProvider = require('./Providers/socket');
// After receiving SMS from Twilio webhook
router.post('/webhooks/sms', async (req, res) => {
const { MessageSid, From, To, Body } = req.body;
// 1. Save message to database
const message = await ConversationMessage.create({
messageSid: MessageSid,
from: From,
to: To,
body: Body,
direction: 'inbound',
createdAt: new Date(),
});
// 2. Emit real-time event
await socketProvider.emitMessage(message._id.toString());
// 3. Respond to Twilio
res.sendStatus(200);
});
Data Flow:
sequenceDiagram
participant Twilio as Twilio
participant Webhook as Webhook Handler
participant DB as MongoDB
participant SocketProvider as Socket Provider
participant ConvoSocket as Conversation Socket
participant WebApp as Web App Client
Twilio->>Webhook: POST /webhooks/sms
Webhook->>DB: Save message
DB-->>Webhook: Message ID
Webhook->>SocketProvider: emitMessage(messageID)
SocketProvider->>ConvoSocket: GET /emit/message/{id}
ConvoSocket->>WebApp: WebSocket: new_message event
WebApp->>WebApp: Update UI
SocketProvider-->>Webhook: true
Webhook-->>Twilio: 200 OK
style SocketProvider fill:#e3f2fd
style ConvoSocket fill:#fff4e6
emitConversation()
Emit a conversation event to trigger real-time updates.
Signature:
exports.emitConversation = async (convoID, isNew = 'old') => { /* ... */ };
Parameters:
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| convoID | String | ✅ | - | MongoDB conversation ID |
| isNew | String | ❌ | 'old' | Conversation status: 'new' or 'old' |
Returns:
Promise<true>; // Always resolves to true
Business Logic:
1. Convert Conversation ID to String
2. Construct Emit URL
   - Endpoint: /emit/conversation/{convoID}/{isNew}
   - The isNew parameter indicates whether this is a new conversation
3. Send HTTP GET Request
   - Triggers the conversation socket service
   - The socket service broadcasts the conversation update
4. Graceful Error Handling
   - Always resolves true, even on error
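As a rough illustration of the URL construction step, the helper below builds the conversation endpoint from a base URL, an ID, and the isNew flag. The helper name and the fallback to 'old' for unexpected flag values are assumptions made for this sketch; the documented provider may pass the flag through unchanged.

```javascript
// Hypothetical helper; not part of Providers/socket.js.
const VALID_CONVERSATION_FLAGS = new Set(['new', 'old']);

function buildConversationEmitUrl(baseUrl, convoID, isNew = 'old') {
  // Coerce the ID (for example a MongoDB ObjectId) to a string.
  const id = String(convoID);
  // Assumption: any value other than 'new' or 'old' is treated as 'old'.
  const flag = VALID_CONVERSATION_FLAGS.has(isNew) ? isNew : 'old';
  // Drop any trailing slash on the base before appending the endpoint.
  return `${baseUrl.replace(/\/+$/, '')}/emit/conversation/${id}/${flag}`;
}
```

Normalizing the flag in one place keeps a typo such as 'New' from producing an endpoint the socket service does not recognize.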
Example Usage:
// After creating new conversation
async function createConversation(phoneNumber, accountId) {
// 1. Create conversation in database
const conversation = await Conversation.create({
phoneNumber,
accountId,
status: 'active',
createdAt: new Date(),
});
// 2. Emit real-time event (new conversation)
await socketProvider.emitConversation(conversation._id.toString(), 'new');
return conversation;
}
Update Existing Conversation:
// After updating conversation metadata
async function updateConversationStatus(conversationId, status) {
// 1. Update database
await Conversation.updateOne({ _id: conversationId }, { $set: { status } });
// 2. Emit real-time event (old/existing conversation)
await socketProvider.emitConversation(conversationId.toString(), 'old');
}
Data Flow:
sequenceDiagram
participant API as DashClicks API
participant SocketProvider as Socket Provider
participant ConvoSocket as Conversation Socket
participant WebApp as Web App Client
API->>SocketProvider: emitConversation(convoID, 'new')
SocketProvider->>ConvoSocket: GET /emit/conversation/{id}/new
ConvoSocket->>WebApp: WebSocket: new_conversation event
Note over WebApp: Show notification<br/>Add to conversation list
SocketProvider-->>API: true
style SocketProvider fill:#e3f2fd
style ConvoSocket fill:#fff4e6
💡 Integration Examples
Complete SMS Webhook Handler
router.post('/webhooks/sms/:accountId', async (req, res) => {
const { accountId } = req.params;
const { MessageSid, From, To, Body, NumMedia } = req.body;
try {
// 1. Find or create conversation
let conversation = await Conversation.findOne({
accountId,
phoneNumber: From,
});
if (!conversation) {
conversation = await Conversation.create({
accountId,
phoneNumber: From,
status: 'active',
lastMessageAt: new Date(),
});
// Emit new conversation event
await socketProvider.emitConversation(conversation._id.toString(), 'new');
} else {
// Update existing conversation
await Conversation.updateOne(
{ _id: conversation._id },
{ $set: { lastMessageAt: new Date() } },
);
// Emit conversation update
await socketProvider.emitConversation(conversation._id.toString(), 'old');
}
// 2. Save message
const message = await ConversationMessage.create({
conversationId: conversation._id,
messageSid: MessageSid,
from: From,
to: To,
body: Body,
mediaCount: parseInt(NumMedia || '0', 10),
direction: 'inbound',
status: 'received',
createdAt: new Date(),
});
// 3. Emit message event
await socketProvider.emitMessage(message._id.toString());
// 4. Send auto-reply (optional)
if (conversation.autoReply) {
await sendAutoReply(accountId, From, conversation.autoReplyText);
}
res.sendStatus(200);
} catch (error) {
console.error('SMS webhook error:', error);
res.sendStatus(500);
}
});
Outbound SMS with Real-Time Updates
async function sendSMS(conversationId, to, body) {
const conversation = await Conversation.findById(conversationId);
const account = await Account.findById(conversation.accountId);
// 1. Send SMS via Twilio
const smsProvider = require('./sms-api');
const result = await smsProvider.sendSingleSms(
account.twilio_account.sid,
account.twilio_account.authToken,
body,
account.twilio_account.primaryNumber,
to,
);
// 2. Save message to database
const message = await ConversationMessage.create({
conversationId,
messageSid: result.data.sid,
from: account.twilio_account.primaryNumber,
to,
body,
direction: 'outbound',
status: result.data.status, // queued, sent
createdAt: new Date(),
});
// 3. Update conversation timestamp
await Conversation.updateOne({ _id: conversationId }, { $set: { lastMessageAt: new Date() } });
// 4. Emit real-time events
await socketProvider.emitMessage(message._id.toString());
await socketProvider.emitConversation(conversationId.toString(), 'old');
return message;
}
Conversation Archive Handler
async function archiveConversation(conversationId) {
// 1. Update conversation status
await Conversation.updateOne(
{ _id: conversationId },
{ $set: { status: 'archived', archivedAt: new Date() } },
);
// 2. Emit real-time update
await socketProvider.emitConversation(conversationId.toString(), 'old');
// Clients will update UI to remove from active list
return { success: true };
}
Bulk Message Read Notifications
async function markConversationAsRead(conversationId, userId) {
// 1. Update all unread messages
await ConversationMessage.updateMany(
{
conversationId,
direction: 'inbound',
read: false,
},
{
$set: {
read: true,
readBy: userId,
readAt: new Date(),
},
},
);
// 2. Update conversation
await Conversation.updateOne({ _id: conversationId }, { $set: { unreadCount: 0 } });
// 3. Emit conversation update
await socketProvider.emitConversation(conversationId.toString(), 'old');
// All connected clients will update unread badge
}
Real-Time Typing Indicators
// Note: This provider doesn't handle typing indicators directly,
// but you can extend it for additional real-time events
async function sendTypingIndicator(conversationId, userId, isTyping) {
// Emit to conversation socket service directly
const axios = require('axios');
try {
await axios.get(
`${process.env.CONVERSATION_EMIT_URL}/emit/typing/${conversationId}/${userId}/${isTyping}`,
);
} catch (error) {
// Graceful error handling
console.error('Failed to emit typing indicator:', error.message);
}
}
🚨 Error Handling
Graceful Degradation Pattern:
This provider uses a fail-safe pattern - errors are caught and logged, but the function always resolves true. This ensures:
- Twilio webhooks don't fail due to socket errors
- Message/conversation data is saved even if real-time updates fail
- Users can refresh to see updates if WebSocket fails
Error Scenarios:
1. Conversation Socket Service Unavailable
// Socket provider will log error but return true
await socketProvider.emitMessage(messageId); // Returns true even if service down
- Impact: Real-time updates don't work, but data is saved
- Fallback: Users can refresh page to see updates
- Monitoring: Check conversation socket service health
2. Network Timeout
// Axios default timeout is 0 (no timeout), so a stalled request can wait indefinitely
- Impact: emitMessage/emitConversation may hang
- Solution: Consider adding axios timeout configuration
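One way to bound the wait is the timeout request option that axios accepts (in milliseconds). The sketch below assumes a hypothetical EMIT_TIMEOUT_MS environment variable and a 5000 ms default; neither is part of the documented provider. The HTTP client is injected so the example does not need a running service.

```javascript
// Hypothetical: EMIT_TIMEOUT_MS and the 5000 ms default are assumptions.
const DEFAULT_EMIT_TIMEOUT_MS = 5000;

function resolveEmitTimeout(env = process.env) {
  const parsed = Number.parseInt(env.EMIT_TIMEOUT_MS, 10);
  // Ignore missing, non-numeric, or non-positive values.
  return Number.isInteger(parsed) && parsed > 0 ? parsed : DEFAULT_EMIT_TIMEOUT_MS;
}

// httpClient is any axios-compatible client. With axios, the request is
// aborted and the promise rejects once the timeout elapses.
async function emitWithTimeout(httpClient, url, env = process.env) {
  try {
    await httpClient.get(url, { timeout: resolveEmitTimeout(env) });
  } catch (error) {
    // Same graceful pattern as the provider: log, then resolve true.
    console.error('Emit failed or timed out:', error.message);
  }
  return true;
}
```

A timeout shorter than the time Twilio allows the webhook to respond keeps a stalled socket service from delaying the webhook response.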
3. Invalid Message/Conversation ID
// 404 from conversation socket service
- Impact: Event emission fails silently
- Prevention: Validate IDs before emitting
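A simple format check can catch malformed IDs before the HTTP call is made. The sketch below assumes IDs are MongoDB ObjectIds serialized as 24 hexadecimal characters; the helper names are hypothetical. A format check cannot confirm that the document exists, so a 404 from the socket service is still possible.

```javascript
// MongoDB ObjectIds serialize to 24 hexadecimal characters.
const OBJECT_ID_PATTERN = /^[0-9a-fA-F]{24}$/;

function isLikelyObjectId(value) {
  if (value === null || value === undefined) return false;
  return OBJECT_ID_PATTERN.test(String(value));
}

// Hypothetical wrapper: `provider` is any object exposing emitMessage(id).
async function emitMessageIfValid(provider, messageID) {
  if (!isLikelyObjectId(messageID)) {
    console.warn('Skipping emit for invalid message ID:', messageID);
    return false;
  }
  return provider.emitMessage(String(messageID));
}
```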
⚡ Performance Considerations
- Async Non-Blocking: Uses async HTTP calls, so the Node.js event loop is not blocked; a handler that awaits the emit still waits for the HTTP round trip before responding
- No Retry Logic: Failed emissions are not retried
- HTTP Overhead: Each emit is a separate HTTP request (consider batching for high volume)
- Graceful Failures: Always returns success to prevent webhook retry storms
🔧 Configuration
Required Environment Variable:
CONVERSATION_EMIT_URL=http://localhost:6001
- Development: http://localhost:6001
- Production: URL of conversation socket service (internal network)
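Because the provider swallows emit errors, a missing or malformed CONVERSATION_EMIT_URL would otherwise only show up as log noise. The sketch below validates the variable once at startup; the function name and the decision to throw are assumptions for illustration, not documented behavior.

```javascript
// Hypothetical startup check for the CONVERSATION_EMIT_URL variable.
function getConversationEmitUrl(env = process.env) {
  const raw = env.CONVERSATION_EMIT_URL;
  if (!raw) {
    throw new Error('CONVERSATION_EMIT_URL is not set');
  }

  let parsed;
  try {
    parsed = new URL(raw); // WHATWG URL parser, global in Node.js
  } catch {
    throw new Error(`CONVERSATION_EMIT_URL is not a valid URL: ${raw}`);
  }

  if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') {
    throw new Error(`CONVERSATION_EMIT_URL must use http or https: ${raw}`);
  }

  // Strip trailing slashes so endpoint paths can be appended directly.
  return raw.replace(/\/+$/, '');
}
```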
🔗 Related Documentation
- Twilio Integration Overview - Main Twilio integration
- SMS Provider - SMS sending (triggers emitMessage)
- Conversation Socket Service - Real-time WebSocket service (port 6001)
Internal Services:
- Conversation Socket (conversation-socket/) - WebSocket broadcasting service
- Conversations Collection (conversation) - Conversation data model
- Messages Collection (conversation-message) - Message data model
💡 Best Practices
- Always Emit After DB Save: Emit events only after successful database operations
- Use Correct isNew Flag: Mark new conversations as 'new' for proper UI updates
- Don't Block on Emit: Socket emissions are fire-and-forget
- Monitor Socket Service: Ensure conversation socket service is healthy
- Handle Webhook Failures Gracefully: Socket failures shouldn't break webhooks
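If a handler should respond before the emit round trip finishes, the emit can be started without awaiting it. The helper below is a hypothetical sketch of that fire-and-forget style; it adds its own catch in case the supplied function rejects or throws, even though the documented provider always resolves.

```javascript
// Hypothetical helper: start an emit without making the caller wait.
function emitInBackground(emitFn, ...args) {
  // Deferring the call through a promise chain means both synchronous
  // throws and rejections from emitFn end up in the catch below.
  Promise.resolve()
    .then(() => emitFn(...args))
    .catch((error) => {
      console.error('Background emit failed:', error.message);
    });
}
```

In a webhook handler this could look like emitInBackground(socketProvider.emitMessage, message._id.toString()) followed immediately by res.sendStatus(200), assuming emitMessage does not depend on its this binding.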
🎯 Real-World Use Cases
Live Chat Dashboard
// Users see messages appear in real-time as they arrive
router.post('/webhooks/sms', async (req, res) => {
// ... save message to DB ...
// Emit to all agents viewing this conversation
await socketProvider.emitMessage(message._id);
res.sendStatus(200);
});
Conversation Notifications
// New conversation triggers notification badge
const conversation = await Conversation.create({
/* ... */
});
// isNew='new' triggers notification in UI
await socketProvider.emitConversation(conversation._id, 'new');
Multi-Agent Collaboration
// When agent sends reply, other agents see it in real-time
async function sendReply(agentId, conversationId, message) {
const msg = await ConversationMessage.create({
/* ... */
});
// All agents viewing conversation see the reply
await socketProvider.emitMessage(msg._id);
}