
Data Flows

This document illustrates the complete data flows for key features in the General Socket service.


Conversation Message Flow

Flow: Internal API → General Socket → Connected Clients

┌─────────────────┐
│ Internal API │ User sends message via REST API
└────────┬────────┘
│ POST /v1/conversations/{id}/messages

┌─────────────────┐
│ MongoDB │ 1. Create Message document
└────────┬────────┘ 2. Update Conversation.last_activity
│ 3. Increment unread_count for other users

┌─────────────────┐
│ Internal API │ Axios POST request
└────────┬────────┘
│ POST /emit/{userId}/new_message

┌─────────────────┐
│ General Socket │ 1. Receive REST request
│ (Port 4000) │ 2. Query Message + populate users
└────────┬────────┘ 3. Find all socket IDs for users

├─────────────────────────────────────┐
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ User 1 Socket │ │ User 2 Socket │
│ (Browser) │ │ (Browser) │
└─────────────────┘ └─────────────────┘
emit('new_message', data) emit('new_message', data)
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ UI Update │ │ UI Update │
│ - Show message │ │ - Show message │
│ - Play sound │ │ - Show badge │
└─────────────────┘ └─────────────────┘

Steps:

  1. User Action: User clicks Send in conversation UI
  2. API Call: Frontend calls POST /v1/conversations/{id}/messages
  3. Database: Internal API creates Message, updates Conversation
  4. Socket Trigger: Internal API calls General Socket REST endpoint
  5. Socket Query: General Socket fetches message + users from DB
  6. Socket Emission: For each user, emit to all their connected sockets
  7. Client Update: Each client receives event and updates UI
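Steps 5 to 7 can be sketched as a small fan-out routine. This is a minimal illustration, not the service's actual code: `fanOutToUsers`, the `socketsByUser` map, and the `emit` callback are hypothetical names standing in for the socket lookup and the Socket.IO emit call.

```javascript
// Hypothetical sketch: deliver one event to every connected socket of every recipient.
// socketsByUser: Map of userId -> array of socket IDs (assumed shape of the lookup)
// emit: (socketId, event, payload) => void (stands in for the real Socket.IO emit)
function fanOutToUsers(socketsByUser, userIds, event, payload, emit) {
  let delivered = 0;
  for (const userId of userIds) {
    // A user with no connected sockets is simply skipped.
    const socketIds = socketsByUser.get(userId) || [];
    for (const socketId of socketIds) {
      emit(socketId, event, payload);
      delivered += 1;
    }
  }
  return delivered;
}

// Usage: user1 has two tabs open, user2 has one, user3 is offline.
const socketsByUser = new Map([
  ['user1', ['sock_a', 'sock_b']],
  ['user2', ['sock_c']],
]);
const sent = [];
const deliveredCount = fanOutToUsers(
  socketsByUser,
  ['user1', 'user2', 'user3'],
  'new_message',
  { text: 'hello' },
  (socketId, event) => sent.push(`${socketId}:${event}`)
);
```

Returning the delivery count makes it easy to log or alert when a message reached no sockets at all.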

Prospect Communication Flow

Flow: External Webhook → General Socket → Dashboard Users

Incoming SMS (Twilio)

┌─────────────────┐
│ Customer Phone │ Sends SMS
└────────┬────────┘


┌─────────────────┐
│ Twilio API │ Receives SMS
└────────┬────────┘
│ POST /webhook/twilio/sms

┌─────────────────┐
│ External API │ 1. Parse SMS payload
│ (Port 5003) │ 2. Find Contact by phone
└────────┬────────┘ 3. Create Communication (type: INCOMING)
│ 4. Associate with ConversationProspect

┌─────────────────┐
│ MongoDB │ Insert Communication document
└────────┬────────┘


┌─────────────────┐
│ External API │ Axios GET request
└────────┬────────┘
│ GET /conversations/prospect/emit/message/{messageID}

┌─────────────────┐
│ General Socket │ 1. Fetch Communication + populate sent_by
│ (Port 4000) │ 2. For each conversation_id, get users
└────────┬────────┘ 3. Query all socket IDs

├─────────────────────────────────────┐
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ Agent 1 Socket │ │ Agent 2 Socket │
│ (Dashboard) │ │ (Dashboard) │
└─────────────────┘ └─────────────────┘
emit('prospect_message', { emit('prospect_message', {
type: 'INCOMING', type: 'INCOMING',
data: 'SMS text', data: 'SMS text',
sent_by: Contact, sent_by: Contact,
is_sender: false is_sender: false
}) })
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ UI Update │ │ UI Update │
│ - New message │ │ - New message │
│ - Notification │ │ - Notification │
│ - Update badge │ │ - Update badge │
└─────────────────┘ └─────────────────┘

Outgoing SMS (Agent Reply)

┌─────────────────┐
│ Agent Dashboard │ Clicks Send Reply
└────────┬────────┘
│ POST /v1/prospects/conversations/{id}/reply

┌─────────────────┐
│ Internal API │ 1. Create Communication (type: OUTGOING)
│ (Port 5002) │ 2. Get Contact phone + Twilio config
└────────┬────────┘ 3. Call Twilio API to send SMS
│ 4. Associate with ConversationProspect

┌─────────────────┐
│ Twilio API │ Send SMS to customer
└────────┬────────┘
│ (Async - delivery status webhook later)

┌─────────────────┐
│ MongoDB │ Insert Communication document
└────────┬────────┘


┌─────────────────┐
│ Internal API │ Axios GET request
└────────┬────────┘
│ GET /conversations/prospect/emit/message/{messageID}

┌─────────────────┐
│ General Socket │ 1. Fetch Communication + populate sent_by (User)
│ (Port 4000) │ 2. Get all users in conversation
└────────┬────────┘ 3. Query socket IDs

├─────────────────────────────────────┐
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ Agent 1 Socket │ │ Agent 2 Socket │
│ (Sender) │ │ (Other User) │
└─────────────────┘ └─────────────────┘
emit('prospect_message', { emit('prospect_message', {
type: 'OUTGOING', type: 'OUTGOING',
data: 'Reply text', data: 'Reply text',
sent_by: User, sent_by: User,
is_sender: true is_sender: false
}) })
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ UI Update │ │ UI Update │
│ - Show sent msg│ │ - Show message │
│ - Clear input │ │ - Update time │
└─────────────────┘ └─────────────────┘
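The two prospect diagrams differ only in `type`, `sent_by`, and the per-recipient `is_sender` flag. A minimal sketch of that per-recipient payload construction follows. The payload field names come from the diagrams; the function name `buildProspectPayloads` and the shape of the `communication` object (including `sent_by._id`) are assumptions made for illustration.

```javascript
// Hypothetical sketch: build one 'prospect_message' payload per recipient.
function buildProspectPayloads(communication, recipientIds) {
  return recipientIds.map((recipientId) => ({
    recipientId,
    payload: {
      type: communication.type,
      data: communication.data,
      sent_by: communication.sent_by,
      // Only an OUTGOING message can have a dashboard user as its sender.
      // For INCOMING messages the sender is the Contact, so this is always false.
      is_sender:
        communication.type === 'OUTGOING' &&
        communication.sent_by._id === recipientId,
    },
  }));
}

// Usage: agent1 sends a reply in a conversation shared with agent2.
const outgoing = {
  type: 'OUTGOING',
  data: 'Reply text',
  sent_by: { _id: 'agent1', name: 'Agent One' },
};
const outgoingPayloads = buildProspectPayloads(outgoing, ['agent1', 'agent2']);

// Usage: an incoming SMS from a contact goes to the same two agents.
const incoming = {
  type: 'INCOMING',
  data: 'SMS text',
  sent_by: { _id: 'contact1', name: 'Customer' },
};
const incomingPayloads = buildProspectPayloads(incoming, ['agent1', 'agent2']);
```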

Lead Scraper Task Flow

Flow: Internal API → General Socket → Python Scraper → General Socket → Internal API

┌─────────────────┐
│ Dashboard │ User creates Lead Finder campaign
└────────┬────────┘
│ POST /v1/lead-finder/campaigns

┌─────────────────┐
│ Internal API │ 1. Create Campaign document
│ (Port 5002) │ 2. Generate scrape tasks
└────────┬────────┘ 3. Get available scraper socket IDs


┌─────────────────┐
│ General Socket │ Query /leadfinder namespace
│ Scraper Pool │ - Return socket IDs of connected scrapers
└────────┬────────┘ - Load balance across scrapers

↓ (Has 3 scrapers connected)
┌─────────────────────────────────────────────┐
│ Load Balancing Strategy │
│ - Round-robin across socket IDs │
│ - Track active tasks per scraper │
│ - Distribute evenly │
└────────┬────────────────────────────────────┘

├─────────────────┬─────────────────┐
↓ ↓ ↓
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Scraper 1 │ │ Scraper 2 │ │ Scraper 3 │
│ (Python) │ │ (Python) │ │ (Python) │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
│ POST /lead-finder/scrape_task │
↓ ↓ ↓
emit('scrape_task', { (receives (receives
task_id: '001', task 002) task 003)
keyword: 'plumber',
location: 'NYC'
})


┌─────────────────┐
│ Python Scraper │ 1. Parse task payload
│ Process │ 2. Execute Google Maps scraping
└────────┬────────┘ 3. Extract business data
│ 4. Return results

┌─────────────────┐
│ Scraper Results │ Axios POST request
└────────┬────────┘
│ POST /v1/lead-finder/tasks/{task_id}/results

┌─────────────────┐
│ Internal API │ 1. Validate results
│ (Port 5002) │ 2. Store leads in MongoDB
└────────┬────────┘ 3. Update task status
│ 4. Notify user via socket

┌─────────────────┐
│ General Socket │ Receives Axios POST request from Internal API
└────────┬────────┘
│ POST /emit/{userId}/scrape_complete

┌─────────────────┐
│ Dashboard User │ emit('scrape_complete', {
│ (Browser) │ task_id: '001',
└─────────────────┘ leads_found: 42
})

┌─────────────────┐
│ UI Update │ - Show notification
│ │ - Refresh leads table
└─────────────────┘ - Update campaign stats

Key Points:

  • No Authentication: Scraper namespace requires no JWT
  • Load Balancing: Tasks distributed across available scrapers
  • Bi-directional: Internal API → Scraper and Scraper → Internal API
  • Scalable: Add more scrapers by simply connecting more sockets
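The load balancing strategy in the diagram (round-robin, with a count of active tasks per scraper) could look roughly like the sketch below. `ScraperPool` and its methods are hypothetical names, and the pool is kept in memory purely for illustration.

```javascript
// Hypothetical sketch of round-robin task assignment with per-scraper task counts.
class ScraperPool {
  constructor(socketIds) {
    this.socketIds = [...socketIds];
    this.next = 0; // index of the scraper that receives the next task
    this.activeTasks = new Map(socketIds.map((id) => [id, 0]));
  }

  // Pick the next scraper in rotation and record the task as active.
  assign() {
    if (this.socketIds.length === 0) {
      throw new Error('No scrapers connected');
    }
    const socketId = this.socketIds[this.next];
    this.next = (this.next + 1) % this.socketIds.length;
    this.activeTasks.set(socketId, this.activeTasks.get(socketId) + 1);
    return socketId;
  }

  // Call when a scraper reports results for a task.
  complete(socketId) {
    const current = this.activeTasks.get(socketId) || 0;
    this.activeTasks.set(socketId, Math.max(0, current - 1));
  }
}

// Usage: four tasks across three scrapers; the fourth wraps around to scraper_1.
const pool = new ScraperPool(['scraper_1', 'scraper_2', 'scraper_3']);
const assignments = ['001', '002', '003', '004'].map((taskId) => ({
  taskId,
  socketId: pool.assign(),
}));
```

Pure round-robin ignores how long each task takes. The active-task counts are tracked so that a least-loaded strategy could replace the rotation later without changing callers.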

Typing Indicator Flow

Real-time User Presence Indication

┌─────────────────┐
│ User 1 Browser │ User starts typing
└────────┬────────┘
│ keydown event on message input

┌─────────────────┐
│ Frontend Logic │ Emit after 500ms debounce
└────────┬────────┘
│ socket.emit('convo_someone_typing', {
│ conversationId: '64conv...',
│ roomId: 'room_64conv...',
│ userId: '64user1...'
│ })

┌─────────────────┐
│ General Socket │ 1. Receive event on /v1/conversation namespace
│ (Port 4000) │ 2. Validate user is in conversation
└────────┬────────┘ 3. Get other users' socket IDs
│ 4. Emit to other users only

├─────────────────────────────────────┐
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ User 2 Socket │ │ User 3 Socket │
└─────────────────┘ └─────────────────┘
emit('convo_someone_typing', { emit('convo_someone_typing', {
userId: '64user1...', userId: '64user1...',
userName: 'John Doe', userName: 'John Doe',
conversationId: '64conv...' conversationId: '64conv...'
}) })
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ UI Update │ │ UI Update │
│ "John is │ │ "John is │
│ typing..." │ │ typing..." │
└─────────────────┘ └─────────────────┘

(After 3 seconds of no typing)

┌─────────────────┐
│ User 1 Browser │ User stops typing
└────────┬────────┘
│ socket.emit('convo_stopped_typing', {
│ conversationId: '64conv...',
│ roomId: 'room_64conv...'
│ })

┌─────────────────┐
│ General Socket │ Emit to other users
└────────┬────────┘


┌─────────────────┐ ┌─────────────────┐
│ User 2 Socket │ │ User 3 Socket │
└─────────────────┘ └─────────────────┘
emit('convo_stopped_typing') emit('convo_stopped_typing')
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ UI Update │ │ UI Update │
│ Remove │ │ Remove │
│ "typing..." │ │ "typing..." │
└─────────────────┘ └─────────────────┘

Optimization:

  • Frontend debounces typing events (500ms)
  • Auto-clear typing indicator after 3 seconds
  • Only emits to users in same conversation
  • Excludes sender from receiving own typing events
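On the receiving side, the 3-second auto-clear can be handled by timestamping each typing event and expiring stale entries. The sketch below is one possible approach, not the documented client code: `TypingTracker` is a hypothetical name, time is passed in explicitly to keep the logic testable, and `onStopped` assumes the client can tell which user stopped typing.

```javascript
// Hypothetical receiver-side sketch: track who is typing and expire stale entries.
const TYPING_TIMEOUT_MS = 3000; // auto-clear window from the list above

class TypingTracker {
  constructor(selfUserId) {
    this.selfUserId = selfUserId;
    this.lastSeen = new Map(); // userId -> time of last typing event (ms)
  }

  // Handle 'convo_someone_typing'. Own events are ignored as a safety net,
  // even though the server is described as excluding the sender.
  onTyping(userId, now) {
    if (userId !== this.selfUserId) {
      this.lastSeen.set(userId, now);
    }
  }

  // Handle 'convo_stopped_typing' for a known user (assumption, see lead-in).
  onStopped(userId) {
    this.lastSeen.delete(userId);
  }

  // Return the users still considered to be typing at time `now`.
  activeTypers(now) {
    for (const [userId, seenAt] of this.lastSeen) {
      if (now - seenAt >= TYPING_TIMEOUT_MS) {
        this.lastSeen.delete(userId);
      }
    }
    return [...this.lastSeen.keys()];
  }
}

// Usage: user1 types at t=0; the indicator is shown at 1s and gone at 3s.
const tracker = new TypingTracker('user2');
tracker.onTyping('user1', 0);
tracker.onTyping('user2', 0); // own event, ignored
const typingAt1s = tracker.activeTypers(1000);
const typingAt3s = tracker.activeTypers(3000);
```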

Read Receipt Flow

Track Message Read Status

┌─────────────────┐
│ User 2 Browser │ User opens conversation
└────────┬────────┘
│ Component mount / visibility change

┌─────────────────┐
│ Frontend Logic │ Mark unread messages as read
└────────┬────────┘
│ socket.emit('convo_mark_conversation_read', {
│ conversationId: '64conv...',
│ roomId: 'room_64conv...',
│ messageIds: ['64msg1...', '64msg2...']
│ })

┌─────────────────┐
│ General Socket │ 1. Update Message.read_by array
│ (Port 4000) │ 2. Decrement Conversation.unread_count for user
└────────┬────────┘ 3. Get sender's socket IDs
│ 4. Emit read receipt to sender

┌─────────────────┐
│ MongoDB │ Message.updateMany(
└─────────────────┘ { _id: { $in: messageIds } },
{ $addToSet: { read_by: userId } }
)

┌─────────────────┐
│ User 1 Socket │ (Original sender)
└─────────────────┘
emit('convo_message_read', {
messageIds: ['64msg1...', '64msg2...'],
readBy: {
_id: '64user2...',
name: 'Jane Doe'
},
timestamp: '2025-10-13T10:35:00.000Z'
})


┌─────────────────┐
│ UI Update │ - Show checkmarks
│ │ - Update message status
└─────────────────┘ - "Read by Jane Doe"
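Because `$addToSet` is idempotent, a repeated read event must not decrement the unread counter twice. The in-memory sketch below illustrates that idea only. `markMessagesRead` is a hypothetical helper, and the plain arrays stand in for MongoDB documents.

```javascript
// Hypothetical in-memory sketch of the read-receipt update.
// Mirrors $addToSet semantics: adding the same reader twice has no effect.
function markMessagesRead(messages, messageIds, userId) {
  const ids = new Set(messageIds);
  const newlyRead = [];
  for (const message of messages) {
    if (ids.has(message._id) && !message.read_by.includes(userId)) {
      message.read_by.push(userId);
      newlyRead.push(message._id);
    }
  }
  // Callers can decrement unread_count by newlyRead.length, so a repeated
  // read event does not push the counter below its true value.
  return newlyRead;
}

// Usage: msg2 was already read by user2, so only msg1 is newly read.
const inbox = [
  { _id: 'msg1', read_by: [] },
  { _id: 'msg2', read_by: ['user2'] },
  { _id: 'msg3', read_by: [] },
];
const firstPass = markMessagesRead(inbox, ['msg1', 'msg2'], 'user2');
const secondPass = markMessagesRead(inbox, ['msg1', 'msg2'], 'user2');
```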

Snooze Conversation Flow

Temporarily Hide Conversation

┌─────────────────┐
│ User Dashboard │ Clicks Snooze (1 hour)
└────────┬────────┘
│ socket.emit('convo_snooze_conversation', {
│ conversationId: '64conv...',
│ roomId: 'room_64conv...',
│ snoozeUntil: 1697200000000 // Unix timestamp in milliseconds
│ })

┌─────────────────┐
│ General Socket │ 1. Find ConversationSnooze for user
│ (Port 4000) │ 2. Update snoozeUntil + is_snoozed flag
└────────┬────────┘ 3. Emit confirmation back to user


┌─────────────────┐
│ MongoDB │ ConversationSnooze.updateOne({
└─────────────────┘ user: userId,
conversation: conversationId
}, {
snoozeUntil: Date,
is_snoozed: true
})

┌─────────────────┐
│ User Socket │ emit('convo_snooze_conversation', {
└─────────────────┘ conversationId: '64conv...',
│ snoozeUntil: 1697200000000,
│ is_snoozed: true
↓ })
┌─────────────────┐
│ UI Update │ - Hide conversation from list
│ │ - Show in "Snoozed" tab
└─────────────────┘ - Set timer to un-snooze

(After snooze period expires)

┌─────────────────┐
│ Cron Job │ Queue Manager checks snoozeUntil
│ (Queue Mgr) │ For expired snoozes, call REST API
└────────┬────────┘
│ POST /emit/{userId}/conversation_unsnoozed

┌─────────────────┐
│ General Socket │ Update ConversationSnooze.is_snoozed = false
└────────┬────────┘ Emit to user's sockets


┌─────────────────┐
│ User Socket │ emit('conversation_unsnoozed', {
└─────────────────┘ conversationId: '64conv...'
│ })

┌─────────────────┐
│ UI Update │ - Show conversation in main list
│ │ - Remove from "Snoozed" tab
└─────────────────┘ - Show notification if new messages
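The expiry check performed by the scheduled job can be sketched as a simple filter over snooze records. This is an illustration under assumptions: `snoozeUntilFrom` and `findExpiredSnoozes` are hypothetical helpers, and the records are plain objects using the field names shown in the diagram.

```javascript
// Hypothetical sketch of the expiry check a scheduled job could run.
const ONE_HOUR_MS = 60 * 60 * 1000;

// Compute the snoozeUntil value (epoch milliseconds) for a snooze starting at `now`.
function snoozeUntilFrom(now, durationMs) {
  return now + durationMs;
}

// Return the snooze records whose period has elapsed at time `now`.
function findExpiredSnoozes(snoozes, now) {
  return snoozes.filter((s) => s.is_snoozed && s.snoozeUntil <= now);
}

// Usage: a one-hour snooze that ends at 1697200000000.
const snoozeStart = 1697196400000;
const until = snoozeUntilFrom(snoozeStart, ONE_HOUR_MS);
const records = [
  { user: 'user1', conversation: 'conv1', snoozeUntil: until, is_snoozed: true },
  { user: 'user1', conversation: 'conv2', snoozeUntil: until + ONE_HOUR_MS, is_snoozed: true },
  { user: 'user2', conversation: 'conv1', snoozeUntil: until - 1, is_snoozed: false },
];
const expired = findExpiredSnoozes(records, until);
```

Filtering on `is_snoozed` as well as the timestamp keeps the job from re-announcing conversations that were already un-snoozed.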

Multi-Device Synchronization

User Logged in on Desktop + Mobile

┌─────────────────┐
│ Desktop Client │ User sends message
└────────┬────────┘
│ POST /v1/conversations/{id}/messages

┌─────────────────┐
│ Internal API │ Create message → Trigger socket emission
└────────┬────────┘


┌─────────────────┐
│ General Socket │ Query all socket IDs for User 1
│ User 1 has 2 │ - Desktop: socketId_desktop_abc
│ active sockets │ - Mobile: socketId_mobile_xyz
└────────┬────────┘

├─────────────────────────────────────┐
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ Desktop Socket │ │ Mobile Socket │
│ (Same User) │ │ (Same User) │
└─────────────────┘ └─────────────────┘
emit('new_message', data) emit('new_message', data)
│ │
↓ ↓
┌─────────────────┐ ┌─────────────────┐
│ Desktop UI │ │ Mobile UI │
│ - Show message │ │ - Show message │
│ - Clear input │ │ - Push notif │
└─────────────────┘ └─────────────────┘

Key: Both devices receive the same event, which keeps all of the user's sessions in sync in real time.
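Multi-device delivery depends on mapping one user to several socket IDs. A minimal in-memory sketch of such a mapping is shown below. `SocketRegistry` is a hypothetical name, and the real service may keep this mapping in a database or cache instead.

```javascript
// Hypothetical sketch of a user-to-socket registry supporting several devices per user.
class SocketRegistry {
  constructor() {
    this.byUser = new Map(); // userId -> Set of socket IDs
  }

  // Call when a socket connects.
  add(userId, socketId) {
    if (!this.byUser.has(userId)) {
      this.byUser.set(userId, new Set());
    }
    this.byUser.get(userId).add(socketId);
  }

  // Call when a socket disconnects; drops the user once the last device is gone.
  remove(userId, socketId) {
    const sockets = this.byUser.get(userId);
    if (!sockets) return;
    sockets.delete(socketId);
    if (sockets.size === 0) {
      this.byUser.delete(userId);
    }
  }

  // All socket IDs currently registered for a user.
  socketsFor(userId) {
    return [...(this.byUser.get(userId) || [])];
  }
}

// Usage: the same user connects from desktop and mobile, then closes the desktop tab.
const registry = new SocketRegistry();
registry.add('user1', 'socketId_desktop_abc');
registry.add('user1', 'socketId_mobile_xyz');
const bothDevices = registry.socketsFor('user1');
registry.remove('user1', 'socketId_desktop_abc');
const mobileOnly = registry.socketsFor('user1');
```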


Error Recovery Flow

Socket Disconnection Handling

┌─────────────────┐
│ User Browser │ Network interruption
└────────┬────────┘
│ (Connection lost)

┌─────────────────┐
│ Socket.IO Client│ Auto-reconnect attempt (exponential backoff)
└────────┬────────┘ - Retry after roughly 1s, 2s, 4s... up to reconnectionDelayMax (5s by default)

↓ (Connection restored)
┌─────────────────┐
│ General Socket │ 'connection' event fired
└────────┬────────┘
│ 1. Authenticate with JWT
↓ 2. Get user's active conversations
┌─────────────────┐
│ is-auth.js │ Verify token + extract userId
└────────┬────────┘


┌─────────────────┐
│ Frontend Logic │ Re-join all active rooms
└────────┬────────┘
│ socket.emit('convo_join', {
│ conversationId: '64conv1...',
│ roomId: 'room_64conv1...'
│ })
│ socket.emit('convo_join', {
│ conversationId: '64conv2...',
│ roomId: 'room_64conv2...'
│ })

┌─────────────────┐
│ General Socket │ Join socket to rooms
└────────┬────────┘ - room_64conv1...
│ - room_64conv2...

┌─────────────────┐
│ Frontend API │ Fetch missed messages
└────────┬────────┘ GET /v1/conversations/{id}/messages?since={lastTimestamp}


┌─────────────────┐
│ UI Update │ - Merge missed messages
│ │ - Show "Connection restored"
└─────────────────┘ - Resume normal operation

Socket.IO Built-in Features:

  • Automatic reconnection with exponential backoff
  • Buffering of emitted events during disconnection
  • connect and disconnect events on the socket, plus reconnect events (emitted by the Manager, socket.io, in Socket.IO v3 and later)
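The "merge missed messages" step needs to avoid duplicates when a message arrives both over the socket and in the catch-up fetch. One way to do this is sketched below. `mergeMissedMessages` is a hypothetical helper, and the `_id` and `created_at` (epoch milliseconds) fields are assumed for illustration.

```javascript
// Hypothetical sketch: merge messages fetched after a reconnect into the local list.
function mergeMissedMessages(local, fetched) {
  const byId = new Map();
  for (const message of [...local, ...fetched]) {
    // Later entries overwrite earlier ones, so fetched copies win over local ones.
    byId.set(message._id, message);
  }
  // Return the merged list in chronological order.
  return [...byId.values()].sort((a, b) => a.created_at - b.created_at);
}

// Usage: msg2 is present locally and in the fetch; msg3 was missed while offline.
const localMessages = [
  { _id: 'msg1', created_at: 100, text: 'first' },
  { _id: 'msg2', created_at: 200, text: 'second' },
];
const fetchedMessages = [
  { _id: 'msg3', created_at: 300, text: 'third' },
  { _id: 'msg2', created_at: 200, text: 'second (server copy)' },
];
const merged = mergeMissedMessages(localMessages, fetchedMessages);
```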

System-Wide Broadcast Flow

Example: Scheduled Maintenance Announcement

┌─────────────────┐
│ Admin Panel │ Admin schedules maintenance
└────────┬────────┘
│ POST /v1/admin/announcements

┌─────────────────┐
│ Internal API │ Create Announcement document
└────────┬────────┘
│ Axios POST /emit/system_announcement

┌─────────────────┐
│ General Socket │ Query all active sessions
│ (Port 4000) │ - Get all connected socket IDs
└────────┬────────┘ - Broadcast to everyone

├─────────┬─────────┬─────────┬─────────┐
↓ ↓ ↓ ↓ ↓
[User 1] [User 2] [User 3] [User 4] [User 5]
│ │ │ │ │
└─────────┴─────────┴─────────┴─────────┘


emit('system_announcement', {
title: 'Scheduled Maintenance',
message: 'System down Monday 2-4AM',
priority: 'high',
action_url: '/maintenance-info'
})


┌─────────────────────────────────────────────────┐
│ All Connected Clients │
│ - Show modal/banner │
│ - Play notification sound │
│ - Store in localStorage for offline viewing │
└─────────────────────────────────────────────────┘

Data Flow Summary

Flow Type               | Initiator      | Trigger Method | Socket Event              | Target Users
------------------------|----------------|----------------|---------------------------|----------------
Conversation Message    | Internal API   | REST           | new_message               | Conversation
Prospect Incoming SMS   | External API   | Webhook        | prospect_message          | Conversation
Prospect Outgoing Reply | Internal API   | User Action    | prospect_message          | Conversation
Lead Scraper Task       | Internal API   | Campaign       | scrape_task               | Specific Socket
Typing Indicator        | Client Browser | User Typing    | convo_someone_typing      | Others in Convo
Read Receipt            | Client Browser | View Message   | convo_message_read        | Message Sender
Snooze Conversation     | Client Browser | User Action    | convo_snooze_conversation | Same User
System Announcement     | Internal API   | Admin Action   | system_announcement       | All Users
Multi-Device Sync       | Any Service    | REST/Socket    | Various                   | Same User

Best Practices

For Service Integration

  1. Always use REST API for triggering socket emissions from other services
  2. Never query sockets directly from other services (use General Socket REST endpoints)
  3. Validate data before emitting to prevent invalid socket events
  4. Handle errors gracefully with fallback mechanisms
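As an illustration of point 3, a calling service could check a payload before asking General Socket to emit it. The sketch below validates the `system_announcement` payload shown earlier. `validateAnnouncement` is a hypothetical helper, and the allowed priority values are an assumption (only 'high' appears in this document).

```javascript
// Hypothetical sketch: validate an announcement payload before requesting an emit.
const ALLOWED_PRIORITIES = ['low', 'normal', 'high']; // assumed set of values

function validateAnnouncement(payload) {
  if (typeof payload !== 'object' || payload === null) {
    return ['payload must be an object'];
  }
  const errors = [];
  for (const field of ['title', 'message']) {
    if (typeof payload[field] !== 'string' || payload[field].trim() === '') {
      errors.push(`${field} must be a non-empty string`);
    }
  }
  if (!ALLOWED_PRIORITIES.includes(payload.priority)) {
    errors.push(`priority must be one of: ${ALLOWED_PRIORITIES.join(', ')}`);
  }
  // action_url is treated as optional, but must be a relative path when present.
  if (
    payload.action_url !== undefined &&
    (typeof payload.action_url !== 'string' || !payload.action_url.startsWith('/'))
  ) {
    errors.push('action_url must be a path starting with "/"');
  }
  return errors;
}

// Usage: an empty error list means the payload is safe to send.
const validErrors = validateAnnouncement({
  title: 'Scheduled Maintenance',
  message: 'System down Monday 2-4AM',
  priority: 'high',
  action_url: '/maintenance-info',
});
const invalidErrors = validateAnnouncement({ title: '', message: 'x', priority: 'urgent' });
```

Returning a list of errors rather than throwing lets the caller log every problem at once and decide on its own fallback.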

For Frontend Integration

  1. Reconnect logic: Implement reconnection with exponential backoff
  2. Room management: Join rooms immediately after connection
  3. Message buffering: Queue messages during disconnection
  4. Optimistic UI: Update UI immediately, rollback on error
  5. Event debouncing: Debounce typing indicators and scroll events

For Scaling

  1. Use Redis adapter for multi-instance horizontal scaling
  2. Implement load balancing for scraper task distribution
  3. Monitor socket connections to detect bottlenecks
  4. Cache user-socket mappings to reduce DB queries
  5. Set socket timeouts to clean up stale connections