๐จ Error Reporting (Communication Violations)
๐ Overviewโ
The Error Reporting job monitors communication compliance by analyzing email and SMS delivery metrics over a 7-day rolling window. It runs every 5 seconds, identifies accounts with recent communication activity, calculates error rates and opt-out rates, and automatically suspends accounts that exceed violation thresholds. The system prevents duplicate notifications by tracking violation timestamps and enforces a 7-day cooldown period before re-evaluating suspended accounts.
Complete Flow:
- Cron Initialization:
queue-manager/crons/communication/index.js - Service Processing:
queue-manager/services/communication/errorReporting.js - Queue Definition:
queue-manager/queues/communication/errorCalculations.js
Execution Pattern: Ultra-high frequency polling (every 5 seconds) with account-level queuing
Queue Name: comm_error_calculation
Environment Flag: QM_COMMUNICATION_ERROR_REPORTING=true (in index.js)
๐ Complete Processing Flowโ
sequenceDiagram
participant CRON as Cron Schedule<br/>(every 5 sec)
participant SERVICE as Error Reporting<br/>Service
participant DB as Communications DB
participant QUEUE as Error Calculation<br/>Queue
participant ACCOUNT_DB as Accounts DB
participant NOTIF as Notification<br/>Service
CRON->>SERVICE: errorReporting()
SERVICE->>DB: Aggregate:<br/>Find unique account_ids<br/>with communications<br/>in last 7 days
DB-->>SERVICE: List of account IDs
loop Each account_id
SERVICE->>QUEUE: Add job: {account_id}
end
loop Each queued account
QUEUE->>DB: Aggregate communications:<br/>- Count failed emails<br/>- Count unsubscribe emails<br/>- Count total emails<br/>- Count failed SMS<br/>- Count unsubscribe SMS<br/>- Count total SMS
DB-->>QUEUE: Communication stats
QUEUE->>QUEUE: Calculate rates:<br/>Email error rate = failed/total<br/>Email opt rate = unsubscribe/total<br/>SMS error rate = failed/total<br/>SMS opt rate = unsubscribe/total
alt Email error rate > 8%
QUEUE->>QUEUE: Flag email.error = true
alt Email error rate > 12.5%
QUEUE->>QUEUE: Flag email.suspend = true
end
end
alt Email opt-out rate > 1%
QUEUE->>QUEUE: Flag email.optout = true
alt Email opt-out rate > 2.5%
QUEUE->>QUEUE: Flag email.suspend = true
end
end
alt SMS error rate > 8%
QUEUE->>QUEUE: Flag sms.error = true
alt SMS error rate > 12.5%
QUEUE->>QUEUE: Flag sms.suspend = true
end
end
alt SMS opt-out rate > 1%
QUEUE->>QUEUE: Flag sms.optout = true
alt SMS opt-out rate > 2.5%
QUEUE->>QUEUE: Flag sms.suspend = true
end
end
QUEUE->>ACCOUNT_DB: Fetch account suspend history
alt Violation within 7-day cooldown
QUEUE->>QUEUE: Clear violation flags<br/>(prevent duplicate notifications)
else Cooldown expired or no history
QUEUE->>ACCOUNT_DB: Update suspend timestamps:<br/>- last_email_violation_at<br/>- last_sms_violation_at<br/>- email_suspended_at<br/>- sms_suspended_at<br/>- suspend.email = true/false<br/>- suspend.sms = true/false
QUEUE->>NOTIF: Send notification:<br/>type: communication.violation<br/>data: violation details
end
end
๐ Source Filesโ
1. Cron Initializationโ
File: queue-manager/crons/communication/index.js
Purpose: Schedule error reporting to run every 5 seconds
Cron Pattern: */5 * * * * * (every 5 seconds)
Initialization:
const { errorReporting } = require('../../services/communication/errorReporting');
const cron = require('node-cron');
const logger = require('../../utilities/logger');
let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/5 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
await errorReporting();
inProgress = false;
}
});
} catch (err) {
logger.error({ initiator: 'QM/communication/error-reporting', error: err });
}
};
In-Progress Lock: Prevents overlapping executions during heavy processing.
Note: Ultra-high frequency (720 times per hour) ensures near real-time violation detection.
2. Service Processingโ
File: queue-manager/services/communication/errorReporting.js
Purpose: Find accounts with recent communications and queue for analysis
Key Features:
- 7-day lookback window
- Aggregation to find unique accounts
- Batch job queuing with retry logic
- Promise.all for parallel queuing
Main Service Function:
const errorCalculations = require('../../queues/communication/errorCalculations');
const Communication = require('../../models/communication');
const TIMEFRAME = 7 * 24 * 60 * 60 * 1000; // 7 days
exports.errorReporting = async () => {
try {
let accounts = await Communication.aggregate([
{
$match: {
createdAt: {
$gte: new Date(Date.now() - TIMEFRAME),
},
},
},
{
$group: {
_id: null,
accounts: {
$addToSet: '$account_id',
},
},
},
]);
accounts = accounts[0]?.accounts;
if (accounts.length) {
let queue = await errorCalculations.start();
await Promise.all(
accounts.map(async id => {
try {
await queue.add(
{
account_id: id,
},
{
attempts: 4,
backoff: 4000,
},
);
} catch (err) {
console.log(
`Error occurred while processing the data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);
}
}),
);
console.log('Accounts added to queue for import.');
}
} catch (err) {
console.log(
`Error occurred while processing the data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);
}
};
Aggregation Query:
Stage 1 - Match: Communications in last 7 days
{
$match: {
createdAt: {
$gte: new Date(Date.now() - TIMEFRAME),
},
},
}
Stage 2 - Group: Extract unique account IDs
{
$group: {
_id: null,
accounts: {
$addToSet: '$account_id',
},
},
}
3. Queue Processing (THE VIOLATION DETECTION LOGIC)โ
File: queue-manager/queues/communication/errorCalculations.js
Purpose: Calculate error rates, opt-out rates, and enforce suspension thresholds
Key Functions:
- Complex faceted aggregation for metrics
- Rate calculation with thresholds
- Cooldown period enforcement
- Account suspension updates
- Notification service integration
Complete Processor (Simplified - full code ~370 lines):
const mongoose = require('mongoose');
const QueueWrapper = require('../../common/queue-wrapper');
const Communication = require('../../models/communication');
const Account = require('../../models/account');
const { default: axios } = require('axios');
const logger = require('../../utilities/logger');
const TIMEFRAME = 7 * 24 * 60 * 60 * 1000; // 7 days
const THRESHOLD_EMAIL = 50; // Minimum emails to trigger analysis
const THRESHOLD_SMS = 25; // Minimum SMS to trigger analysis
const processCb = async (job, done) => {
try {
const { account_id } = job.data;
const account = await Account.findById(account_id).lean();
// Complex faceted aggregation
let reporting = await Communication.aggregate([
{
$match: {
account_id: new mongoose.Types.ObjectId(account_id),
createdAt: {
$gte: new Date(Date.now() - TIMEFRAME),
},
},
},
{
$facet: {
failed_email: [
{
$match: {
message_type: 'EMAIL',
'events.event': {
$in: ['dropped', 'deferred', 'bounce'],
},
},
},
{
$count: 'total',
},
],
unsubscribe_email: [
{
$match: {
message_type: 'EMAIL',
'events.event': {
$in: ['unsubscribe', 'spamreport'],
},
},
},
{
$count: 'total',
},
],
all_email: [
{
$match: {
message_type: 'EMAIL',
},
},
{
$count: 'total',
},
],
failed_sms: [
{
$match: {
message_type: 'SMS',
'events.MessageStatus': {
$in: ['undelivered', 'failed', 'canceled'],
},
},
},
{
$count: 'total',
},
],
unsubscribe_sms: [
{
$match: {
message_type: 'SMS',
body: {
$in: ['STOP', 'STOPALL', 'UNSUBSCRIBE', 'CANCEL', 'END', 'QUIT'],
},
},
},
{
$count: 'total',
},
],
all_sms: [
{
$match: {
message_type: 'SMS',
},
},
{
$count: 'total',
},
],
},
},
{
$set: {
failed_email: {
$ifNull: [
{
$arrayElemAt: ['$failed_email.total', 0],
},
0,
],
},
unsubscribe_email: {
$ifNull: [
{
$arrayElemAt: ['$unsubscribe_email.total', 0],
},
0,
],
},
all_email: {
$ifNull: [
{
$arrayElemAt: ['$all_email.total', 0],
},
0,
],
},
failed_sms: {
$ifNull: [
{
$arrayElemAt: ['$failed_sms.total', 0],
},
0,
],
},
unsubscribe_sms: {
$ifNull: [
{
$arrayElemAt: ['$unsubscribe_sms.total', 0],
},
0,
],
},
all_sms: {
$ifNull: [
{
$arrayElemAt: ['$all_sms.total', 0],
},
0,
],
},
},
},
]);
reporting = reporting[0];
// Initialize response object
let resp = {
email: {
error: false,
optout: false,
suspend: false,
},
sms: {
error: false,
optout: false,
suspend: false,
},
};
let updObj = {};
// Evaluate email metrics
if (reporting?.all_email > THRESHOLD_EMAIL) {
const error_rate = reporting?.failed_email / reporting?.all_email;
const opt_rate = reporting?.unsubscribe_email / reporting?.all_email;
if (error_rate > 0.08) {
resp.email.error = true;
if (error_rate > 0.125) {
resp.email.suspend = true;
}
}
if (opt_rate > 0.01) {
resp.email.optout = true;
if (opt_rate > 0.025) {
resp.email.suspend = true;
}
}
}
// Evaluate SMS metrics
if (reporting?.all_sms > THRESHOLD_SMS) {
const error_rate = reporting?.failed_sms / reporting?.all_sms;
const opt_rate = reporting?.unsubscribe_sms / reporting?.all_sms;
if (error_rate > 0.08) {
resp.sms.error = true;
if (error_rate > 0.125) {
resp.sms.suspend = true;
}
}
if (opt_rate > 0.01) {
resp.sms.optout = true;
if (opt_rate > 0.025) {
resp.sms.suspend = true;
}
}
}
// Enforce cooldown periods
if (resp.email.error || resp.email.optout) {
let is_over = true;
if (account?.suspend?.last_email_violation_at) {
is_over = new Date(account.suspend.last_email_violation_at + TIMEFRAME) < new Date();
if (!is_over) {
resp.email.error = false;
resp.email.optout = false;
}
}
if (is_over) {
updObj['suspend.last_email_violation_at'] = Date.now();
}
}
if (resp.sms.error || resp.sms.optout) {
let is_over = true;
if (account?.suspend?.last_sms_violation_at) {
is_over = new Date(account.suspend.last_sms_violation_at + TIMEFRAME) < new Date();
if (!is_over) {
resp.sms.error = false;
resp.sms.optout = false;
}
}
if (is_over) {
updObj['suspend.last_sms_violation_at'] = Date.now();
}
}
if (resp.email.suspend) {
let is_over = true;
if (account?.suspend?.email_suspended_at) {
is_over = new Date(account.suspend.email_suspended_at + TIMEFRAME) < new Date();
if (!is_over) {
resp.email.suspend = false;
}
}
if (is_over) {
updObj['suspend.email'] = resp.email.suspend;
updObj['suspend.email_suspended_at'] = Date.now();
}
}
if (resp.sms.suspend) {
let is_over = true;
if (account?.suspend?.sms_suspended_at) {
is_over = new Date(account.suspend.sms_suspended_at + TIMEFRAME) < new Date();
if (!is_over) {
resp.sms.suspend = false;
}
}
if (is_over) {
updObj['suspend.sms'] = resp.sms.suspend;
updObj['suspend.sms_suspended_at'] = Date.now();
}
}
// Update account if violations detected
if (Object.keys(updObj).length) {
await Account.findByIdAndUpdate(account_id, updObj);
}
// Send notification if any violations
if (
resp.email.error ||
resp.email.optout ||
resp.sms.error ||
resp.sms.optout ||
resp.email.suspend ||
resp.sms.suspend
) {
await axios.get(`${process.env.NOTIFICATION_SERVICE_URL}/run-service`, {
params: {
type: 'communication.violation',
'data[account_id]': account_id.toString(),
'data[email_error]': resp.email.error,
'data[email_optout]': resp.email.optout,
'data[sms_error]': resp.sms.error,
'data[sms_optout]': resp.sms.optout,
'data[sms_suspend]': resp.sms.suspend,
'data[email_suspend]': resp.email.suspend,
},
});
}
return done();
} catch (err) {
done(err);
}
};
const completedCb = async job => {};
let queue;
exports.start = async () => {
try {
if (!queue)
queue = QueueWrapper(`comm_error_calculation`, 'global', { processCb, completedCb });
return Promise.resolve(queue);
} catch (err) {
logger.error({
initiator: 'QM/communication/error-calculation',
error: err,
message: `Error while starting queue`,
});
}
};
๐๏ธ Collections Usedโ
communicationsโ
- Operations: Aggregate (faceted metrics), Count
- Model:
shared/models/communication.js - Usage Context: Track all email and SMS communications with delivery events
Key Fields:
_id: Communication record IDaccount_id: Account owner referencemessage_type: 'EMAIL' or 'SMS'module: Source module (e.g., 'automation', 'campaigns')sender_id: User who sent the messagecontact_id: Array of contact referencesconversation_id: Array of conversation referencesevents: Array of delivery events (strict: false allows dynamic fields)- Email events: 'dropped', 'deferred', 'bounce', 'unsubscribe', 'spamreport'
- SMS events: 'MessageStatus' with values 'undelivered', 'failed', 'canceled'
body: Message body (used for SMS opt-out detection)createdAt: Message creation timestamp (indexed for 7-day lookback)
_accountsโ
- Operations: Find, Update
- Model:
shared/models/account.js - Usage Context: Store suspension flags and violation timestamps
Key Fields (suspend object):
suspend.email: Boolean - email sending suspendedsuspend.sms: Boolean - SMS sending suspendedsuspend.last_email_violation_at: Timestamp - last email violation detectedsuspend.last_sms_violation_at: Timestamp - last SMS violation detectedsuspend.email_suspended_at: Timestamp - when email was suspendedsuspend.sms_suspended_at: Timestamp - when SMS was suspended
๐ง Job Configurationโ
Cron Scheduleโ
'*/5 * * * * *'; // Every 5 seconds
Frequency: 720 times per hour
Rationale: Ultra-high frequency ensures near real-time violation detection and account suspension.
Timeframe Configurationโ
const TIMEFRAME = 7 * 24 * 60 * 60 * 1000; // 7 days in milliseconds
7-Day Rolling Window: All calculations based on last 7 days of activity.
Volume Thresholdsโ
const THRESHOLD_EMAIL = 50; // Minimum emails to trigger analysis
const THRESHOLD_SMS = 25; // Minimum SMS to trigger analysis
Rationale: Prevents false positives for low-volume senders.
Violation Thresholdsโ
Email Error Rate:
- Warning: > 8% (0.08) - Flag
email.error = true - Suspension: > 12.5% (0.125) - Flag
email.suspend = true
Email Opt-Out Rate:
- Warning: > 1% (0.01) - Flag
email.optout = true - Suspension: > 2.5% (0.025) - Flag
email.suspend = true
SMS Error Rate:
- Warning: > 8% (0.08) - Flag
sms.error = true - Suspension: > 12.5% (0.125) - Flag
sms.suspend = true
SMS Opt-Out Rate:
- Warning: > 1% (0.01) - Flag
sms.optout = true - Suspension: > 2.5% (0.025) - Flag
sms.suspend = true
Queue Settingsโ
QueueWrapper(`comm_error_calculation`, 'global', {
processCb,
completedCb,
});
Queue Name: comm_error_calculation
Concurrency: Default (1)
Retry Settings:
{
attempts: 4,
backoff: 4000, // 4-second delay between retries
}
๐ Processing Logic - Detailed Flowโ
1. Faceted Aggregationโ
Purpose: Calculate 6 metrics in single query
Aggregation Pipeline:
Stage 1 - Match: Account communications in last 7 days
{
$match: {
account_id: new mongoose.Types.ObjectId(account_id),
createdAt: {
$gte: new Date(Date.now() - TIMEFRAME),
},
},
}
Stage 2 - Facet: Calculate all metrics in parallel
{
$facet: {
failed_email: [...], // Count dropped/deferred/bounce emails
unsubscribe_email: [...], // Count unsubscribe/spam report emails
all_email: [...], // Count total emails
failed_sms: [...], // Count undelivered/failed/canceled SMS
unsubscribe_sms: [...], // Count STOP/UNSUBSCRIBE SMS
all_sms: [...], // Count total SMS
},
}
Stage 3 - Set: Extract counts from arrays (default to 0)
{
$set: {
failed_email: {
$ifNull: [
{ $arrayElemAt: ['$failed_email.total', 0] },
0,
],
},
// ... same for other metrics
},
}
2. Rate Calculationโ
Email Error Rate:
const error_rate = reporting?.failed_email / reporting?.all_email;
// Example: 10 failed / 100 total = 0.10 (10%)
Email Opt-Out Rate:
const opt_rate = reporting?.unsubscribe_email / reporting?.all_email;
// Example: 2 unsubscribes / 100 total = 0.02 (2%)
SMS Error Rate:
const error_rate = reporting?.failed_sms / reporting?.all_sms;
// Example: 5 failed / 50 total = 0.10 (10%)
SMS Opt-Out Rate:
const opt_rate = reporting?.unsubscribe_sms / reporting?.all_sms;
// Example: 1 STOP / 50 total = 0.02 (2%)
3. Threshold Evaluationโ
Email Evaluation Logic:
if (reporting?.all_email > THRESHOLD_EMAIL) {
// Must have >50 emails
const error_rate = reporting?.failed_email / reporting?.all_email;
const opt_rate = reporting?.unsubscribe_email / reporting?.all_email;
if (error_rate > 0.08) {
// >8% error rate
resp.email.error = true;
if (error_rate > 0.125) {
// >12.5% error rate
resp.email.suspend = true;
}
}
if (opt_rate > 0.01) {
// >1% opt-out rate
resp.email.optout = true;
if (opt_rate > 0.025) {
// >2.5% opt-out rate
resp.email.suspend = true;
}
}
}
SMS Evaluation Logic: Identical structure with SMS metrics.
4. Cooldown Period Enforcementโ
Purpose: Prevent duplicate notifications within 7 days
Email Violation Cooldown:
if (resp.email.error || resp.email.optout) {
let is_over = true;
if (account?.suspend?.last_email_violation_at) {
// Check if 7 days have passed since last violation
is_over = new Date(account.suspend.last_email_violation_at + TIMEFRAME) < new Date();
if (!is_over) {
// Within cooldown - clear flags (no notification)
resp.email.error = false;
resp.email.optout = false;
}
}
if (is_over) {
// Update violation timestamp
updObj['suspend.last_email_violation_at'] = Date.now();
}
}
SMS Violation Cooldown: Identical structure with SMS fields.
Suspension Cooldown:
if (resp.email.suspend) {
let is_over = true;
if (account?.suspend?.email_suspended_at) {
// Check if 7 days have passed since suspension
is_over = new Date(account.suspend.email_suspended_at + TIMEFRAME) < new Date();
if (!is_over) {
// Still suspended - clear flag
resp.email.suspend = false;
}
}
if (is_over) {
// Apply new suspension
updObj['suspend.email'] = resp.email.suspend;
updObj['suspend.email_suspended_at'] = Date.now();
}
}
5. Account Updateโ
Update Pattern:
if (Object.keys(updObj).length) {
await Account.findByIdAndUpdate(account_id, updObj);
}
Example Update Object:
{
'suspend.last_email_violation_at': 1697212800000,
'suspend.email': true,
'suspend.email_suspended_at': 1697212800000,
}
6. Notification Triggerโ
Condition: Any violation flag is true
if (
resp.email.error ||
resp.email.optout ||
resp.sms.error ||
resp.sms.optout ||
resp.email.suspend ||
resp.sms.suspend
) {
// Send notification
}
Notification Service Call:
await axios.get(`${process.env.NOTIFICATION_SERVICE_URL}/run-service`, {
params: {
type: 'communication.violation',
'data[account_id]': account_id.toString(),
'data[email_error]': resp.email.error,
'data[email_optout]': resp.email.optout,
'data[sms_error]': resp.sms.error,
'data[sms_optout]': resp.sms.optout,
'data[sms_suspend]': resp.sms.suspend,
'data[email_suspend]': resp.email.suspend,
},
});
Notification Type: communication.violation
Notification Data: All violation flags as boolean parameters
๐จ Error Handlingโ
Common Error Scenariosโ
Aggregation Failureโ
Scenario: MongoDB aggregation query fails (timeout, connection issue)
Handling: Error passed to done(err), job retries (4 attempts with 4-second backoff)
Impact: Account metrics not evaluated, retry on next attempt
Account Not Foundโ
Scenario: Account deleted between service and queue processing
Handling: account variable is null, cooldown checks gracefully handle with optional chaining
Impact: No suspension history, violations treated as first-time
Notification Service Unavailableโ
Scenario: Notification service down or unreachable
Handling: Axios call fails, error thrown, job retries
Impact: Violation detected but notification not sent, retry on next attempt
Division by Zeroโ
Scenario: Total emails/SMS is zero (should be prevented by thresholds)
Handling: Not explicitly handled, would result in Infinity or NaN
Impact: Comparison with thresholds would fail, no violations detected
Failed Job Callbackโ
Note: No explicit failedCb defined - relies on default Bull error handling.
Completed Job Callbackโ
const completedCb = async job => {};
Action: No-op (empty function)
๐ Monitoring & Loggingโ
Success Loggingโ
Service Level:
- Accounts added to queue for import (console.log)
Queue Level:
- No explicit success logging
Error Loggingโ
Cron Level:
- Error in cron initialization
Service Level:
- Error aggregating accounts
- Error queuing individual accounts (console.log)
Queue Level:
- Error starting queue
Performance Metricsโ
- Typical Aggregation Time: 1-5 seconds per account (complex faceted query)
- Accounts Processed Per Run: Varies (all accounts with 7-day activity)
- High-Volume System: Could process 100-1000+ accounts per run
- Total Job Time: 2-10 seconds per account
๐ Integration Pointsโ
Triggers This Jobโ
- Cron Schedule: Every 5 seconds (no external triggers)
- Database Activity: Any communication creates potential for violation
External Dependenciesโ
- Notification Service: Sends violation alerts to admins
- SendGrid: Email delivery events stored in communications
- Twilio: SMS delivery events stored in communications
Jobs That Depend On Thisโ
- None: Standalone compliance monitoring
Related Featuresโ
- Email Campaigns: Suspended accounts cannot send campaigns
- SMS Campaigns: Suspended accounts cannot send SMS
- Automation: Suspended accounts have automations paused
โ ๏ธ Important Notesโ
Side Effectsโ
- โ ๏ธ Account Suspension: Automatically blocks email/SMS sending
- โ ๏ธ Service Disruption: Customer communication stops immediately
- โ ๏ธ Notification Spam: High frequency could trigger many notifications
Performance Considerationsโ
- High Frequency: Every 5 seconds may cause DB load
- Faceted Aggregation: Complex query on large communications collection
- No Caching: Recalculates metrics on every run (no caching of rates)
- Cooldown Period: 7-day cooldown prevents notification spam
Business Logicโ
Why 7-Day Window?
- Industry standard for compliance monitoring
- Balances recent behavior with statistical significance
- Aligns with SendGrid and Twilio reporting periods
Why Different Thresholds?
- Email errors more common (spam filters, invalid addresses)
- SMS errors less common (validated phone numbers)
- Opt-out rates critical for compliance (CAN-SPAM, TCPA)
Why Every 5 Seconds?
- Near real-time violation detection
- Prevents continued abuse after threshold crossed
- Minimizes customer complaints to providers
Why Cooldown Period?
- Prevents notification spam
- Allows time for account review and remediation
- 7-day window matches evaluation timeframe
Maintenance Notesโ
- Thresholds: Hardcoded in queue file (consider environment variables)
- Timeframe: 7 days hardcoded (consider configurable)
- Volume Thresholds: 50 emails, 25 SMS hardcoded
- Notification Type: 'communication.violation' hardcoded
- Cron Frequency: Every 5 seconds (consider reducing for performance)
Code Quality Issuesโ
Issue 1: Inconsistent Error Messages
console.log(
`Error occurred while processing the data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);
Issue: Copy-pasted error message references wrong file path (accounts/import.js instead of communication/errorReporting.js).
Suggestion: Fix error messages:
logger.error({
initiator: 'QM/communication/error-reporting',
message: 'Error occurred while aggregating accounts',
error: err,
});
Issue 2: Variable Shadowing
let is_over = true;
if (account?.suspend?.last_email_violation_at) {
const is_over = ... // Shadows outer variable
if (!is_over) {
resp.email.error = false;
}
}
if (is_over) { // Uses outer variable (unchanged)
updObj['suspend.last_email_violation_at'] = Date.now();
}
Issue: const is_over inside if-block shadows outer let is_over, causing logic error.
Suggestion: Remove inner const:
let is_over = true;
if (account?.suspend?.last_email_violation_at) {
is_over = ... // Update outer variable
if (!is_over) {
resp.email.error = false;
}
}
Issue 3: Duplicate Suspension Logic
if (opt_rate > 0.01) {
resp.email.optout = true;
if (error_rate > 0.025) {
// BUG: Should be opt_rate
resp.email.suspend = true;
}
}
Issue: Checking error_rate instead of opt_rate for opt-out suspension threshold.
Suggestion: Fix condition:
if (opt_rate > 0.01) {
resp.email.optout = true;
if (opt_rate > 0.025) {
// Corrected
resp.email.suspend = true;
}
}
Issue 4: Division by Zero Not Handled
const error_rate = reporting?.failed_email / reporting?.all_email;
Issue: If all_email is 0, results in Infinity.
Suggestion: Add defensive check (already exists with THRESHOLD_EMAIL).
๐งช Testingโ
Manual Triggerโ
# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/communication/errorReporting
Simulate Email Violationsโ
const Communication = require('./models/communication');
// Create 100 emails with 15 failures (15% error rate - above suspension threshold)
const communications = [];
for (let i = 0; i < 100; i++) {
const isFailed = i < 15;
communications.push({
account_id: accountId,
message_type: 'EMAIL',
module: 'test',
events: [
{
event: isFailed ? 'bounce' : 'delivered',
timestamp: Date.now(),
},
],
createdAt: new Date(),
});
}
await Communication.insertMany(communications);
console.log('Created 100 emails with 15% error rate (should trigger suspension)');
Verify Violation Detectionโ
// Wait for next cron run (up to 5 seconds)
await new Promise(resolve => setTimeout(resolve, 6000));
// Check account suspension status
const account = await Account.findById(accountId);
console.log('Email suspended:', account.suspend.email); // Should be true
console.log('Suspension timestamp:', account.suspend.email_suspended_at);
console.log('Last violation:', account.suspend.last_email_violation_at);
Test Cooldown Periodโ
// Set violation timestamp 8 days ago
await Account.findByIdAndUpdate(accountId, {
'suspend.last_email_violation_at': Date.now() - 8 * 24 * 60 * 60 * 1000,
});
// Create new violations
// ... (same as above)
// Wait for cron
await new Promise(resolve => setTimeout(resolve, 6000));
// Verify new notification sent
const account = await Account.findById(accountId);
const hoursSince = (Date.now() - account.suspend.last_email_violation_at) / (1000 * 60 * 60);
console.log('Hours since last violation:', hoursSince); // Should be ~0 (just updated)
Monitor Queue Processingโ
# Watch logs during violation detection
tail -f logs/queue-manager.log | grep "communication"
# Expected outputs:
# "Accounts added to queue for import."
# (Notification service logs if violations detected)
Test Faceted Aggregationโ
const mongoose = require('mongoose');
const Communication = require('./models/communication');
const TIMEFRAME = 7 * 24 * 60 * 60 * 1000;
const reporting = await Communication.aggregate([
{
$match: {
account_id: new mongoose.Types.ObjectId(accountId),
createdAt: {
$gte: new Date(Date.now() - TIMEFRAME),
},
},
},
{
$facet: {
failed_email: [
{
$match: {
message_type: 'EMAIL',
'events.event': {
$in: ['dropped', 'deferred', 'bounce'],
},
},
},
{
$count: 'total',
},
],
all_email: [
{
$match: {
message_type: 'EMAIL',
},
},
{
$count: 'total',
},
],
},
},
]);
console.log('Email metrics:', reporting[0]);
console.log('Error rate:', reporting[0].failed_email[0]?.total / reporting[0].all_email[0]?.total);
Job Type: High-Frequency Scheduled with Account-Level Queuing
Execution Frequency: Every 5 seconds
Average Duration: 2-10 seconds per account
Status: Active