# LeadFinder Integration

**Namespace:** `/v1/lead-finder`
**Purpose:** Lead scraper connection tracking and task distribution
**Authentication:** None required
## Socket Events

### Client → Server Events

| Event | Description | Authentication | Scope Required |
|---|---|---|---|
| `scraper_connect` | Register scraper connection | None | None |
| `disconnect` | Scraper disconnected (automatic) | N/A | N/A |
## Database Models

- `LeadFinderScraper` - Scraper connection tracking

**Schema:**

```javascript
{
  type: String,     // Scraper type (e.g., 'google', 'yelp', 'bing')
  name: String,     // Scraper instance name
  socketId: String, // Current socket connection ID
  createdAt: Date,
  updatedAt: Date,
}
```
## Event Details

### scraper_connect

**Purpose:** Register an external scraper with its socket connection

**Request:**

```javascript
socket.emit('scraper_connect', {
  type: 'google',           // Scraper type identifier
  name: 'google-scraper-1', // Unique scraper name
});
```

**No Response:** Fire-and-forget event
**Side Effects:**
- Creates or updates a `LeadFinderScraper` document
- Sets `socketId` to the current connection ID
- Uses `{ type, name }` as the unique identifier (upsert)
**Example:**

```javascript
// Scraper connects
await ScraperModel.updateOne(
  { type: 'google', name: 'google-scraper-1' },
  { socketId: 'xyz123' },
  { upsert: true },
);
```
### disconnect

**Purpose:** Handle scraper disconnection (automatic)

**Trigger:** Automatic when the socket disconnects

**Side Effects:**
- Connection removed from the socket pool
- `ScraperModel` record remains (`socketId` becomes stale)
- The next `scraper_connect` updates `socketId`
## REST API Integration

The LeadFinder namespace is controlled via REST API endpoints:

### Emit Custom Event to Scraper

**Endpoint:** `POST /lead-finder/:event`

**Purpose:** Send a task or command to a specific scraper
**Request:**

```http
POST /lead-finder/scrape_task
Content-Type: application/json

{
  "socketId": "xyz123",
  "payload": {
    "task_id": "task_001",
    "keyword": "plumber near me",
    "location": "New York, NY",
    "max_results": 50
  }
}
```
**Response:**

```javascript
{ success: true, message: 'SUCCESS' }
```
**Controller Logic:**

```javascript
exports.emit = async (req, res, next) => {
  const io = req.io;
  const event = req.params.event;     // e.g. 'scrape_task'
  const socketId = req.body.socketId; // e.g. 'xyz123'
  const payload = req.body.payload;

  io.to(socketId).emit(event, payload); // Emit to the specific scraper
  return res.status(200).json({ success: true, message: 'SUCCESS' });
};
```
## Use Cases

### 1. Lead Scraping Task Distribution

**Flow:**

```text
1. User initiates lead scraping from Internal API
   POST /v1/lead-finder/scrape
   body: { keyword, location, type: 'google' }
        ↓
2. Internal API finds an available scraper:
   query LeadFinderScraper where type = 'google'
        ↓
3. Internal API calls General Socket REST API:
   POST /lead-finder/scrape_task
   body: { socketId, payload: { task_id, keyword, location } }
        ↓
4. General Socket emits 'scrape_task' to the scraper socket
        ↓
5. Scraper receives the task and starts processing
        ↓
6. Scraper sends results back via webhook or socket
   POST /v1/lead-finder/results
```
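Steps 2-3 on the Internal API side can be sketched as a small dispatch function. The model and HTTP client are injected stubs so the flow runs without a database or network; the URL and field names follow the flow above, but the real service configuration may differ:

```javascript
// Look up a connected scraper of the requested type, then forward the
// task to the General Socket REST API via the injected `post` function.
async function dispatchScrapeTask(ScraperModel, post, task) {
  const scraper = await ScraperModel.findOne({
    type: task.type,
    socketId: { $ne: null }, // only scrapers with a recorded connection
  });
  if (!scraper) throw new Error(`No connected ${task.type} scraper`);

  return post('http://general-socket:4000/lead-finder/scrape_task', {
    socketId: scraper.socketId,
    payload: {
      task_id: task.task_id,
      keyword: task.keyword,
      location: task.location,
    },
  });
}

// Demonstration with stubs in place of mongoose and axios
const posted = [];
const ScraperModel = {
  findOne: async () => ({ type: 'google', socketId: 'xyz123' }),
};
const post = async (url, body) => posted.push({ url, body });

const pending = dispatchScrapeTask(ScraperModel, post, {
  type: 'google',
  task_id: 'task_001',
  keyword: 'plumber near me',
  location: 'New York, NY',
});
```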
### 2. Scraper Health Check

**Flow:**

```text
1. Cron job queries all scrapers
   LeadFinderScraper.find({})
        ↓
2. For each scraper, emit 'health_check' event:
   POST /lead-finder/health_check
   body: { socketId: scraper.socketId }
        ↓
3. Scraper responds with status:
   socket.emit('health_status', { status: 'active', tasks: 3 })
        ↓
4. Internal API updates scraper availability
```
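The cron side of steps 1-2 can be sketched as a polling loop. The model and HTTP client are injected stubs here, and records that have never connected are skipped; the endpoint comes from the flow above:

```javascript
// Poll every registered scraper via the General Socket REST API.
// Returns the number of scrapers actually checked.
async function runHealthChecks(ScraperModel, post) {
  const scrapers = await ScraperModel.find({});
  let checked = 0;
  for (const scraper of scrapers) {
    if (!scraper.socketId) continue; // never connected, nothing to ping
    await post('http://general-socket:4000/lead-finder/health_check', {
      socketId: scraper.socketId,
    });
    checked++;
  }
  return checked;
}

// Demonstration with stubs
const posted = [];
const ScraperModel = {
  find: async () => [
    { name: 'google-scraper-1', socketId: 'xyz123' },
    { name: 'yelp-scraper-1', socketId: null }, // never connected
  ],
};
const pending = runHealthChecks(ScraperModel, async (url, body) => {
  posted.push(body);
});
```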
### 3. Scraper Load Balancing

**Internal API Logic:**

```javascript
// Find a scraper of the right type with a recorded socket connection
const scrapers = await LeadFinderScraper.find({ type: 'google' });
const availableScraper = scrapers.find(s => s.socketId);

// Emit the task to the scraper
await axios.post('http://general-socket:4000/lead-finder/scrape_task', {
  socketId: availableScraper.socketId,
  payload: { task_id, keyword, location },
});
```
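To balance by actual load rather than taking the first connected scraper, the Internal API needs per-scraper task counts, e.g. accumulated from `health_status` reports. A sketch of a least-tasks selector; the `taskCounts` map is an assumed data structure maintained elsewhere, not a field on the model:

```javascript
// Pick the connected scraper with the fewest in-flight tasks.
// `taskCounts` maps socketId -> task count; unknown scrapers count as 0.
function pickLeastLoadedScraper(scrapers, taskCounts) {
  const connected = scrapers.filter(s => s.socketId);
  if (connected.length === 0) return null;
  return connected.reduce((best, s) =>
    (taskCounts[s.socketId] || 0) < (taskCounts[best.socketId] || 0) ? s : best,
  );
}

// Example: scraper-2 carries fewer tasks, so it is chosen
const scrapers = [
  { name: 'google-scraper-1', socketId: 'abc' },
  { name: 'google-scraper-2', socketId: 'def' },
  { name: 'google-scraper-3', socketId: null }, // disconnected
];
const chosen = pickLeastLoadedScraper(scrapers, { abc: 3, def: 1 });
```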
## Client Example (Scraper)

```javascript
import axios from 'axios';
import io from 'socket.io-client';

const socket = io('http://localhost:4000/v1/lead-finder', {
  transports: ['websocket'],
  // No authentication required
});

let currentTaskCount = 0; // In-flight tasks, reported on health checks

// Register scraper
socket.on('connect', () => {
  console.log('Connected to LeadFinder namespace');
  socket.emit('scraper_connect', {
    type: 'google',
    name: 'google-scraper-1',
  });
});

// Listen for scraping tasks
socket.on('scrape_task', async payload => {
  console.log('Received scrape task:', payload);
  const { task_id, keyword, location, max_results } = payload;
  currentTaskCount++;

  try {
    // Perform scraping (scrapGoogle is the scraper's own implementation, not shown)
    const results = await scrapGoogle(keyword, location, max_results);

    // Send results back to Internal API
    await axios.post('http://internal-api:5002/v1/lead-finder/results', {
      task_id,
      results,
      status: 'completed',
    });
    console.log(`Task ${task_id} completed`);
  } catch (error) {
    console.error(`Task ${task_id} failed:`, error);

    // Report the error
    await axios.post('http://internal-api:5002/v1/lead-finder/results', {
      task_id,
      status: 'failed',
      error: error.message,
    });
  } finally {
    currentTaskCount--;
  }
});

// Handle health checks
socket.on('health_check', () => {
  socket.emit('health_status', {
    status: 'active',
    tasks: currentTaskCount,
    uptime: process.uptime(),
  });
});

// Handle disconnection
socket.on('disconnect', () => {
  console.log('Disconnected from LeadFinder namespace');
});
```
## Scraper Types

Common scraper types registered:

- `google` - Google Maps/Search lead scraping
- `yelp` - Yelp business listings
- `bing` - Bing Places scraping
- `yellowpages` - Yellow Pages directory
- `facebook` - Facebook business pages
- `linkedin` - LinkedIn company profiles
## Security Considerations

- **No Authentication:** Anyone can connect to the `/v1/lead-finder` namespace
- **IP Whitelisting:** Recommended for production (only allow scraper IPs)
- **Rate Limiting:** Implement per-socket rate limits
- **Task Validation:** Validate the task payload before emitting
- **Scraper Verification:** Verify scraper identity through an additional mechanism
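For the task-validation point, a hypothetical guard that the emit controller could run before forwarding a task to a scraper. The required fields mirror the `scrape_task` request example earlier in this document; the real task schema may require more:

```javascript
// Reject malformed emit requests before they reach a scraper.
// Returns { valid, errors } so the controller can respond with details.
function validateTaskPayload(body) {
  const errors = [];
  if (typeof body.socketId !== 'string' || body.socketId.length === 0) {
    errors.push('socketId must be a non-empty string');
  }
  const p = body.payload;
  if (typeof p !== 'object' || p === null) {
    errors.push('payload must be an object');
  } else {
    for (const field of ['task_id', 'keyword', 'location']) {
      if (typeof p[field] !== 'string') {
        errors.push(`payload.${field} must be a string`);
      }
    }
  }
  return { valid: errors.length === 0, errors };
}
```

A controller would call this first and return a 400 with `errors` when `valid` is false, instead of emitting blindly to the socket.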
## Monitoring

**Key Metrics:**
- Active scraper count
- Tasks per scraper
- Task completion rate
- Average task duration
- Socket connection/disconnection rate

**Recommended Monitoring:**

```javascript
// Count scrapers with a recorded socket connection
const activeScrapers = await LeadFinderScraper.countDocuments({
  socketId: { $exists: true, $ne: null },
});

// Inspect currently connected sockets in the namespace
io.of('/v1/lead-finder').sockets.forEach(socket => {
  console.log('Active scraper socket:', socket.id);
});
```