📦 Queue Processor
📖 Overview
The Queue Processor is the core delivery engine for the Notifications Service. It's responsible for polling pending notifications, managing retry logic, and orchestrating delivery through external APIs.
Environment Flag: QUEUE_ENABLED=true (REQUIRED)
Trigger Type: Cron Job (every 5 seconds)
Technology: Bull Queue + Redis
Location: notifications/services/queue/queue.js
This module MUST be enabled for ANY notifications to be delivered. Without this, notifications are created in the queue but never processed.
🏗️ Architecture
System Flow
sequenceDiagram
participant NQ as NotificationQueue<br/>(MongoDB)
participant CRON as Cron Job<br/>(Every 5s)
participant BULL as Bull Queue<br/>(Redis)
participant EXT as External APIs<br/>(SendGrid/Twilio/etc)
participant COMM as Communication<br/>(Logs)
loop Every 5 seconds
CRON->>NQ: Query pending notifications
Note over NQ: Find pending items with<br/>tries < MAX_RETRIES
NQ-->>CRON: Return notifications
CRON->>BULL: Add jobs to queue
Note over BULL: With retry config
BULL->>BULL: Process job
BULL->>EXT: Send notification
alt Success
EXT-->>BULL: Success response
BULL->>NQ: Delete notification
BULL->>COMM: Save communication log
else Failure
EXT-->>BULL: Error response
BULL->>NQ: Increment tries, store error
Note over BULL: Exponential backoff<br/>for retry
end
end
⚙️ Configuration
Environment Variables
# Required
QUEUE_ENABLED=true # Enable queue processor
REDIS_HOST=localhost # Redis host
REDIS_PORT=6379 # Redis port
# SendGrid
SENDGRID_API_KEY=your_key # Master SendGrid key
# Twilio
TWILIO_ACCOUNT_SID=your_sid # Master Twilio account
TWILIO_AUTH_TOKEN=your_token
TWILIO_MESSAGING_GROUP_SID=sid # For internal messages
TWILIO_WEBHOOK_URL=url # Status callback URL
TWILIO_SMS_URL=url # SMS webhook URL
# Firebase
GOOGLE_APPLICATION_CREDENTIALS=path # Firebase credentials file
FIREBASE_PROJECT_ID=your_project
NOTIFICATION_ICON_URL=url # Icon for web notifications
# General Socket (for bell notifications)
GENERAL_SOCKET=http://localhost:4000
Bull Queue Configuration
const Queue = require('bull')(
'NotificationQueue',
`redis://${process.env.REDIS_HOST}:${process.env.REDIS_PORT}`,
);
const MAX_RETRIES = 10;
const BACKOFF_DELAY = 10000; // 10 seconds
🔄 Processing Logic
Job Types
The queue processor handles four notification types:
1. Email Processing
case 'email':
// 1. Create sender payload (business info)
let sender = await createSender({
accountId: job.data.sender_account,
internal: job.data.internal_sender
});
// 2. Generate SendGrid payload
let notificationData = generateSendgridPayload({
sender,
user: job.data.recipient,
template_id: job.data.content.template_id,
addtl_data: job.data.content.additional_data,
content: job.data.content.body,
subject: job.data.content.subject
});
// 3. Check DND list
const dnd = await DND.find({
account_id: job.data.sender_account,
value: { $in: [job.data.recipient.email] }
});
if (dnd?.length) {
// Skip notification - user opted out
await QueueModel.deleteOne({ _id: job.id });
return done();
}
// 4. Send email
await withTransaction(async (session) => {
await QueueModel.deleteOne({ _id: job.id }, { session });
const email_response = await NotificationUtil.sendEmail(
notificationData,
accountId
);
// 5. Save communication log
await saveEmailCommunication(
com_data,
email_response.headers,
notificationData
);
});
done();
Key Steps:
- Fetch sender business information
- Generate SendGrid-compliant payload
- Check DND (Do Not Disturb) list
- Send via SendGrid API
- Save communication log
- Delete from queue
2. SMS Processing
case 'sms':
// 1. Fetch Twilio credentials
let twilio_account = await findTwilioAccount({
accountId: job.data.sender_account
});
// 2. Generate Twilio payload
let notificationData = generateTwilioPayload({
twilio_account,
data: job.data.data,
user: job.data.recipient,
account_id: job.data.sender_account,
user_id: job.data.sender_user,
use_credits: job.data.check_credits
});
// 3. Check DND list
const dnd = await DND.find({
account_id: job.data.sender_account,
value: { $in: [job.data.recipient.phone] }
});
if (dnd?.length) {
await QueueModel.deleteOne({ _id: job.id });
return done();
}
// 4. Send SMS
await withTransaction(async (session) => {
await QueueModel.deleteOne({ _id: job.id }, { session });
const sms_response = await NotificationUtil.sendSMS(
notificationData,
true
);
// 5. Save communication log
await saveSMSCommunication(
com_data,
sms_response,
notificationData
);
});
done();
Key Steps:
- Fetch account's Twilio credentials
- Generate Twilio-compliant payload
- Check DND list
- Send via Twilio API
- Save communication log
- Delete from queue
3. FCM Processing
case 'fcm':
let notification = {
title: job.data.content.title,
body: job.data.content.body,
click_action: job.data.content.click_action,
data: job.data.content.data ?? {},
module: job.data.content.module,
type: job.data.content.type
};
await withTransaction(async (session) => {
await QueueModel.deleteOne({ _id: job.id }, { session });
await NotificationUtil.sendFCM(
[job.data.sender_account.toString()],
[job.data.recipient.user_id.toString()],
notification,
{}
);
});
done();
Key Steps:
- Build notification object
- Send to Firebase Cloud Messaging
- Delete from queue
4. Expo Push Processing
case 'expo-push':
// 1. Generate payload with user tokens
const notificationData = await generateExpoPayload(job.data);
const notif_ids = job.data?._id;
// 2. Check if any valid tokens exist
if (!notificationData.length && notif_ids) {
// No valid tokens - clean up
await QueueModel.deleteOne({ _id: notif_ids });
return done();
}
// 3. Send notifications
if (notificationData.length) {
await withTransaction(async (session) => {
await QueueModel.deleteOne({ _id: notif_ids }, { session });
await NotificationUtil.sendExpoPushNotification(
notificationData,
true
);
});
}
done();
Key Steps:
- Fetch user's Expo push tokens
- Generate Expo-compliant payload
- Validate tokens (remove invalid)
- Send via Expo Push API
- Delete from queue
Polling Logic
The queue processor polls MongoDB every 5 seconds:
const processQueuedList = async () => {
// 1. Fetch active notification types
let notificationModules = await Config.findOne({
type: 'notification-modules',
})
.lean()
.exec();
// 2. Query pending notifications
let pendingNotifications = await QueueModel.find({
type: { $in: notificationModules.active },
$or: [{ tries: { $lt: MAX_RETRIES } }, { tries: { $exists: 0 } }],
})
.lean()
.exec();
// 3. Separate Expo notifications (batch processing)
const expoNotifs = pendingNotifications.filter(notif => notif.type === 'expo-push');
pendingNotifications = pendingNotifications.filter(notif => notif.type !== 'expo-push');
// 4. Add to Bull queue
if (pendingNotifications.length) {
for (let notification of pendingNotifications) {
// Check for special review request configuration
let delay = 0;
let repeat;
if (notification.origin === 'reviews') {
const autoReviewRequest = await AutoReviewRequest.findOne({
account_id: notification.sender_account,
});
if (autoReviewRequest) {
delay = autoReviewRequest.delay * 1000;
if (!autoReviewRequest.send_only_once) {
repeat = {
every: autoReviewRequest.reminder * 24 * 60 * 60 * 1000,
limit: autoReviewRequest.retry || 1,
};
}
}
}
const queueOptions = {
jobId: notification._id.toString(),
attempts: MAX_RETRIES,
backoff: {
type: 'exponential',
delay: BACKOFF_DELAY,
},
delay,
};
// Add immediate job
Queue.add({ ...notification }, queueOptions);
// Add repeatable job if configured
if (repeat) {
queueOptions.repeat = repeat;
Queue.add({ ...notification }, queueOptions);
}
}
}
// 5. Process Expo notifications
if (expoNotifs.length) {
for (let notification of expoNotifs) {
Queue.add(
{ ...notification },
{
jobId: notification._id.toString(),
attempts: MAX_RETRIES,
backoff: {
type: 'exponential',
delay: BACKOFF_DELAY,
},
},
);
}
}
};
// Start cron job
let inProgress = false;
cron.schedule('*/5 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
await processQueuedList();
inProgress = false;
}
});
🚨 Error Handling
Retry Strategy
Exponential Backoff:
attempts: 10
backoff: {
type: 'exponential',
delay: 10000 // Base delay
}
// Resulting delays:
Attempt 1: 10s
Attempt 2: 20s
Attempt 3: 40s
Attempt 4: 80s (~1.3 min)
Attempt 5: 160s (~2.7 min)
Attempt 6: 320s (~5.3 min)
Attempt 7: 640s (~10.7 min)
Attempt 8: 1280s (~21.3 min)
Attempt 9: 2560s (~42.7 min)
Attempt 10: 5120s (~85.3 min)
Total time: ~2.7 hours
Failure Tracking
On each failed attempt:
Queue.on('failed', async (job, err) => {
let message = err.message || err;
if (err.isAxiosError) {
message = JSON.stringify(err.response.data);
}
// Update notification with error info
await QueueModel.updateOne(
{ _id: job.id },
{
$inc: { tries: 1 },
last_error: message,
},
);
// After max retries, create failed communication log
if (job.attemptsMade >= job.opts.attempts) {
if (['sms', 'email'].includes(job.data.type)) {
if (job.data.type === 'email') {
await saveEmailCommunication(
{ ...com_data, success: false },
{},
job.data.notificationData || {},
);
} else if (job.data.type === 'sms') {
await saveSMSCommunication(
{ ...com_data, success: false },
{},
job.data.notificationData || {},
);
}
}
}
// Log error
logger.error({
initiator: 'notification/queue',
error: err,
message: err.response?.data || err.message,
job: job.id,
job_data: job.data,
});
});
Success Handling
On successful delivery:
Queue.on('completed', async job => {
logger.log({
initiator: 'notification/queue',
message: 'Notification Processed',
job: job.id,
job_data: job.data,
});
// Update contact's last_contacted date
if (['email', 'sms'].includes(job.data.type) && job.data.comm_data) {
try {
await Contact.updateOne(
{ _id: job.data.comm_data.contact_id },
{ last_contacted: job.data.comm_data.created_at },
);
} catch (err) {
logger.error({
initiator: 'notification/queue',
message: 'Failed to update last contacted date',
job: job.id,
error: err,
});
}
}
});
🔍 Special Features
Review Request Repeating Notifications
The queue processor supports repeating notifications for review requests:
// Configuration from AutoReviewRequest
{
send_only_once: false, // Allow repeats
delay: 1, // Days to delay initial send
reminder: 7, // Days between reminders
retry: 3 // Number of reminders
}
// Results in:
// - Initial notification delayed by 1 day
// - Reminder 1: 7 days after initial
// - Reminder 2: 14 days after initial
// - Reminder 3: 21 days after initial
Implementation:
const queueOptions = {
jobId: notification._id.toString(),
attempts: MAX_RETRIES,
backoff: { type: 'exponential', delay: BACKOFF_DELAY },
delay: 1 * 24 * 60 * 60 * 1000, // 1 day
repeat: {
every: 7 * 24 * 60 * 60 * 1000, // 7 days
limit: 3, // 3 reminders
},
};
Queue.add({ ...notification }, queueOptions);
DND List Handling
Before sending email or SMS, the processor checks the DND list:
const dnd = await DND.find({
account_id: new mongoose.Types.ObjectId(job.data.sender_account),
value: { $in: [job.data.recipient.email] }, // or phone
});
if (dnd?.length) {
logger.error({
initiator: 'notification/queue',
message: 'DND enabled for received email(s).',
job: job.id,
job_data: job.data,
});
// For repeatable jobs, just skip this attempt
if (job.id.startsWith('repeat')) {
return done();
}
// Otherwise, delete from queue
await QueueModel.deleteOne({ _id: job.id });
return done();
}
📊 Performance Considerations
Polling Frequency
Current: Every 5 seconds
Considerations:
- Lower frequency (10s): Reduces DB load but increases notification delay
- Higher frequency (3s): Faster delivery but increases DB load
- Optimal: 5s balances latency and resource usage
Queue Prioritization
Currently, all notifications have equal priority. Consider implementing:
// High priority (immediate)
const criticalNotifications = ['instareports_failed', 'instasites_failed', 'support_notifications'];
// Medium priority (5s delay)
const standardNotifications = ['contacts_assigned', 'deals_assigned', 'forms_submitted'];
// Low priority (30s delay)
const marketingNotifications = ['review_requests', 'funnel_notifications'];
Batch Processing
Expo notifications are already batched. Consider batching other types:
// Batch emails by account for SendGrid API efficiency
const emailsByAccount = groupBy(emails, 'sender_account');
for (const [accountId, emails] of Object.entries(emailsByAccount)) {
// Send all emails for account in single API call
await sendBatchEmails(accountId, emails);
}
🐛 Troubleshooting
Issue: Queue Not Processing
Symptoms: Notifications stuck in MongoDB, not delivered
Check:
-
Environment flag:
echo $QUEUE_ENABLED
# Should output: true -
Service running:
grep "QUEUE SERVICE RUNNING" notifications.log -
Redis connection:
redis-cli -h $REDIS_HOST -p $REDIS_PORT ping
# Should return: PONG -
Bull queue health:
// In Node.js console
const Queue = require('bull')('NotificationQueue', 'redis://...');
await Queue.getJobCounts();
// Should show: { waiting, active, completed, failed }
Issue: High Failure Rate
Symptoms: Many jobs reaching MAX_RETRIES
Common Causes:
-
SendGrid API errors:
// Check SendGrid key validity
const key = await SendgridKey.findOne({ account_id });
// Test key with API call -
Twilio API errors:
// Verify credentials
const account = await Account.findById(id, { twilio_account: 1 }); -
Invalid recipients:
// Check email format
// Check phone format (E.164)
Issue: Slow Processing
Symptoms: Notifications delayed beyond poll interval
Diagnosis:
// Check queue size
db.getCollection('notifications.queue').count();
// Check Bull queue backlog
await Queue.getJobCounts();
// Check for long-running jobs
const active = await Queue.getActive();
// Look for jobs stuck in active state
Solutions:
- Scale horizontally (add more notification service instances)
- Increase Redis memory
- Optimize external API calls
- Implement queue prioritization
📈 Metrics to Monitor
Queue Health:
- Pending notification count (should trend toward 0)
- Average time in queue (should be < 10 seconds)
- Retry rate (should be < 5%)
Delivery Rates:
- Success rate per notification type (target > 95%)
- DND blocking rate
- External API error rate
Performance:
- Poll duration (should be < 1 second)
- Bull queue processing time
- External API response time
Module Type: Cron Job (every 5 seconds)
Environment Flag: QUEUE_ENABLED=true (REQUIRED)
Dependencies: MongoDB, Redis, SendGrid, Twilio, Firebase, Expo
Status: Core - required for all notification delivery