Data Flows
This document illustrates the complete data flows for key features in General Socket service.
Conversation Message Flow
Flow: Internal API → General Socket → Connected Clients
┌─────────────────┐
│ Internal API │ User sends message via REST API
└────────┬────────┘
│ POST /v1/conversations/:id/messages
↓
┌─────────────────┐
│ MongoDB │ 1. Create Message document
└────────┬────────┘ 2. Update Conversation.last_activity
│ 3. Increment unread_count for other users
↓
┌─────────────────┐
│ Internal API │ Axios POST request
└────────┬────────┘
│ POST /emit/{userId}/new_message
↓
┌─────────────────┐
│ General Socket │ 1. Receive REST request
│ (Port 4000) │ 2. Query Message + populate users
└────────┬────────┘ 3. Find all socket IDs for users
│
├─────────────────────────────────────┐
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ User 1 Socket │ │ User 2 Socket │
│ (Browser) │ │ (Browser) │
└─────────────────┘ └─────────────────┘
emit('new_message', data) emit('new_message', data)
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ UI Update │ │ UI Update │
│ - Show message │ │ - Show message │
│ - Play sound │ │ - Show badge │
└─────────────────┘ └─────────────────┘
Steps:
- User Action: User clicks Send in conversation UI
- API Call: Frontend calls
POST /v1/conversations/{id}/messages - Database: Internal API creates Message, updates Conversation
- Socket Trigger: Internal API calls General Socket REST endpoint
- Socket Query: General Socket fetches message + users from DB
- Socket Emission: For each user, emit to all their connected sockets
- Client Update: Each client receives event and updates UI
Prospect Communication Flow
Flow: External Webhook → General Socket → Dashboard Users
Incoming SMS (Twilio)
┌─────────────────┐
│ Customer Phone │ Sends SMS
└────────┬────────┘
│
↓
┌─────────────────┐
│ Twilio API │ Receives SMS
└────────┬────────┘
│ POST /webhook/twilio/sms
↓
┌─────────────────┐
│ External API │ 1. Parse SMS payload
│ (Port 5003) │ 2. Find Contact by phone
└────────┬────────┘ 3. Create Communication (type: INCOMING)
│ 4. Associate with ConversationProspect
↓
┌─────────────────┐
│ MongoDB │ Insert Communication document
└────────┬────────┘
│
↓
┌─────────────────┐
│ External API │ Axios GET request
└────────┬────────┘
│ GET /conversations/prospect/emit/message/{messageID}
↓
┌─────────────────┐
│ General Socket │ 1. Fetch Communication + populate sent_by
│ (Port 4000) │ 2. For each conversation_id, get users
└────────┬────────┘ 3. Query all socket IDs
│
├─────────────────────────────────────┐
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ Agent 1 Socket │ │ Agent 2 Socket │
│ (Dashboard) │ │ (Dashboard) │
└─────────────────┘ └─────────────────┘
emit('prospect_message', { emit('prospect_message', {
type: 'INCOMING', type: 'INCOMING',
data: 'SMS text', data: 'SMS text',
sent_by: Contact, sent_by: Contact,
is_sender: false is_sender: false
}) })
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ UI Update │ │ UI Update │
│ - New message │ │ - New message │
│ - Notification │ │ - Notification │
│ - Update badge │ │ - Update badge │
└─────────────────┘ └─────────────────┘
Outgoing SMS (Agent Reply)
┌─────────────────┐
│ Agent Dashboard │ Clicks Send Reply
└────────┬────────┘
│ POST /v1/prospects/conversations/{id}/reply
↓
┌─────────────────┐
│ Internal API │ 1. Create Communication (type: OUTGOING)
│ (Port 5002) │ 2. Get Contact phone + Twilio config
└────────┬────────┘ 3. Call Twilio API to send SMS
│ 4. Associate with ConversationProspect
↓
┌─────────────────┐
│ Twilio API │ Send SMS to customer
└────────┬────────┘
│ (Async - delivery status webhook later)
↓
┌─────────────────┐
│ MongoDB │ Insert Communication document
└────────┬────────┘
│
↓
┌─────────────────┐
│ Internal API │ Axios GET request
└────────┬────────┘
│ GET /conversations/prospect/emit/message/{messageID}
↓
┌─────────────────┐
│ General Socket │ 1. Fetch Communication + populate sent_by (User)
│ (Port 4000) │ 2. Get all users in conversation
└────────┬────────┘ 3. Query socket IDs
│
├─────────────────────────────────────┐
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ Agent 1 Socket │ │ Agent 2 Socket │
│ (Sender) │ │ (Other User) │
└─────────────────┘ └─────────────────┘
emit('prospect_message', { emit('prospect_message', {
type: 'OUTGOING', type: 'OUTGOING',
data: 'Reply text', data: 'Reply text',
sent_by: User, sent_by: User,
is_sender: true is_sender: false
}) })
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ UI Update │ │ UI Update │
│ - Show sent msg│ │ - Show message │
│ - Clear input │ │ - Update time │
└─────────────────┘ └─────────────────┘
Lead Scraper Task Flow
Flow: Internal API → General Socket → Python Scraper → General Socket → Internal API
┌─────────────────┐
│ Dashboard │ User creates Lead Finder campaign
└────────┬────────┘
│ POST /v1/lead-finder/campaigns
↓
┌─────────────────┐
│ Internal API │ 1. Create Campaign document
│ (Port 5002) │ 2. Generate scrape tasks
└────────┬────────┘ 3. Get available scraper socket IDs
│
↓
┌─────────────────┐
│ General Socket │ Query /leadfinder namespace
│ Scraper Pool │ - Return socket IDs of connected scrapers
└────────┬────────┘ - Load balance across scrapers
│
↓ (Has 3 scrapers connected)
┌─────────────────────────────────────────────┐
│ Load Balancing Strategy │
│ - Round-robin across socket IDs │
│ - Track active tasks per scraper │
│ - Distribute evenly │
└────────┬────────────────────────────────────┘
│
├─────────────────┬─────────────────┐
↓ ↓ ↓
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Scraper 1 │ │ Scraper 2 │ │ Scraper 3 │
│ (Python) │ │ (Python) │ │ (Python) │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
│ POST /lead-finder/scrape_task │
↓ ↓ ↓
emit('scrape_task', { (receives (receives
task_id: '001', task 002) task 003)
keyword: 'plumber',
location: 'NYC'
})
│
↓
┌─────────────────┐
│ Python Scraper │ 1. Parse task payload
│ Process │ 2. Execute Google Maps scraping
└────────┬────────┘ 3. Extract business data
│ 4. Return results
↓
┌─────────────────┐
│ Scraper Results │ Axios POST request
└────────┬────────┘
│ POST /v1/lead-finder/tasks/{task_id}/results
↓
┌─────────────────┐
│ Internal API │ 1. Validate results
│ (Port 5002) │ 2. Store leads in MongoDB
└────────┬────────┘ 3. Update task status
│ 4. Notify user via socket
↓
┌─────────────────┐
│ General Socket │ Axios POST request
└────────┬────────┘
│ POST /emit/{userId}/scrape_complete
↓
┌─────────────────┐
│ Dashboard User │ emit('scrape_complete', {
│ (Browser) │ task_id: '001',
└─────────────────┘ leads_found: 42
│
↓
┌─────────────────┐
│ UI Update │ - Show notification
│ │ - Refresh leads table
└─────────────────┘ - Update campaign stats
Key Points:
- No Authentication: Scraper namespace requires no JWT
- Load Balancing: Tasks distributed across available scrapers
- Bi-directional: Internal API → Scraper and Scraper → Internal API
- Scalable: Add more scrapers by simply connecting more sockets
Typing Indicator Flow
Real-time User Presence Indication
┌─────────────────┐
│ User 1 Browser │ User starts typing
└────────┬────────┘
│ keydown event on message input
↓
┌─────────────────┐
│ Frontend Logic │ Emit after 500ms debounce
└────────┬────────┘
│ socket.emit('convo_someone_typing', {
│ conversationId: '64conv...',
│ roomId: 'room_64conv...',
│ userId: '64user1...'
│ })
↓
┌─────────────────┐
│ General Socket │ 1. Receive event on /v1/conversation namespace
│ (Port 4000) │ 2. Validate user is in conversation
└────────┬────────┘ 3. Get other users' socket IDs
│ 4. Emit to other users only
│
├─────────────────────────────────────┐
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ User 2 Socket │ │ User 3 Socket │
└─────────────────┘ └─────────────────┘
emit('convo_someone_typing', { emit('convo_someone_typing', {
userId: '64user1...', userId: '64user1...',
userName: 'John Doe', userName: 'John Doe',
conversationId: '64conv...' conversationId: '64conv...'
}) })
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ UI Update │ │ UI Update │
│ "John is │ │ "John is │
│ typing..." │ │ typing..." │
└─────────────────┘ └─────────────────┘
(After 3 seconds of no typing)
┌─────────────────┐
│ User 1 Browser │ User stops typing
└────────┬────────┘
│ socket.emit('convo_stopped_typing', {
│ conversationId: '64conv...',
│ roomId: 'room_64conv...'
│ })
↓
┌─────────────────┐
│ General Socket │ Emit to other users
└────────┬────────┘
│
↓
┌─────────────────┐ ┌─────────────────┐
│ User 2 Socket │ │ User 3 Socket │
└─────────────────┘ └─────────────────┘
emit('convo_stopped_typing') emit('convo_stopped_typing')
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ UI Update │ │ UI Update │
│ Remove │ │ Remove │
│ "typing..." │ │ "typing..." │
└─────────────────┘ └─────────────────┘
Optimization:
- Frontend debounces typing events (500ms)
- Auto-clear typing indicator after 3 seconds
- Only emits to users in same conversation
- Excludes sender from receiving own typing events
Read Receipt Flow
Track Message Read Status
┌─────────────────┐
│ User 2 Browser │ User opens conversation
└────────┬────────┘
│ Component mount / visibility change
↓
┌─────────────────┐
│ Frontend Logic │ Mark unread messages as read
└────────┬────────┘
│ socket.emit('convo_mark_conversation_read', {
│ conversationId: '64conv...',
│ roomId: 'room_64conv...',
│ messageIds: ['64msg1...', '64msg2...']
│ })
↓
┌─────────────────┐
│ General Socket │ 1. Update Message.read_by array
│ (Port 4000) │ 2. Decrement Conversation.unread_count for user
└────────┬────────┘ 3. Get sender's socket IDs
│ 4. Emit read receipt to sender
↓
┌─────────────────┐
│ MongoDB │ Message.updateMany({
└─────────────────┘ _id: { $in: messageIds },
$addToSet: { read_by: userId }
})
┌─────────────────┐
│ User 1 Socket │ (Original sender)
└─────────────────┘
emit('convo_message_read', {
messageIds: ['64msg1...', '64msg2...'],
readBy: {
_id: '64user2...',
name: 'Jane Doe'
},
timestamp: '2025-10-13T10:35:00.000Z'
})
│
↓
┌─────────────────┐
│ UI Update │ - Show checkmarks
│ │ - Update message status
└─────────────────┘ - "Read by Jane Doe"
Snooze Conversation Flow
Temporarily Hide Conversation
┌─────────────────┐
│ User Dashboard │ Clicks Snooze (1 hour)
└────────┬────────┘
│ socket.emit('convo_snooze_conversation', {
│ conversationId: '64conv...',
│ roomId: 'room_64conv...',
│ snoozeUntil: 1697200000000 // Unix timestamp
│ })
↓
┌─────────────────┐
│ General Socket │ 1. Find ConversationSnooze for user
│ (Port 4000) │ 2. Update snoozeUntil + is_snoozed flag
└────────┬────────┘ 3. Emit confirmation back to user
│
↓
┌─────────────────┐
│ MongoDB │ ConversationSnooze.updateOne({
└─────────────────┘ user: userId,
conversation: conversationId
}, {
snoozeUntil: Date,
is_snoozed: true
})
┌─────────────────┐
│ User Socket │ emit('convo_snooze_conversation', {
└─────────────────┘ conversationId: '64conv...',
│ snoozeUntil: 1697200000000,
│ is_snoozed: true
↓ })
┌─────────────────┐
│ UI Update │ - Hide conversation from list
│ │ - Show in "Snoozed" tab
└─────────────────┘ - Set timer to un-snooze
(After snooze period expires)
┌─────────────────┐
│ Cron Job │ Queue Manager checks snoozeUntil
│ (Queue Mgr) │ For expired snoozes, call REST API
└────────┬────────┘
│ POST /emit/{userId}/conversation_unsnoozed
↓
┌─────────────────┐
│ General Socket │ Update ConversationSnooze.is_snoozed = false
└────────┬────────┘ Emit to user's sockets
│
↓
┌─────────────────┐
│ User Socket │ emit('conversation_unsnoozed', {
└─────────────────┘ conversationId: '64conv...'
│ })
↓
┌─────────────────┐
│ UI Update │ - Show conversation in main list
│ │ - Remove from "Snoozed" tab
└─────────────────┘ - Show notification if new messages
Multi-Device Synchronization
User Logged in on Desktop + Mobile
┌─────────────────┐
│ Desktop Client │ User sends message
└────────┬────────┘
│ POST /v1/conversations/{id}/messages
↓
┌─────────────────┐
│ Internal API │ Create message → Trigger socket emission
└────────┬────────┘
│
↓
┌─────────────────┐
│ General Socket │ Query all socket IDs for User 1
│ User 1 has 2 │ - Desktop: socketId_desktop_abc
│ active sockets │ - Mobile: socketId_mobile_xyz
└────────┬────────┘
│
├─────────────────────────────────────┐
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ Desktop Socket │ │ Mobile Socket │
│ (Same User) │ │ (Same User) │
└─────────────────┘ └─────────────────┘
emit('new_message', data) emit('new_message', data)
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ Desktop UI │ │ Mobile UI │
│ - Show message │ │ - Show message │
│ - Clear input │ │ - Push notif │
└─────────────────┘ └─────────────────┘
Key: Both devices receive the same event for real-time sync across devices.
Error Recovery Flow
Socket Disconnection Handling
┌─────────────────┐
│ User Browser │ Network interruption
└────────┬────────┘
│ (Connection lost)
↓
┌─────────────────┐
│ Socket.IO Client│ Auto-reconnect attempt (exponential backoff)
└────────┬────────┘ - Retry after 1s, 2s, 4s, 8s...
│
↓ (Connection restored)
┌─────────────────┐
│ General Socket │ 'connection' event fired
└────────┬────────┘
│ 1. Authenticate with JWT
↓ 2. Get user's active conversations
┌─────────────────┐
│ is-auth.js │ Verify token + extract userId
└────────┬────────┘
│
↓
┌─────────────────┐
│ Frontend Logic │ Re-join all active rooms
└────────┬────────┘
│ socket.emit('convo_join', {
│ conversationId: '64conv1...',
│ roomId: 'room_64conv1...'
│ })
│ socket.emit('convo_join', {
│ conversationId: '64conv2...',
│ roomId: 'room_64conv2...'
│ })
↓
┌─────────────────┐
│ General Socket │ Join socket to rooms
└────────┬────────┘ - room_64conv1...
│ - room_64conv2...
↓
┌─────────────────┐
│ Frontend API │ Fetch missed messages
└────────┬────────┘ GET /v1/conversations/{id}/messages?since={lastTimestamp}
│
↓
┌─────────────────┐
│ UI Update │ - Merge missed messages
│ │ - Show "Connection restored"
└─────────────────┘ - Resume normal operation
Socket.IO Built-in Features:
- Automatic reconnection with exponential backoff
- Buffering of emitted events during disconnection
connect,disconnect,reconnectevents for handling
System-Wide Broadcast Flow
Example: Scheduled Maintenance Announcement
┌─────────────────┐
│ Admin Panel │ Admin schedules maintenance
└────────┬────────┘
│ POST /v1/admin/announcements
↓
┌─────────────────┐
│ Internal API │ Create Announcement document
└────────┬────────┘
│ Axios POST /emit/system_announcement
↓
┌─────────────────┐
│ General Socket │ Query all active sessions
│ (Port 4000) │ - Get all connected socket IDs
└────────┬────────┘ - Broadcast to everyone
│
├─────────┬─────────┬─────────┬─────────┐
↓ ↓ ↓ ↓ ↓
[User 1] [User 2] [User 3] [User 4] [User 5]
│ │ │ │ │
└─────────┴─────────┴─────────┴─────────┘
│
↓
emit('system_announcement', {
title: 'Scheduled Maintenance',
message: 'System down Monday 2-4AM',
priority: 'high',
action_url: '/maintenance-info'
})
│
↓
┌─────────────────────────────────────────────────┐
│ All Connected Clients │
│ - Show modal/banner │
│ - Play notification sound │
│ - Store in localStorage for offline viewing │
└─────────────────────────────────────────────────┘
Data Flow Summary
| Flow Type | Initiator | Trigger Method | Socket Event | Target Users |
|---|---|---|---|---|
| Conversation Message | Internal API | REST | new_message | Conversation |
| Prospect Incoming SMS | External API | Webhook | prospect_message | Conversation |
| Prospect Outgoing Reply | Internal API | User Action | prospect_message | Conversation |
| Lead Scraper Task | Internal API | Campaign | scrape_task | Specific Socket |
| Typing Indicator | Client Browser | User Typing | convo_someone_typing | Others in Convo |
| Read Receipt | Client Browser | View Message | convo_message_read | Message Sender |
| Snooze Conversation | Client Browser | User Action | convo_snooze | Same User |
| System Announcement | Internal API | Admin Action | system_announcement | All Users |
| Multi-Device Sync | Any Service | REST/Socket | Various | Same User |
Best Practices
For Service Integration
- Always use REST API for triggering socket emissions from other services
- Never query sockets directly from other services (use General Socket REST endpoints)
- Validate data before emitting to prevent invalid socket events
- Handle errors gracefully with fallback mechanisms
For Frontend Integration
- Reconnect logic: Implement reconnection with exponential backoff
- Room management: Join rooms immediately after connection
- Message buffering: Queue messages during disconnection
- Optimistic UI: Update UI immediately, rollback on error
- Event debouncing: Debounce typing indicators and scroll events
For Scaling
- Use Redis adapter for multi-instance horizontal scaling
- Implement load balancing for scraper task distribution
- Monitor socket connections to detect bottlenecks
- Cache user-socket mappings to reduce DB queries
- Set socket timeouts to clean up stale connections