
🚨 Error Reporting (Communication Violations)

📖 Overview

The Error Reporting job monitors communication compliance by analyzing email and SMS delivery metrics over a 7-day rolling window. It runs every 5 seconds, identifies accounts with recent communication activity, calculates error rates and opt-out rates, and automatically suspends the affected channel (email or SMS) for accounts that exceed the suspension thresholds. To prevent duplicate notifications, it records violation and suspension timestamps and enforces a 7-day cooldown per channel before the same account is flagged or notified again.

Complete Flow:

  1. Cron Initialization: queue-manager/crons/communication/index.js
  2. Service Processing: queue-manager/services/communication/errorReporting.js
  3. Queue Definition: queue-manager/queues/communication/errorCalculations.js

Execution Pattern: Ultra-high frequency polling (every 5 seconds) with account-level queuing

Queue Name: comm_error_calculation

Environment Flag: QM_COMMUNICATION_ERROR_REPORTING=true (in index.js)

🔄 Complete Processing Flow

sequenceDiagram
participant CRON as Cron Schedule<br/>(every 5 sec)
participant SERVICE as Error Reporting<br/>Service
participant DB as Communications DB
participant QUEUE as Error Calculation<br/>Queue
participant ACCOUNT_DB as Accounts DB
participant NOTIF as Notification<br/>Service

CRON->>SERVICE: errorReporting()
SERVICE->>DB: Aggregate:<br/>Find unique account_ids<br/>with communications<br/>in last 7 days
DB-->>SERVICE: List of account IDs

loop Each account_id
SERVICE->>QUEUE: Add job: {account_id}
end

loop Each queued account
QUEUE->>DB: Aggregate communications:<br/>- Count failed emails<br/>- Count unsubscribe emails<br/>- Count total emails<br/>- Count failed SMS<br/>- Count unsubscribe SMS<br/>- Count total SMS
DB-->>QUEUE: Communication stats

QUEUE->>QUEUE: Calculate rates:<br/>Email error rate = failed/total<br/>Email opt rate = unsubscribe/total<br/>SMS error rate = failed/total<br/>SMS opt rate = unsubscribe/total

alt Email error rate > 8%
QUEUE->>QUEUE: Flag email.error = true
alt Email error rate > 12.5%
QUEUE->>QUEUE: Flag email.suspend = true
end
end

alt Email opt-out rate > 1%
QUEUE->>QUEUE: Flag email.optout = true
alt Email opt-out rate > 2.5%
QUEUE->>QUEUE: Flag email.suspend = true
end
end

alt SMS error rate > 8%
QUEUE->>QUEUE: Flag sms.error = true
alt SMS error rate > 12.5%
QUEUE->>QUEUE: Flag sms.suspend = true
end
end

alt SMS opt-out rate > 1%
QUEUE->>QUEUE: Flag sms.optout = true
alt SMS opt-out rate > 2.5%
QUEUE->>QUEUE: Flag sms.suspend = true
end
end

QUEUE->>ACCOUNT_DB: Fetch account suspend history

alt Violation within 7-day cooldown
QUEUE->>QUEUE: Clear violation flags<br/>(prevent duplicate notifications)
else Cooldown expired or no history
QUEUE->>ACCOUNT_DB: Update suspend timestamps:<br/>- last_email_violation_at<br/>- last_sms_violation_at<br/>- email_suspended_at<br/>- sms_suspended_at<br/>- suspend.email = true/false<br/>- suspend.sms = true/false

QUEUE->>NOTIF: Send notification:<br/>type: communication.violation<br/>data: violation details
end
end

📁 Source Files

1. Cron Initialization

File: queue-manager/crons/communication/index.js

Purpose: Schedule error reporting to run every 5 seconds

Cron Pattern: */5 * * * * * (every 5 seconds)

Initialization:

const { errorReporting } = require('../../services/communication/errorReporting');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/5 * * * * *', async () => {
if (!inProgress) {
inProgress = true;
await errorReporting();
inProgress = false;
}
});
} catch (err) {
logger.error({ initiator: 'QM/communication/error-reporting', error: err });
}
};

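In-Progress Lock: Skips a tick while the previous errorReporting() call (aggregation plus enqueueing) is still running. It does not wait for the queued jobs themselves to finish.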

Note: Ultra-high frequency (720 times per hour) ensures near real-time violation detection.
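One caveat with a flag-based lock: if the guarded call ever rejected, inProgress would stay true and no further runs would start. The errorReporting() service catches its own errors, so this is defensive only. A minimal sketch of a lock that always releases follows; withLock and its onError parameter are hypothetical names, not part of the codebase.

```javascript
// Hypothetical helper (not in the codebase): wraps an async task so that
// overlapping calls are skipped and the lock is released on every path.
function withLock(task, onError = console.error) {
  let inProgress = false;
  return async () => {
    if (inProgress) return 'skipped'; // previous run still active
    inProgress = true;
    try {
      await task();
      return 'ran';
    } catch (err) {
      onError(err); // report instead of leaving an unhandled rejection
      return 'failed';
    } finally {
      inProgress = false; // released even if task() rejects
    }
  };
}
```

Under these assumptions the schedule call would become cron.schedule('*/5 * * * * *', withLock(errorReporting)).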

2. Service Processing

File: queue-manager/services/communication/errorReporting.js

Purpose: Find accounts with recent communications and queue for analysis

Key Features:

  • 7-day lookback window
  • Aggregation to find unique accounts
  • Batch job queuing with retry logic
  • Promise.all for parallel queuing

Main Service Function:

const errorCalculations = require('../../queues/communication/errorCalculations');
const Communication = require('../../models/communication');

const TIMEFRAME = 7 * 24 * 60 * 60 * 1000; // 7 days

exports.errorReporting = async () => {
try {
let accounts = await Communication.aggregate([
{
$match: {
createdAt: {
$gte: new Date(Date.now() - TIMEFRAME),
},
},
},
{
$group: {
_id: null,
accounts: {
$addToSet: '$account_id',
},
},
},
]);
accounts = accounts[0]?.accounts;

if (accounts.length) {
let queue = await errorCalculations.start();
await Promise.all(
accounts.map(async id => {
try {
await queue.add(
{
account_id: id,
},
{
attempts: 4,
backoff: 4000,
},
);
} catch (err) {
console.log(
`Error occurred while processing the data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);
}
}),
);
console.log('Accounts added to queue for import.');
}
} catch (err) {
console.log(
`Error occurred while processing the data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);
}
};

Aggregation Query:

Stage 1 - Match: Communications in last 7 days

{
$match: {
createdAt: {
$gte: new Date(Date.now() - TIMEFRAME),
},
},
}

Stage 2 - Group: Extract unique account IDs

{
$group: {
_id: null,
accounts: {
$addToSet: '$account_id',
},
},
}
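To make the two stages concrete, the sketch below reproduces them over an in-memory array. The helper name recentAccountIds and the record shape are illustrative assumptions, not part of the service.

```javascript
const TIMEFRAME = 7 * 24 * 60 * 60 * 1000; // 7 days, as in the service

// Hypothetical in-memory equivalent of the $match + $group/$addToSet stages:
// keep records created within the window, then collect unique account IDs.
function recentAccountIds(communications, now = Date.now()) {
  const cutoff = now - TIMEFRAME;
  const ids = new Set();
  for (const c of communications) {
    if (c.createdAt.getTime() >= cutoff) ids.add(String(c.account_id));
  }
  return [...ids];
}
```

One difference is worth noting. When nothing matches, the real pipeline returns an empty array, so accounts[0]?.accounts is undefined and the accounts.length check in the service throws a TypeError, which the outer catch logs. Checking accounts?.length instead would avoid that.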

3. Queue Processing (THE VIOLATION DETECTION LOGIC)

File: queue-manager/queues/communication/errorCalculations.js

Purpose: Calculate error rates, opt-out rates, and enforce suspension thresholds

Key Functions:

  • Complex faceted aggregation for metrics
  • Rate calculation with thresholds
  • Cooldown period enforcement
  • Account suspension updates
  • Notification service integration

Processor (simplified; the full file is ~370 lines):

const mongoose = require('mongoose');
const QueueWrapper = require('../../common/queue-wrapper');
const Communication = require('../../models/communication');
const Account = require('../../models/account');
const { default: axios } = require('axios');
const logger = require('../../utilities/logger');

const TIMEFRAME = 7 * 24 * 60 * 60 * 1000; // 7 days
const THRESHOLD_EMAIL = 50; // Minimum emails to trigger analysis
const THRESHOLD_SMS = 25; // Minimum SMS to trigger analysis

const processCb = async (job, done) => {
try {
const { account_id } = job.data;

const account = await Account.findById(account_id).lean();

// Complex faceted aggregation
let reporting = await Communication.aggregate([
{
$match: {
account_id: new mongoose.Types.ObjectId(account_id),
createdAt: {
$gte: new Date(Date.now() - TIMEFRAME),
},
},
},
{
$facet: {
failed_email: [
{
$match: {
message_type: 'EMAIL',
'events.event': {
$in: ['dropped', 'deferred', 'bounce'],
},
},
},
{
$count: 'total',
},
],
unsubscribe_email: [
{
$match: {
message_type: 'EMAIL',
'events.event': {
$in: ['unsubscribe', 'spamreport'],
},
},
},
{
$count: 'total',
},
],
all_email: [
{
$match: {
message_type: 'EMAIL',
},
},
{
$count: 'total',
},
],
failed_sms: [
{
$match: {
message_type: 'SMS',
'events.MessageStatus': {
$in: ['undelivered', 'failed', 'canceled'],
},
},
},
{
$count: 'total',
},
],
unsubscribe_sms: [
{
$match: {
message_type: 'SMS',
body: {
$in: ['STOP', 'STOPALL', 'UNSUBSCRIBE', 'CANCEL', 'END', 'QUIT'],
},
},
},
{
$count: 'total',
},
],
all_sms: [
{
$match: {
message_type: 'SMS',
},
},
{
$count: 'total',
},
],
},
},
{
$set: {
failed_email: {
$ifNull: [
{
$arrayElemAt: ['$failed_email.total', 0],
},
0,
],
},
unsubscribe_email: {
$ifNull: [
{
$arrayElemAt: ['$unsubscribe_email.total', 0],
},
0,
],
},
all_email: {
$ifNull: [
{
$arrayElemAt: ['$all_email.total', 0],
},
0,
],
},
failed_sms: {
$ifNull: [
{
$arrayElemAt: ['$failed_sms.total', 0],
},
0,
],
},
unsubscribe_sms: {
$ifNull: [
{
$arrayElemAt: ['$unsubscribe_sms.total', 0],
},
0,
],
},
all_sms: {
$ifNull: [
{
$arrayElemAt: ['$all_sms.total', 0],
},
0,
],
},
},
},
]);
reporting = reporting[0];

// Initialize response object
let resp = {
email: {
error: false,
optout: false,
suspend: false,
},
sms: {
error: false,
optout: false,
suspend: false,
},
};
let updObj = {};

// Evaluate email metrics
if (reporting?.all_email > THRESHOLD_EMAIL) {
const error_rate = reporting?.failed_email / reporting?.all_email;
const opt_rate = reporting?.unsubscribe_email / reporting?.all_email;

if (error_rate > 0.08) {
resp.email.error = true;
if (error_rate > 0.125) {
resp.email.suspend = true;
}
}
if (opt_rate > 0.01) {
resp.email.optout = true;
if (opt_rate > 0.025) {
resp.email.suspend = true;
}
}
}

// Evaluate SMS metrics
if (reporting?.all_sms > THRESHOLD_SMS) {
const error_rate = reporting?.failed_sms / reporting?.all_sms;
const opt_rate = reporting?.unsubscribe_sms / reporting?.all_sms;

if (error_rate > 0.08) {
resp.sms.error = true;
if (error_rate > 0.125) {
resp.sms.suspend = true;
}
}
if (opt_rate > 0.01) {
resp.sms.optout = true;
if (opt_rate > 0.025) {
resp.sms.suspend = true;
}
}
}

// Enforce cooldown periods
if (resp.email.error || resp.email.optout) {
let is_over = true;
if (account?.suspend?.last_email_violation_at) {
is_over = new Date(account.suspend.last_email_violation_at + TIMEFRAME) < new Date();
if (!is_over) {
resp.email.error = false;
resp.email.optout = false;
}
}
if (is_over) {
updObj['suspend.last_email_violation_at'] = Date.now();
}
}

if (resp.sms.error || resp.sms.optout) {
let is_over = true;
if (account?.suspend?.last_sms_violation_at) {
is_over = new Date(account.suspend.last_sms_violation_at + TIMEFRAME) < new Date();
if (!is_over) {
resp.sms.error = false;
resp.sms.optout = false;
}
}
if (is_over) {
updObj['suspend.last_sms_violation_at'] = Date.now();
}
}

if (resp.email.suspend) {
let is_over = true;
if (account?.suspend?.email_suspended_at) {
is_over = new Date(account.suspend.email_suspended_at + TIMEFRAME) < new Date();
if (!is_over) {
resp.email.suspend = false;
}
}
if (is_over) {
updObj['suspend.email'] = resp.email.suspend;
updObj['suspend.email_suspended_at'] = Date.now();
}
}

if (resp.sms.suspend) {
let is_over = true;
if (account?.suspend?.sms_suspended_at) {
is_over = new Date(account.suspend.sms_suspended_at + TIMEFRAME) < new Date();
if (!is_over) {
resp.sms.suspend = false;
}
}
if (is_over) {
updObj['suspend.sms'] = resp.sms.suspend;
updObj['suspend.sms_suspended_at'] = Date.now();
}
}

// Update account if violations detected
if (Object.keys(updObj).length) {
await Account.findByIdAndUpdate(account_id, updObj);
}

// Send notification if any violations
if (
resp.email.error ||
resp.email.optout ||
resp.sms.error ||
resp.sms.optout ||
resp.email.suspend ||
resp.sms.suspend
) {
await axios.get(`${process.env.NOTIFICATION_SERVICE_URL}/run-service`, {
params: {
type: 'communication.violation',
'data[account_id]': account_id.toString(),
'data[email_error]': resp.email.error,
'data[email_optout]': resp.email.optout,
'data[sms_error]': resp.sms.error,
'data[sms_optout]': resp.sms.optout,
'data[sms_suspend]': resp.sms.suspend,
'data[email_suspend]': resp.email.suspend,
},
});
}

return done();
} catch (err) {
done(err);
}
};

const completedCb = async job => {};

let queue;

exports.start = async () => {
try {
if (!queue)
queue = QueueWrapper(`comm_error_calculation`, 'global', { processCb, completedCb });
return Promise.resolve(queue);
} catch (err) {
logger.error({
initiator: 'QM/communication/error-calculation',
error: err,
message: `Error while starting queue`,
});
}
};

🗄️ Collections Used

communications

  • Operations: Aggregate (faceted metrics), Count
  • Model: shared/models/communication.js
  • Usage Context: Track all email and SMS communications with delivery events

Key Fields:

  • _id: Communication record ID
  • account_id: Account owner reference
  • message_type: 'EMAIL' or 'SMS'
  • module: Source module (e.g., 'automation', 'campaigns')
  • sender_id: User who sent the message
  • contact_id: Array of contact references
  • conversation_id: Array of conversation references
  • events: Array of delivery events (strict: false allows dynamic fields)
    • Email events: 'dropped', 'deferred', 'bounce', 'unsubscribe', 'spamreport'
    • SMS events: 'MessageStatus' with values 'undelivered', 'failed', 'canceled'
  • body: Message body (used for SMS opt-out detection)
  • createdAt: Message creation timestamp (indexed for 7-day lookback)

_accounts

  • Operations: Find, Update
  • Model: shared/models/account.js
  • Usage Context: Store suspension flags and violation timestamps

Key Fields (suspend object):

  • suspend.email: Boolean - email sending suspended
  • suspend.sms: Boolean - SMS sending suspended
  • suspend.last_email_violation_at: Timestamp - last email violation detected
  • suspend.last_sms_violation_at: Timestamp - last SMS violation detected
  • suspend.email_suspended_at: Timestamp - when email was suspended
  • suspend.sms_suspended_at: Timestamp - when SMS was suspended

🔧 Job Configuration

Cron Schedule

'*/5 * * * * *'; // Every 5 seconds

Frequency: 720 times per hour

Rationale: Ultra-high frequency ensures near real-time violation detection and account suspension.

Timeframe Configuration

const TIMEFRAME = 7 * 24 * 60 * 60 * 1000; // 7 days in milliseconds

7-Day Rolling Window: All calculations based on last 7 days of activity.

Volume Thresholds

const THRESHOLD_EMAIL = 50; // Minimum emails to trigger analysis
const THRESHOLD_SMS = 25; // Minimum SMS to trigger analysis

Rationale: Prevents false positives for low-volume senders.

Violation Thresholds

Email Error Rate:

  • Warning: > 8% (0.08) - Flag email.error = true
  • Suspension: > 12.5% (0.125) - Flag email.suspend = true

Email Opt-Out Rate:

  • Warning: > 1% (0.01) - Flag email.optout = true
  • Suspension: > 2.5% (0.025) - Flag email.suspend = true

SMS Error Rate:

  • Warning: > 8% (0.08) - Flag sms.error = true
  • Suspension: > 12.5% (0.125) - Flag sms.suspend = true

SMS Opt-Out Rate:

  • Warning: > 1% (0.01) - Flag sms.optout = true
  • Suspension: > 2.5% (0.025) - Flag sms.suspend = true
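The volume and rate thresholds above combine into one decision per channel. The following sketch shows that decision as a pure function. evaluateChannel and the RATES object are hypothetical names introduced for illustration; the numeric values are the ones listed above.

```javascript
// Rate thresholds copied from the processor; the names are illustrative.
const RATES = { errorWarn: 0.08, errorSuspend: 0.125, optWarn: 0.01, optSuspend: 0.025 };

// Hypothetical helper: evaluates one channel (email or SMS).
// minVolume is THRESHOLD_EMAIL (50) or THRESHOLD_SMS (25).
function evaluateChannel({ failed, unsubscribed, total }, minVolume) {
  const flags = { error: false, optout: false, suspend: false };
  if (!(total > minVolume)) return flags; // too little volume to judge

  const errorRate = failed / total;
  const optRate = unsubscribed / total;

  if (errorRate > RATES.errorWarn) flags.error = true;
  if (optRate > RATES.optWarn) flags.optout = true;
  // Either suspension threshold is enough to suspend the channel.
  if (errorRate > RATES.errorSuspend || optRate > RATES.optSuspend) flags.suspend = true;
  return flags;
}
```

All comparisons are strict, so a rate of exactly 8% or exactly 1% does not raise a flag, and an account with exactly 50 emails (or 25 SMS) is not evaluated at all.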

Queue Settings

QueueWrapper(`comm_error_calculation`, 'global', {
processCb,
completedCb,
});

Queue Name: comm_error_calculation

Concurrency: Default (1)

Retry Settings:

{
attempts: 4,
backoff: 4000, // 4-second delay between retries
}
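Assuming QueueWrapper passes these options through to Bull unchanged (an assumption, since the wrapper's source is not shown here), a numeric backoff is a fixed delay and attempts is the total number of tries, including the first. The sketch below works out the resulting retry schedule; retryDelays is a hypothetical helper.

```javascript
// Hypothetical helper: delays (ms) before each retry under a fixed backoff.
// `attempts` counts the first try, so there are attempts - 1 retries.
function retryDelays(attempts, backoffMs) {
  return Array.from({ length: Math.max(0, attempts - 1) }, () => backoffMs);
}

const delays = retryDelays(4, 4000);                        // three retries
const totalWaitMs = delays.reduce((sum, d) => sum + d, 0);  // time spent waiting
```

With these settings a job that keeps failing is tried four times and spends 12 seconds in backoff before it is marked as failed.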

📋 Processing Logic - Detailed Flow

1. Faceted Aggregation

Purpose: Calculate 6 metrics in single query

Aggregation Pipeline:

Stage 1 - Match: Account communications in last 7 days

{
$match: {
account_id: new mongoose.Types.ObjectId(account_id),
createdAt: {
$gte: new Date(Date.now() - TIMEFRAME),
},
},
}

Stage 2 - Facet: Calculate all metrics in parallel

{
$facet: {
failed_email: [...], // Count dropped/deferred/bounce emails
unsubscribe_email: [...], // Count unsubscribe/spam report emails
all_email: [...], // Count total emails
failed_sms: [...], // Count undelivered/failed/canceled SMS
unsubscribe_sms: [...], // Count STOP/UNSUBSCRIBE SMS
all_sms: [...], // Count total SMS
},
}

Stage 3 - Set: Extract counts from arrays (default to 0)

{
$set: {
failed_email: {
$ifNull: [
{ $arrayElemAt: ['$failed_email.total', 0] },
0,
],
},
// ... same for other metrics
},
}
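The six counts can be easier to reason about outside MongoDB. The sketch below is a hypothetical in-memory equivalent of the $facet and $set stages; countMetrics and anyEvent are illustrative names, and the event values are copied from the processor.

```javascript
// Event values copied from the $facet stage of the processor.
const FAILED_EMAIL = ['dropped', 'deferred', 'bounce'];
const OPTOUT_EMAIL = ['unsubscribe', 'spamreport'];
const FAILED_SMS = ['undelivered', 'failed', 'canceled'];
const OPTOUT_SMS = ['STOP', 'STOPALL', 'UNSUBSCRIBE', 'CANCEL', 'END', 'QUIT'];

// True if any element of the events array has events[i][field] in values,
// which is how { 'events.<field>': { $in: values } } matches an array field.
const anyEvent = (c, field, values) =>
  (c.events || []).some(e => values.includes(e[field]));

// Hypothetical in-memory equivalent of the $facet + $set stages.
function countMetrics(comms) {
  const email = comms.filter(c => c.message_type === 'EMAIL');
  const sms = comms.filter(c => c.message_type === 'SMS');
  return {
    failed_email: email.filter(c => anyEvent(c, 'event', FAILED_EMAIL)).length,
    unsubscribe_email: email.filter(c => anyEvent(c, 'event', OPTOUT_EMAIL)).length,
    all_email: email.length,
    failed_sms: sms.filter(c => anyEvent(c, 'MessageStatus', FAILED_SMS)).length,
    unsubscribe_sms: sms.filter(c => OPTOUT_SMS.includes(c.body)).length,
    all_sms: sms.length,
  };
}
```

Two properties follow from this shape. A message counts as failed if any one of its events matches, so a single message can appear in both the failed and the unsubscribe counts. The SMS opt-out check is an exact match on the whole body, so 'STOP' matches but 'stop' or 'Stop please' does not under the default collation.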

2. Rate Calculation

Email Error Rate:

const error_rate = reporting?.failed_email / reporting?.all_email;
// Example: 10 failed / 100 total = 0.10 (10%)

Email Opt-Out Rate:

const opt_rate = reporting?.unsubscribe_email / reporting?.all_email;
// Example: 2 unsubscribes / 100 total = 0.02 (2%)

SMS Error Rate:

const error_rate = reporting?.failed_sms / reporting?.all_sms;
// Example: 5 failed / 50 total = 0.10 (10%)

SMS Opt-Out Rate:

const opt_rate = reporting?.unsubscribe_sms / reporting?.all_sms;
// Example: 1 STOP / 50 total = 0.02 (2%)

3. Threshold Evaluation

Email Evaluation Logic:

if (reporting?.all_email > THRESHOLD_EMAIL) {
// Must have >50 emails
const error_rate = reporting?.failed_email / reporting?.all_email;
const opt_rate = reporting?.unsubscribe_email / reporting?.all_email;

if (error_rate > 0.08) {
// >8% error rate
resp.email.error = true;
if (error_rate > 0.125) {
// >12.5% error rate
resp.email.suspend = true;
}
}
if (opt_rate > 0.01) {
// >1% opt-out rate
resp.email.optout = true;
if (opt_rate > 0.025) {
// >2.5% opt-out rate
resp.email.suspend = true;
}
}
}

SMS Evaluation Logic: Identical structure with SMS metrics.

4. Cooldown Period Enforcement

Purpose: Prevent duplicate notifications within 7 days

Email Violation Cooldown:

if (resp.email.error || resp.email.optout) {
let is_over = true;
if (account?.suspend?.last_email_violation_at) {
// Check if 7 days have passed since last violation
is_over = new Date(account.suspend.last_email_violation_at + TIMEFRAME) < new Date();
if (!is_over) {
// Within cooldown - clear flags (no notification)
resp.email.error = false;
resp.email.optout = false;
}
}
if (is_over) {
// Update violation timestamp
updObj['suspend.last_email_violation_at'] = Date.now();
}
}

SMS Violation Cooldown: Identical structure with SMS fields.

Suspension Cooldown:

if (resp.email.suspend) {
let is_over = true;
if (account?.suspend?.email_suspended_at) {
// Check if 7 days have passed since suspension
is_over = new Date(account.suspend.email_suspended_at + TIMEFRAME) < new Date();
if (!is_over) {
// Still suspended - clear flag
resp.email.suspend = false;
}
}
if (is_over) {
// Apply new suspension
updObj['suspend.email'] = resp.email.suspend;
updObj['suspend.email_suspended_at'] = Date.now();
}
}
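The cooldown arithmetic relies on the stored value being a numeric epoch timestamp, which is what the update writes via Date.now(). If the account schema declared these fields as Date instead, the `+` would coerce the Date to a string and append the number rather than adding milliseconds, and the comparison would no longer measure a 7-day gap. Whether that applies depends on the schema, which is not shown here. A minimal sketch of a check that accepts either representation follows; isCooldownOver is a hypothetical helper.

```javascript
const TIMEFRAME = 7 * 24 * 60 * 60 * 1000; // 7 days in milliseconds

// Hypothetical helper: true when there is no previous timestamp, or when
// more than TIMEFRAME ms have elapsed since it. Accepts a Date or epoch ms.
function isCooldownOver(last, now = Date.now()) {
  if (last === null || last === undefined) return true; // no history
  const lastMs = last instanceof Date ? last.getTime() : Number(last);
  return lastMs + TIMEFRAME < now;
}
```

Each of the four cooldown blocks (email violation, SMS violation, email suspension, SMS suspension) performs this same comparison on a different timestamp field.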

5. Account Update

Update Pattern:

if (Object.keys(updObj).length) {
await Account.findByIdAndUpdate(account_id, updObj);
}

Example Update Object:

{
'suspend.last_email_violation_at': 1697212800000,
'suspend.email': true,
'suspend.email_suspended_at': 1697212800000,
}

6. Notification Trigger

Condition: Any violation flag is true

if (
resp.email.error ||
resp.email.optout ||
resp.sms.error ||
resp.sms.optout ||
resp.email.suspend ||
resp.sms.suspend
) {
// Send notification
}

Notification Service Call:

await axios.get(`${process.env.NOTIFICATION_SERVICE_URL}/run-service`, {
params: {
type: 'communication.violation',
'data[account_id]': account_id.toString(),
'data[email_error]': resp.email.error,
'data[email_optout]': resp.email.optout,
'data[sms_error]': resp.sms.error,
'data[sms_optout]': resp.sms.optout,
'data[sms_suspend]': resp.sms.suspend,
'data[email_suspend]': resp.email.suspend,
},
});

Notification Type: communication.violation

Notification Data: All violation flags as boolean parameters
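The trigger condition and the payload can be separated from the HTTP call, which makes them easy to unit test. The sketch below does that with two hypothetical helpers, hasViolation and buildViolationParams; the parameter names match the axios call above.

```javascript
// Hypothetical helper: true if any of the six flags is set.
function hasViolation(resp) {
  return ['email', 'sms'].some(
    channel => resp[channel].error || resp[channel].optout || resp[channel].suspend,
  );
}

// Hypothetical helper: builds the query parameters sent to /run-service.
function buildViolationParams(accountId, resp) {
  return {
    type: 'communication.violation',
    'data[account_id]': String(accountId),
    'data[email_error]': resp.email.error,
    'data[email_optout]': resp.email.optout,
    'data[sms_error]': resp.sms.error,
    'data[sms_optout]': resp.sms.optout,
    'data[sms_suspend]': resp.sms.suspend,
    'data[email_suspend]': resp.email.suspend,
  };
}
```

Because the flags travel as query parameters, the receiving service sees them as the strings 'true' and 'false' rather than booleans.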

🚨 Error Handling

Common Error Scenarios

Aggregation Failure

Scenario: MongoDB aggregation query fails (timeout, connection issue)

Handling: Error passed to done(err), job retries (4 attempts with 4-second backoff)

Impact: Account metrics not evaluated, retry on next attempt

Account Not Found

Scenario: Account deleted between service and queue processing

Handling: account variable is null, cooldown checks gracefully handle with optional chaining

Impact: No suspension history, violations treated as first-time

Notification Service Unavailable

Scenario: Notification service down or unreachable

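Handling: The axios call throws, the error is passed to done(err), and the job is retried.

Impact: Violation detected but notification not sent on that attempt. Because the account update is written before the notification call, a retry will find the fresh violation and suspension timestamps, treat the account as within its cooldown, and clear the flags. Based on the processor listing above, the notification is therefore likely to be skipped on retry rather than re-sent.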

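Division by Zero

Scenario: Total emails/SMS is zero

Handling: Prevented by the volume guards. The rates are only computed when all_email > THRESHOLD_EMAIL (50) or all_sms > THRESHOLD_SMS (25), so a zero denominator never reaches the division.

Impact: None in practice. Without the guard, 0 / 0 would yield NaN, every comparison against a threshold would be false, and no violation would be flagged.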
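For reference, JavaScript division by zero does not throw. The values below show what the rate comparisons would see if the volume guard were absent. The variable names are illustrative only.

```javascript
// JavaScript number semantics relevant to the rate calculation.
const zeroOverZero = 0 / 0;                  // NaN
const someOverZero = 3 / 0;                  // Infinity
const nanExceeds = zeroOverZero > 0.08;      // false: comparisons with NaN are always false
const infExceeds = someOverZero > 0.125;     // true: Infinity exceeds any threshold
const guardPasses = 0 > 50;                  // false: the volume guard skips the division
const missingStats = undefined > 50;         // false: a missing report also fails the guard
```

Since the failed and unsubscribe counts are subsets of the total, a zero total implies zero failures, so only the NaN case could arise in this job.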

Failed Job Callback

Note: No explicit failedCb defined - relies on default Bull error handling.

Completed Job Callback

const completedCb = async job => {};

Action: No-op (empty function)

📊 Monitoring & Logging

Success Logging

Service Level:

  • Accounts added to queue for import (console.log)

Queue Level:

  • No explicit success logging

Error Logging

Cron Level:

  • Error in cron initialization

Service Level:

  • Error aggregating accounts
  • Error queuing individual accounts (console.log)

Queue Level:

  • Error starting queue

Performance Metrics

  • Typical Aggregation Time: 1-5 seconds per account (complex faceted query)
  • Accounts Processed Per Run: Varies (all accounts with 7-day activity)
  • High-Volume System: Could process 100-1000+ accounts per run
  • Total Job Time: 2-10 seconds per account
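Taken at face value, these figures suggest the queue can fall behind. The cron enqueues every active account every 5 seconds, while each job takes seconds to process. The sketch below is a back-of-envelope model, not measured data. It assumes a single worker (the default concurrency of 1 listed under Queue Settings) and no deduplication of jobs for the same account, neither of which is confirmed by the QueueWrapper source. backlogGrowthPerHour is a hypothetical helper.

```javascript
// Hypothetical model: jobs added per hour minus jobs completed per hour.
function backlogGrowthPerHour({ accounts, secondsPerJob, concurrency = 1, cronIntervalSec = 5 }) {
  const enqueuedPerHour = accounts * (3600 / cronIntervalSec);
  const processedPerHour = concurrency * (3600 / secondsPerJob);
  return Math.max(0, enqueuedPerHour - processedPerHour);
}

// 100 active accounts at the optimistic 2 seconds per job.
const growth = backlogGrowthPerHour({ accounts: 100, secondsPerJob: 2 });
```

Under those assumptions, 100 active accounts produce 72,000 jobs per hour against a capacity of 1,800, so the backlog would grow by 70,200 jobs per hour. If the real system keeps up, something outside this model (higher concurrency, deduplication, or faster jobs) must be making up the difference.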

🔗 Integration Points

Triggers This Job

  • Cron Schedule: Every 5 seconds (no external triggers)
  • Database Activity: Any communication creates potential for violation

External Dependencies

  • Notification Service: Sends violation alerts to admins
  • SendGrid: Email delivery events stored in communications
  • Twilio: SMS delivery events stored in communications

Jobs That Depend On This

  • No job consumes this job's output directly; it is standalone compliance monitoring. The suspension flags it writes affect the following features:
  • Email Campaigns: Suspended accounts cannot send campaigns
  • SMS Campaigns: Suspended accounts cannot send SMS
  • Automation: Suspended accounts have automations paused

⚠️ Important Notes

Side Effects

  • ⚠️ Account Suspension: Automatically blocks email/SMS sending
  • ⚠️ Service Disruption: Customer communication stops immediately
  • ⚠️ Notification Spam: Without the cooldown, the 5-second schedule could re-send the same notification on every run

Performance Considerations

  • High Frequency: Every 5 seconds may cause DB load
  • Faceted Aggregation: Complex query on large communications collection
  • No Caching: Recalculates metrics on every run (no caching of rates)
  • Cooldown Period: 7-day cooldown prevents notification spam

Business Logic

Why 7-Day Window?

  • Industry standard for compliance monitoring
  • Balances recent behavior with statistical significance
  • Aligns with SendGrid and Twilio reporting periods

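Why Different Thresholds?

  • Error thresholds (8% / 12.5%) are looser than opt-out thresholds (1% / 2.5%): some delivery failures are expected (spam filters, invalid addresses), while opt-out rates are critical for compliance (CAN-SPAM, TCPA)
  • SMS errors are generally less common than email errors (validated phone numbers), but the processor applies the same rate thresholds to both channels
  • The channels differ only in minimum volume: 50 emails vs. 25 SMS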

Why Every 5 Seconds?

  • Near real-time violation detection
  • Prevents continued abuse after threshold crossed
  • Minimizes customer complaints to providers

Why Cooldown Period?

  • Prevents notification spam
  • Allows time for account review and remediation
  • 7-day window matches evaluation timeframe

Maintenance Notes

  • Thresholds: Hardcoded in queue file (consider environment variables)
  • Timeframe: 7 days hardcoded (consider configurable)
  • Volume Thresholds: 50 emails, 25 SMS hardcoded
  • Notification Type: 'communication.violation' hardcoded
  • Cron Frequency: Every 5 seconds (consider reducing for performance)

Code Quality Issues

Issue 1: Inconsistent Error Messages

console.log(
`Error occurred while processing the data for import.\nFile path: /queue-manager/services/accounts/import.js.\nError: ${err}`,
);

Issue: Copy-pasted error message references wrong file path (accounts/import.js instead of communication/errorReporting.js).

Suggestion: Fix error messages:

logger.error({
initiator: 'QM/communication/error-reporting',
message: 'Error occurred while aggregating accounts',
error: err,
});

Issue 2: Variable Shadowing

let is_over = true;
if (account?.suspend?.last_email_violation_at) {
const is_over = ... // Shadows outer variable
if (!is_over) {
resp.email.error = false;
}
}
if (is_over) { // Uses outer variable (unchanged)
updObj['suspend.last_email_violation_at'] = Date.now();
}

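Issue: const is_over inside the if-block shadows the outer let is_over, so the outer variable stays true and the violation timestamp is refreshed on every run, even within the cooldown. The simplified processor listing earlier in this document already assigns to the outer variable (no const), so check the full source to confirm whether this issue is still present.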

Suggestion: Remove inner const:

let is_over = true;
if (account?.suspend?.last_email_violation_at) {
is_over = ... // Update outer variable
if (!is_over) {
resp.email.error = false;
}
}

Issue 3: Wrong Variable in Opt-Out Suspension Check

if (opt_rate > 0.01) {
resp.email.optout = true;
if (error_rate > 0.025) {
// BUG: Should be opt_rate
resp.email.suspend = true;
}
}

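Issue: Checking error_rate instead of opt_rate for the opt-out suspension threshold. The simplified processor listing earlier in this document already uses opt_rate here, so check the full source to confirm whether this issue is still present.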

Suggestion: Fix condition:

if (opt_rate > 0.01) {
resp.email.optout = true;
if (opt_rate > 0.025) {
// Corrected
resp.email.suspend = true;
}
}

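Issue 4: Division by Zero Relies on the Volume Guard

const error_rate = reporting?.failed_email / reporting?.all_email;

Issue: The division itself has no zero check. If all_email were 0, the result would be NaN (0 / 0), since failed_email cannot exceed the total.

Suggestion: No change is needed today, because the all_email > THRESHOLD_EMAIL guard already prevents the division from running with a zero total. Keep that guard in place if the thresholds are ever made configurable.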

🧪 Testing

Manual Trigger

# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/communication/errorReporting

Simulate Email Violations

const Communication = require('./models/communication');

// Create 100 emails with 15 failures (15% error rate - above suspension threshold)
const communications = [];
for (let i = 0; i < 100; i++) {
const isFailed = i < 15;
communications.push({
account_id: accountId,
message_type: 'EMAIL',
module: 'test',
events: [
{
event: isFailed ? 'bounce' : 'delivered',
timestamp: Date.now(),
},
],
createdAt: new Date(),
});
}
await Communication.insertMany(communications);

console.log('Created 100 emails with 15% error rate (should trigger suspension)');

Verify Violation Detection

// Wait for next cron run (up to 5 seconds)
await new Promise(resolve => setTimeout(resolve, 6000));

// Check account suspension status
const account = await Account.findById(accountId);
console.log('Email suspended:', account.suspend.email); // Should be true
console.log('Suspension timestamp:', account.suspend.email_suspended_at);
console.log('Last violation:', account.suspend.last_email_violation_at);

Test Cooldown Period

// Set violation timestamp 8 days ago
await Account.findByIdAndUpdate(accountId, {
'suspend.last_email_violation_at': Date.now() - 8 * 24 * 60 * 60 * 1000,
});

// Create new violations
// ... (same as above)

// Wait for cron
await new Promise(resolve => setTimeout(resolve, 6000));

// Verify the cooldown expired: the violation timestamp should have been refreshed
const account = await Account.findById(accountId);
const hoursSince = (Date.now() - account.suspend.last_email_violation_at) / (1000 * 60 * 60);
console.log('Hours since last violation:', hoursSince); // Should be ~0 (just updated)

Monitor Queue Processing

# Watch logs during violation detection
tail -f logs/queue-manager.log | grep "communication"

# Expected outputs:
# "Accounts added to queue for import."
# (Notification service logs if violations detected)

Test Faceted Aggregation

const mongoose = require('mongoose');
const Communication = require('./models/communication');

const TIMEFRAME = 7 * 24 * 60 * 60 * 1000;

const reporting = await Communication.aggregate([
{
$match: {
account_id: new mongoose.Types.ObjectId(accountId),
createdAt: {
$gte: new Date(Date.now() - TIMEFRAME),
},
},
},
{
$facet: {
failed_email: [
{
$match: {
message_type: 'EMAIL',
'events.event': {
$in: ['dropped', 'deferred', 'bounce'],
},
},
},
{
$count: 'total',
},
],
all_email: [
{
$match: {
message_type: 'EMAIL',
},
},
{
$count: 'total',
},
],
},
},
]);

console.log('Email metrics:', reporting[0]);
console.log('Error rate:', reporting[0].failed_email[0]?.total / reporting[0].all_email[0]?.total);

Job Type: High-Frequency Scheduled with Account-Level Queuing
Execution Frequency: Every 5 seconds
Average Duration: 2-10 seconds per account
Status: Active
