Cleanup Pulses Processor
Overview
The Cleanup Pulses processor maintains data integrity by removing churn risk pulses when the risk ratio falls below 50%. It recalculates risk for all pending churn risk pulses and deletes those no longer meeting the threshold.
Source File: queue-manager/services/projects/cleanupPulses.js
Execution: Part of projects cron (every 5 minutes)
Type: Data maintenance
Business Impact: MEDIUM - Data accuracy
Processing Flow
sequenceDiagram
participant CRON as Projects Cron
participant SVC as Cleanup Service
participant PULSE as Projects Pulse
participant SUB as Subscriptions
CRON->>SVC: cleanupPulses()
SVC->>PULSE: Find pending churn risk pulses
PULSE-->>SVC: Pulse list
loop For Each Pulse
SVC->>SUB: Recalculate churn risk<br/>(canceled / total)
SUB-->>SVC: Risk ratio
alt Risk < 50%
SVC->>PULSE: Add to deletion list
end
end
SVC->>PULSE: Delete all below-threshold pulses
Key Logic
Churn Risk Recalculation
// For each pending churn risk pulse:
1. Get all subscriptions for account (active + canceled in last 30 days)
2. Calculate total_cost = sum of all subscription amounts
3. Calculate canceled_amount = sum of canceled managed subscription amounts
4. Calculate risk_ratio = canceled_amount / total_cost
5. If risk_ratio < 0.5, delete pulse
Time Window
thirtyDaysAgoSeconds = Math.floor(Date.now() / 1000) - 30 * 24 * 60 * 60;
Matches the same window used in churn risk calculation.
Subscription Filtering
const managedProductTypes = MANAGED_SUBSCRIPTIONS.filter(
sub => !['site', 'listings'].includes(sub),
);
Only managed subscriptions count toward canceled amount.
MongoDB Aggregation Pipeline
Stage 1: Match Pending Churn Risk Pulses
{
$match: {
type: 'account_churn_risk',
status: 'pending'
}
}
Stage 2: Lookup Account Subscriptions
{
$lookup: {
from: '_store.subscriptions',
let: { acc_id: '$account_id' },
pipeline: [
{
$match: {
$expr: { $eq: ['$account', '$$acc_id'] },
$or: [
{ status: 'active' },
{ canceled_at: { $gt: thirtyDaysAgoSeconds } }
]
}
},
{
$group: {
_id: null,
total_cost: { $sum: '$plan.amount' },
canceled_amount: {
$sum: {
$cond: [
{
$and: [
{ $eq: ['$status', 'canceled'] },
{ $in: ['$plan.metadata.product_type', managedProductTypes] }
]
},
'$plan.amount',
0
]
}
}
}
}
],
as: 'risk_window'
}
}
Stage 3: Calculate Current Risk
{
$addFields: {
has_risk: {
$cond: [
{ $gt: [{ $first: '$risk_window.total_cost' }, 0] },
{
$divide: [
{ $first: '$risk_window.canceled_amount' },
{ $first: '$risk_window.total_cost' },
],
},
0,
];
}
}
}
Stage 4: Filter Below Threshold
{
$match: {
has_risk: {
$lt: 0.5;
}
}
}
Stage 5: Collect IDs to Delete
{
$group: {
_id: null,
ids: { $addToSet: '$_id' }
}
}
Deletion Operation
await Pulses.deleteMany({
_id: { $in: pulsesIds },
});
Collections Used
Input: projects-pulse
- Queries pending churn risk pulses
Lookup: _store.subscriptions
- Recalculates current risk ratio
Output: projects-pulse
- Deletes stale pulses
Use Cases
Example 1: Customer Reactivated
// Initial State
Pulse: { type: 'account_churn_risk', risk: 0.65 }
Canceled: $650, Total: $1000
// Customer adds new subscription
Active: $500 added
Total: $1500
// New Risk
risk = 650 / 1500 = 0.43 (43%)
// Action
Pulse deleted (below 50% threshold)
Example 2: Cancellation Expired
// Initial State
Pulse: { type: 'account_churn_risk', risk: 0.55 }
Canceled: $550 (28 days ago), Total: $1000
// After 3 Days (31 days total)
Canceled subscription no longer in 30-day window
Total: $450 (only active subscriptions)
Canceled: $0 (outside window)
// New Risk
risk = 0 / 450 = 0 (0%)
// Action
Pulse deleted (below 50% threshold)
Example 3: Still At Risk
// State
Pulse: { type: 'account_churn_risk', risk: 0.75 }
Canceled: $750, Total: $1000
// Recalculation
risk = 750 / 1000 = 0.75 (75%)
// Action
Pulse kept (above 50% threshold)
Error Handling
try {
// Aggregation and deletion
} catch (error) {
logger.error({
initiator: 'QM/projects/cleanupPulses',
error: error,
});
throw error;
}
Return Value
return pulsesIds; // Array of deleted pulse ObjectIds
Logging
// No pulses to cleanup
'No pulse to cleanup (no churn risk pulses fell below threshold)' // Pulses deleted
`Removed ${pulsesIds.length} churn risk pulses now below threshold`;
Business Value
Why Cleanup?
- Data Accuracy: Removes false positives
- User Experience: Prevents obsolete notifications
- Resource Optimization: Reduces noise for customer success teams
- System Performance: Keeps pulse collection lean
Why 50% Threshold?
- Matches creation threshold for consistency
- Prevents pulse flickering (create/delete loops)
- Clear business rule for sales teams
Why Check Every 5 Minutes?
- Rapid response to customer reactivation
- Removes stale alerts quickly
- Low overhead (only queries pending pulses)
Performance
Execution Time: 1-3 seconds
Frequency: Every 5 minutes
Typical Results: 0-5 pulses deleted per run
Peak: 10-20 pulses (after bulk reactivation campaigns)
Complexity: MEDIUM
Lines of Code: 113
Type: Maintenance