
📦 Queue Processor

📖 Overview

The Queue Processor is the core delivery engine for the Notifications Service. It's responsible for polling pending notifications, managing retry logic, and orchestrating delivery through external APIs.

Environment Flag: QUEUE_ENABLED=true (REQUIRED)
Trigger Type: Cron Job (every 5 seconds)
Technology: Bull Queue + Redis
Location: notifications/services/queue/queue.js

Critical Service

This module MUST be enabled for ANY notifications to be delivered. Without this, notifications are created in the queue but never processed.
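
The flag is typically checked once at startup. Below is a minimal sketch of that gate, assuming the queue module exports a start function (startQueueProcessor is a placeholder name, not a documented export):

// startup.js – sketch only; the real bootstrap wiring may differ
const { startQueueProcessor } = require('./services/queue/queue');

if (process.env.QUEUE_ENABLED === 'true') {
  console.log('QUEUE SERVICE RUNNING'); // the log line the troubleshooting section greps for
  startQueueProcessor(); // registers the cron poll and Bull processors
} else {
  console.log('Queue processor disabled: set QUEUE_ENABLED=true to deliver notifications');
}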

🏗️ Architecture

System Flow

sequenceDiagram
    participant NQ as NotificationQueue<br/>(MongoDB)
    participant CRON as Cron Job<br/>(Every 5s)
    participant BULL as Bull Queue<br/>(Redis)
    participant EXT as External APIs<br/>(SendGrid/Twilio/etc)
    participant COMM as Communication<br/>(Logs)

    loop Every 5 seconds
        CRON->>NQ: Query pending notifications
        Note over NQ: Find pending items with<br/>tries < MAX_RETRIES

        NQ-->>CRON: Return notifications

        CRON->>BULL: Add jobs to queue
        Note over BULL: With retry config

        BULL->>BULL: Process job
        BULL->>EXT: Send notification

        alt Success
            EXT-->>BULL: Success response
            BULL->>NQ: Delete notification
            BULL->>COMM: Save communication log
        else Failure
            EXT-->>BULL: Error response
            BULL->>NQ: Increment tries, store error
            Note over BULL: Exponential backoff<br/>for retry
        end
    end

⚙️ Configuration

Environment Variables

# Required
QUEUE_ENABLED=true # Enable queue processor
REDIS_HOST=localhost # Redis host
REDIS_PORT=6379 # Redis port

# SendGrid
SENDGRID_API_KEY=your_key # Master SendGrid key

# Twilio
TWILIO_ACCOUNT_SID=your_sid # Master Twilio account
TWILIO_AUTH_TOKEN=your_token
TWILIO_MESSAGING_GROUP_SID=sid # For internal messages
TWILIO_WEBHOOK_URL=url # Status callback URL
TWILIO_SMS_URL=url # SMS webhook URL

# Firebase
GOOGLE_APPLICATION_CREDENTIALS=path # Firebase credentials file
FIREBASE_PROJECT_ID=your_project
NOTIFICATION_ICON_URL=url # Icon for web notifications

# General Socket (for bell notifications)
GENERAL_SOCKET=http://localhost:4000
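
A small startup check can catch missing required values before the processor begins polling. This is a sketch under the assumption that failing fast is acceptable for this service:

// Sketch: validate the required variables listed above before starting the queue
const required = ['QUEUE_ENABLED', 'REDIS_HOST', 'REDIS_PORT'];
const missing = required.filter((name) => !process.env[name]);

if (missing.length) {
  console.error(`Queue processor cannot start, missing env vars: ${missing.join(', ')}`);
  process.exit(1);
}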

Bull Queue Configuration

const Queue = require('bull')(
  'NotificationQueue',
  `redis://${process.env.REDIS_HOST}:${process.env.REDIS_PORT}`,
);

const MAX_RETRIES = 10;
const BACKOFF_DELAY = 10000; // 10 seconds

🔄 Processing Logic

Job Types

The queue processor handles four notification types: email, sms, fcm, and expo-push. All four are dispatched from a single Bull processor based on the queued document's type field.
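
A simplified sketch of that dispatch is shown here; the handleEmail/handleSMS/handleFCM/handleExpoPush names are illustrative only, since the real module inlines each case as shown in the subsections below:

// Sketch of the processor's dispatch – handler functions are illustrative, not actual exports
Queue.process(async (job, done) => {
  try {
    switch (job.data.type) {
      case 'email':     await handleEmail(job);    break;
      case 'sms':       await handleSMS(job);      break;
      case 'fcm':       await handleFCM(job);      break;
      case 'expo-push': await handleExpoPush(job); break;
      default:
        logger.error({ initiator: 'notification/queue', message: `Unknown type: ${job.data.type}` });
    }
    done();
  } catch (err) {
    done(err); // lets Bull apply the retry/backoff configuration
  }
});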

1. Email Processing

case 'email':
  // 1. Create sender payload (business info)
  let sender = await createSender({
    accountId: job.data.sender_account,
    internal: job.data.internal_sender
  });

  // 2. Generate SendGrid payload
  let notificationData = generateSendgridPayload({
    sender,
    user: job.data.recipient,
    template_id: job.data.content.template_id,
    addtl_data: job.data.content.additional_data,
    content: job.data.content.body,
    subject: job.data.content.subject
  });

  // 3. Check DND list
  const dnd = await DND.find({
    account_id: job.data.sender_account,
    value: { $in: [job.data.recipient.email] }
  });
  if (dnd?.length) {
    // Skip notification - user opted out
    await QueueModel.deleteOne({ _id: job.id });
    return done();
  }

  // 4. Send email (accountId and com_data are defined earlier in the processor, not shown in this excerpt)
  await withTransaction(async (session) => {
    await QueueModel.deleteOne({ _id: job.id }, { session });

    const email_response = await NotificationUtil.sendEmail(
      notificationData,
      accountId
    );

    // 5. Save communication log
    await saveEmailCommunication(
      com_data,
      email_response.headers,
      notificationData
    );
  });

  done();

Key Steps:

  1. Fetch sender business information
  2. Generate SendGrid-compliant payload
  3. Check DND (Do Not Disturb) list
  4. Send via SendGrid API
  5. Save communication log
  6. Delete from queue
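
For reference, the payload generateSendgridPayload builds is assumed to follow SendGrid's standard v3 Mail Send shape; the exact field mapping is not shown in this document, so treat the example as illustrative:

// Assumed shape only – real values come from the sender and job.data.content fields
const sendgridPayload = {
  from: { email: 'no-reply@business.example', name: 'Business Name' }, // sender business info
  personalizations: [
    {
      to: [{ email: 'recipient@example.com' }],
      dynamic_template_data: { first_name: 'Alex' }, // additional_data merged into the template
    },
  ],
  template_id: 'd-1234567890abcdef', // SendGrid dynamic template id
  subject: 'Your subject line',
};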

2. SMS Processing

case 'sms':
  // 1. Fetch Twilio credentials
  let twilio_account = await findTwilioAccount({
    accountId: job.data.sender_account
  });

  // 2. Generate Twilio payload
  let notificationData = generateTwilioPayload({
    twilio_account,
    data: job.data.data,
    user: job.data.recipient,
    account_id: job.data.sender_account,
    user_id: job.data.sender_user,
    use_credits: job.data.check_credits
  });

  // 3. Check DND list
  const dnd = await DND.find({
    account_id: job.data.sender_account,
    value: { $in: [job.data.recipient.phone] }
  });
  if (dnd?.length) {
    await QueueModel.deleteOne({ _id: job.id });
    return done();
  }

  // 4. Send SMS
  await withTransaction(async (session) => {
    await QueueModel.deleteOne({ _id: job.id }, { session });

    const sms_response = await NotificationUtil.sendSMS(
      notificationData,
      true
    );

    // 5. Save communication log
    await saveSMSCommunication(
      com_data,
      sms_response,
      notificationData
    );
  });

  done();

Key Steps:

  1. Fetch account's Twilio credentials
  2. Generate Twilio-compliant payload
  3. Check DND list
  4. Send via Twilio API
  5. Save communication log
  6. Delete from queue
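
generateTwilioPayload is assumed to produce the fields Twilio's message-create call expects; a representative shape (field names follow Twilio's Node helper library, the actual output may differ):

// Assumed shape only
const twilioPayload = {
  to: '+15551234567',   // recipient phone in E.164 format
  from: '+15557654321', // or messagingServiceSid when sending through the internal messaging group
  body: 'Your appointment is confirmed.',
  statusCallback: process.env.TWILIO_WEBHOOK_URL, // delivery status callback (see env vars above)
};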

3. FCM Processing

case 'fcm':
  let notification = {
    title: job.data.content.title,
    body: job.data.content.body,
    click_action: job.data.content.click_action,
    data: job.data.content.data ?? {},
    module: job.data.content.module,
    type: job.data.content.type
  };

  await withTransaction(async (session) => {
    await QueueModel.deleteOne({ _id: job.id }, { session });

    await NotificationUtil.sendFCM(
      [job.data.sender_account.toString()],
      [job.data.recipient.user_id.toString()],
      notification,
      {}
    );
  });

  done();

Key Steps:

  1. Build notification object
  2. Send to Firebase Cloud Messaging
  3. Delete from queue
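
NotificationUtil.sendFCM is not reproduced here; with firebase-admin, the underlying send typically looks like this sketch (token lookup, batching, and error handling omitted; details may differ from the real helper):

// Sketch with firebase-admin – assumes device tokens were already looked up for the recipient
const admin = require('firebase-admin');
admin.initializeApp(); // uses GOOGLE_APPLICATION_CREDENTIALS from the env vars above

async function sendFcmSketch(tokens, notification) {
  const response = await admin.messaging().sendEachForMulticast({
    tokens, // device registration tokens
    notification: { title: notification.title, body: notification.body },
    data: {
      click_action: notification.click_action ?? '',
      module: notification.module ?? '',
      type: notification.type ?? '',
    },
    webpush: {
      notification: { icon: process.env.NOTIFICATION_ICON_URL }, // web notification icon
    },
  });
  return response; // { successCount, failureCount, responses }
}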

4. Expo Push Processing

case 'expo-push':
  // 1. Generate payload with user tokens
  const notificationData = await generateExpoPayload(job.data);

  const notif_ids = job.data?._id;

  // 2. Check if any valid tokens exist
  if (!notificationData.length && notif_ids) {
    // No valid tokens - clean up
    await QueueModel.deleteOne({ _id: notif_ids });
    return done();
  }

  // 3. Send notifications
  if (notificationData.length) {
    await withTransaction(async (session) => {
      await QueueModel.deleteOne({ _id: notif_ids }, { session });

      await NotificationUtil.sendExpoPushNotification(
        notificationData,
        true
      );
    });
  }

  done();

Key Steps:

  1. Fetch user's Expo push tokens
  2. Generate Expo-compliant payload
  3. Validate tokens (remove invalid)
  4. Send via Expo Push API
  5. Delete from queue
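
Token validation and chunked sending with expo-server-sdk generally look like the sketch below; the real NotificationUtil.sendExpoPushNotification may differ in detail:

// Sketch using expo-server-sdk – assumes `messages` were produced by generateExpoPayload
const { Expo } = require('expo-server-sdk');
const expo = new Expo();

async function sendExpoSketch(messages) {
  // Drop messages whose target token is not a valid Expo push token
  const valid = messages.filter((m) => Expo.isExpoPushToken(m.to));

  // Expo requires batching; chunkPushNotifications splits messages into API-sized chunks
  const tickets = [];
  for (const chunk of expo.chunkPushNotifications(valid)) {
    tickets.push(...(await expo.sendPushNotificationsAsync(chunk)));
  }
  return tickets; // each ticket reports ok/error per message
}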

Polling Logic

The queue processor polls MongoDB every 5 seconds:

const processQueuedList = async () => {
  // 1. Fetch active notification types
  let notificationModules = await Config.findOne({
    type: 'notification-modules',
  })
    .lean()
    .exec();

  // 2. Query pending notifications
  let pendingNotifications = await QueueModel.find({
    type: { $in: notificationModules.active },
    $or: [{ tries: { $lt: MAX_RETRIES } }, { tries: { $exists: 0 } }],
  })
    .lean()
    .exec();

  // 3. Separate Expo notifications (batch processing)
  const expoNotifs = pendingNotifications.filter(notif => notif.type === 'expo-push');
  pendingNotifications = pendingNotifications.filter(notif => notif.type !== 'expo-push');

  // 4. Add to Bull queue
  if (pendingNotifications.length) {
    for (let notification of pendingNotifications) {
      // Check for special review request configuration
      let delay = 0;
      let repeat;

      if (notification.origin === 'reviews') {
        const autoReviewRequest = await AutoReviewRequest.findOne({
          account_id: notification.sender_account,
        });

        if (autoReviewRequest) {
          delay = autoReviewRequest.delay * 1000;

          if (!autoReviewRequest.send_only_once) {
            repeat = {
              every: autoReviewRequest.reminder * 24 * 60 * 60 * 1000,
              limit: autoReviewRequest.retry || 1,
            };
          }
        }
      }

      const queueOptions = {
        jobId: notification._id.toString(),
        attempts: MAX_RETRIES,
        backoff: {
          type: 'exponential',
          delay: BACKOFF_DELAY,
        },
        delay,
      };

      // Add immediate job
      Queue.add({ ...notification }, queueOptions);

      // Add repeatable job if configured
      if (repeat) {
        queueOptions.repeat = repeat;
        Queue.add({ ...notification }, queueOptions);
      }
    }
  }

  // 5. Process Expo notifications
  if (expoNotifs.length) {
    for (let notification of expoNotifs) {
      Queue.add(
        { ...notification },
        {
          jobId: notification._id.toString(),
          attempts: MAX_RETRIES,
          backoff: {
            type: 'exponential',
            delay: BACKOFF_DELAY,
          },
        },
      );
    }
  }
};

// Start cron job (the inProgress flag prevents overlapping polls)
let inProgress = false;
cron.schedule('*/5 * * * * *', async () => {
  if (!inProgress) {
    inProgress = true;
    try {
      await processQueuedList();
    } finally {
      inProgress = false; // release the flag even if polling throws
    }
  }
});

🚨 Error Handling

Retry Strategy

Exponential Backoff:

attempts: 10,
backoff: {
  type: 'exponential',
  delay: 10000 // Base delay
}

// Approximate resulting delays between attempts:
Attempt 1: 10s
Attempt 2: 20s
Attempt 3: 40s
Attempt 4: 80s (~1.3 min)
Attempt 5: 160s (~2.7 min)
Attempt 6: 320s (~5.3 min)
Attempt 7: 640s (~10.7 min)
Attempt 8: 1280s (~21.3 min)
Attempt 9: 2560s (~42.7 min)
Attempt 10: 5120s (~85.3 min)

Total time: ~2.8 hours (sum of the delays above)

Failure Tracking

On each failed attempt:

Queue.on('failed', async (job, err) => {
  let message = err.message || err;
  if (err.isAxiosError) {
    message = JSON.stringify(err.response?.data || err.message);
  }

  // Update notification with error info
  await QueueModel.updateOne(
    { _id: job.id },
    {
      $inc: { tries: 1 },
      $set: { last_error: message },
    },
  );

  // After max retries, create failed communication log
  // (com_data is assembled from the job's data earlier in this handler, not shown here)
  if (job.attemptsMade >= job.opts.attempts) {
    if (['sms', 'email'].includes(job.data.type)) {
      if (job.data.type === 'email') {
        await saveEmailCommunication(
          { ...com_data, success: false },
          {},
          job.data.notificationData || {},
        );
      } else if (job.data.type === 'sms') {
        await saveSMSCommunication(
          { ...com_data, success: false },
          {},
          job.data.notificationData || {},
        );
      }
    }
  }

  // Log error
  logger.error({
    initiator: 'notification/queue',
    error: err,
    message: err.response?.data || err.message,
    job: job.id,
    job_data: job.data,
  });
});

Success Handling

On successful delivery:

Queue.on('completed', async job => {
  logger.log({
    initiator: 'notification/queue',
    message: 'Notification Processed',
    job: job.id,
    job_data: job.data,
  });

  // Update contact's last_contacted date
  if (['email', 'sms'].includes(job.data.type) && job.data.comm_data) {
    try {
      await Contact.updateOne(
        { _id: job.data.comm_data.contact_id },
        { last_contacted: job.data.comm_data.created_at },
      );
    } catch (err) {
      logger.error({
        initiator: 'notification/queue',
        message: 'Failed to update last contacted date',
        job: job.id,
        error: err,
      });
    }
  }
});

🔍 Special Features

Review Request Repeating Notifications

The queue processor supports repeating notifications for review requests:

// Configuration from AutoReviewRequest
{
  send_only_once: false, // Allow repeats
  delay: 1,              // Days to delay initial send
  reminder: 7,           // Days between reminders
  retry: 3               // Number of reminders
}

// Results in:
// - Initial notification delayed by 1 day
// - Reminder 1: 7 days after initial
// - Reminder 2: 14 days after initial
// - Reminder 3: 21 days after initial

Implementation:

const queueOptions = {
  jobId: notification._id.toString(),
  attempts: MAX_RETRIES,
  backoff: { type: 'exponential', delay: BACKOFF_DELAY },
  delay: 1 * 24 * 60 * 60 * 1000, // 1 day
  repeat: {
    every: 7 * 24 * 60 * 60 * 1000, // 7 days
    limit: 3, // 3 reminders
  },
};

Queue.add({ ...notification }, queueOptions);

DND List Handling

Before sending email or SMS, the processor checks the DND list:

const dnd = await DND.find({
  account_id: new mongoose.Types.ObjectId(job.data.sender_account),
  value: { $in: [job.data.recipient.email] }, // or phone
});

if (dnd?.length) {
  logger.error({
    initiator: 'notification/queue',
    message: 'DND enabled for received email(s).',
    job: job.id,
    job_data: job.data,
  });

  // For repeatable jobs, just skip this attempt
  if (job.id.startsWith('repeat')) {
    return done();
  }

  // Otherwise, delete from queue
  await QueueModel.deleteOne({ _id: job.id });
  return done();
}

📊 Performance Considerations

Polling Frequency

Current: Every 5 seconds

Considerations:

  • Lower frequency (10s): Reduces DB load but increases notification delay
  • Higher frequency (3s): Faster delivery but increases DB load
  • Optimal: 5s balances latency and resource usage (a configurable interval is sketched below)
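
If the interval needs to vary per environment, the schedule could be made configurable; QUEUE_POLL_SECONDS below is a hypothetical variable, not one the service currently reads:

// Hypothetical: make the poll interval configurable instead of hard-coding 5 seconds
const POLL_SECONDS = Number(process.env.QUEUE_POLL_SECONDS) || 5;

cron.schedule(`*/${POLL_SECONDS} * * * * *`, async () => {
  // ...same guarded call to processQueuedList() as shown above
});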

Queue Prioritization

Currently, all notifications have equal priority. Consider implementing:

// High priority (immediate)
const criticalNotifications = ['instareports_failed', 'instasites_failed', 'support_notifications'];

// Medium priority (5s delay)
const standardNotifications = ['contacts_assigned', 'deals_assigned', 'forms_submitted'];

// Low priority (30s delay)
const marketingNotifications = ['review_requests', 'funnel_notifications'];
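
Bull supports a per-job priority option (1 is the highest priority), so one way to wire these tiers in would be the sketch below; the mapping assumes the tier identifiers above live on the queued document (for example in its origin or module field), which is not confirmed by this document:

// Sketch only – maps the tiers above onto Bull's `priority` job option
const PRIORITY_BY_TYPE = {
  instareports_failed: 1,
  instasites_failed: 1,
  support_notifications: 1,
  contacts_assigned: 5,
  deals_assigned: 5,
  forms_submitted: 5,
  review_requests: 10,
  funnel_notifications: 10,
};

Queue.add(
  { ...notification },
  {
    ...queueOptions,
    priority: PRIORITY_BY_TYPE[notification.origin] ?? 5, // assumption: `origin` identifies the tier
  },
);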

Batch Processing

Expo notifications are already batched. Consider batching other types:

// Batch emails by account for SendGrid API efficiency
// (groupBy as in lodash; sendBatchEmails is a hypothetical helper)
const emailsByAccount = groupBy(emails, 'sender_account');

for (const [accountId, accountEmails] of Object.entries(emailsByAccount)) {
  // Send all emails for the account in a single API call
  await sendBatchEmails(accountId, accountEmails);
}

🐛 Troubleshooting

Issue: Queue Not Processing

Symptoms: Notifications stuck in MongoDB, not delivered

Check:

  1. Environment flag:

    echo $QUEUE_ENABLED
    # Should output: true
  2. Service running:

    grep "QUEUE SERVICE RUNNING" notifications.log
  3. Redis connection:

    redis-cli -h $REDIS_HOST -p $REDIS_PORT ping
    # Should return: PONG
  4. Bull queue health:

    // In Node.js console
    const Queue = require('bull')('NotificationQueue', 'redis://...');
    await Queue.getJobCounts();
    // Should show: { waiting, active, completed, failed }

Issue: High Failure Rate

Symptoms: Many jobs reaching MAX_RETRIES

Common Causes:

  1. SendGrid API errors:

    // Check SendGrid key validity
    const key = await SendgridKey.findOne({ account_id });
    // Test key with API call
  2. Twilio API errors:

    // Verify credentials
    const account = await Account.findById(id, { twilio_account: 1 });
  3. Invalid recipients:

    // Check email format
    // Check phone format (E.164)

Issue: Slow Processing

Symptoms: Notifications delayed beyond poll interval

Diagnosis:

// MongoDB shell: check pending queue size
db.getCollection('notifications.queue').count();

// Node.js (Bull instance): check queue backlog
await Queue.getJobCounts();

// Node.js: check for long-running jobs
const active = await Queue.getActive();
// Look for jobs stuck in the active state

Solutions:

  • Scale horizontally (add more notification service instances)
  • Increase Redis memory
  • Optimize external API calls
  • Implement queue prioritization

📈 Metrics to Monitor

Queue Health:

  • Pending notification count (should trend toward 0)
  • Average time in queue (should be < 10 seconds)
  • Retry rate (should be < 5%)

Delivery Rates:

  • Success rate per notification type (target > 95%)
  • DND blocking rate
  • External API error rate

Performance:

  • Poll duration (should be < 1 second)
  • Bull queue processing time
  • External API response time
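
A lightweight way to collect the queue-health numbers is to poll Bull's job counts and the MongoDB backlog on a timer; console.log below is a placeholder for whatever metrics sink is in use:

// Sketch: emit basic queue-health metrics once a minute
setInterval(async () => {
  const counts = await Queue.getJobCounts(); // { waiting, active, completed, failed, delayed }
  const pending = await QueueModel.countDocuments({}); // notifications still waiting in MongoDB
  console.log('[queue-metrics]', { ...counts, pending });
}, 60 * 1000);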

Module Type: Cron Job (every 5 seconds)
Environment Flag: QUEUE_ENABLED=true (REQUIRED)
Dependencies: MongoDB, Redis, SendGrid, Twilio, Firebase, Expo
Status: Core - required for all notification delivery
