Inactive Task Pulse Processor
Overview
The Inactive Task Pulse processor identifies open tasks (type request or approval) whose last communication is more than 7 days old, or that have no communication at all. It creates pulses to re-engage teams and prevent stalled work on managed subscriptions whose status is active or past_due.
Source File: queue-manager/services/projects/inactiveTaskPulse.js
Queue File: queue-manager/queues/projects/inactiveTaskPulse.js
Execution: Part of projects cron (every 5 minutes)
Business Impact: HIGH - Task engagement
Processing Flow
sequenceDiagram
participant CRON as Projects Cron
participant SVC as Inactive Task Service
participant TASKS as Projects Tasks
participant ORDERS as Store Orders
participant SUBS as Subscriptions
participant COMM as Communications
participant PULSE as Projects Pulse
participant QUEUE as Task Queue
CRON->>SVC: createInactiveTaskPulse()
SVC->>TASKS: Find open tasks<br/>(request/approval)
TASKS-->>SVC: Task list
SVC->>ORDERS: Lookup orders
SVC->>SUBS: Check subscription status<br/>(active/past_due only)
SVC->>COMM: Check last communication<br/>(>7 days or none)
SVC->>PULSE: Check existing pulse<br/>for this task
loop For Each Inactive Task
SVC->>QUEUE: Add to queue
QUEUE->>PULSE: Create pulse
end
Key Logic
Inactivity Threshold
// Cutoff as a Unix timestamp in seconds
const sevenDaysAgo = Math.floor(Date.now() / 1000) - 7 * 24 * 60 * 60;
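The cutoff is held in seconds and converted back to milliseconds when it is compared against dates. The following is a minimal sketch of that check in plain JavaScript; `inactivityCutoff`, `isInactive`, and the injectable `nowMs` clock are hypothetical names introduced for illustration and are not taken from the service.

```javascript
const SEVEN_DAYS_SECONDS = 7 * 24 * 60 * 60;

// Cutoff as a Unix timestamp in seconds, mirroring the expression above.
function inactivityCutoff(nowMs = Date.now()) {
  return Math.floor(nowMs / 1000) - SEVEN_DAYS_SECONDS;
}

// Hypothetical helper: true when there is no communication at all,
// or when the last one is older than the cutoff.
function isInactive(lastCommunicationAt, nowMs = Date.now()) {
  if (!lastCommunicationAt) return true;
  const cutoffMs = inactivityCutoff(nowMs) * 1000;
  return lastCommunicationAt.getTime() < cutoffMs;
}

const fixedNow = Date.UTC(2024, 0, 15); // fixed clock so the example is repeatable
console.log(isInactive(new Date(Date.UTC(2024, 0, 1)), fixedNow)); // true (14 days old)
console.log(isInactive(new Date(Date.UTC(2024, 0, 10)), fixedNow)); // false (5 days old)
console.log(isInactive(null, fixedNow)); // true (no communication)
```

Passing the clock in as a parameter keeps the check testable without mocking `Date.now()`.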
Task Filtering
Tasks must meet ALL criteria:
- Status: NOT completed
- Type: request OR approval
- Removed: NOT true
- Order: Exists and linked
- Product Type: Managed subscriptions (excluding site, listings, phone_number, software)
- Subscription Status: active OR past_due
- Last Communication: >7 days ago OR none exists
- Existing Pulse: None for this task
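The criteria above can be read as a single predicate. The sketch below expresses them over an in-memory object rather than a MongoDB pipeline; the flattened task shape, the `isEligibleTask` name, the `pending` status, and the `managed_product_a` product type are all assumptions made for the example.

```javascript
const OPEN_TASK_TYPES = ['request', 'approval'];
const ALLOWED_SUBSCRIPTION_STATUSES = ['active', 'past_due'];

// Hypothetical flattened task shape, used only for this illustration:
// { status, type, removed, order, subscription, lastCommunicationAt, hasInactiveTaskPulse }
function isEligibleTask(task, { cutoffMs, managedProductTypes }) {
  const productType = task.order?.metadata?.product_type;
  const lastAt = task.lastCommunicationAt;
  return (
    task.status !== 'completed' &&
    OPEN_TASK_TYPES.includes(task.type) &&
    task.removed !== true &&
    // A missing order yields undefined, which is never in the list
    managedProductTypes.includes(productType) &&
    ALLOWED_SUBSCRIPTION_STATUSES.includes(task.subscription?.status) &&
    (!lastAt || lastAt.getTime() < cutoffMs) &&
    !task.hasInactiveTaskPulse
  );
}

const options = {
  cutoffMs: Date.UTC(2024, 0, 8),
  managedProductTypes: ['managed_product_a'], // placeholder value
};
const staleTask = {
  status: 'pending',
  type: 'request',
  removed: false,
  order: { metadata: { product_type: 'managed_product_a' } },
  subscription: { status: 'past_due' },
  lastCommunicationAt: new Date(Date.UTC(2024, 0, 1)),
  hasInactiveTaskPulse: false,
};

console.log(isEligibleTask(staleTask, options)); // true
console.log(isEligibleTask({ ...staleTask, type: 'note' }, options)); // false
```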
Product Type Filter
const UNMANAGED_PRODUCTS = ['site', 'listings', 'phone_number', 'software'];
const VALID_MANAGED_SUBSCRIPTIONS = MANAGED_SUBSCRIPTIONS.filter(
  sub => !UNMANAGED_PRODUCTS.includes(sub),
);
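To make the effect of the filter concrete, here is a small runnable sketch. The contents of `MANAGED_SUBSCRIPTIONS` are not shown in this document, so the list below uses placeholder values (`managed_product_a`, `managed_product_b`) purely for illustration.

```javascript
// Placeholder list: the real MANAGED_SUBSCRIPTIONS values are not documented here.
const MANAGED_SUBSCRIPTIONS = ['managed_product_a', 'site', 'managed_product_b', 'software'];

const UNMANAGED_PRODUCTS = ['site', 'listings', 'phone_number', 'software'];

// Keep only the product types that are not in the unmanaged list.
const VALID_MANAGED_SUBSCRIPTIONS = MANAGED_SUBSCRIPTIONS.filter(
  (sub) => !UNMANAGED_PRODUCTS.includes(sub),
);

console.log(VALID_MANAGED_SUBSCRIPTIONS); // [ 'managed_product_a', 'managed_product_b' ]
```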
MongoDB Aggregation Pipeline
Stage 1: Match Open Tasks
{
  $match: {
    status: { $ne: 'completed' },
    type: { $in: ['request', 'approval'] },
    removed: { $ne: true }
  }
}
Stage 2: Lookup Order
{
  $lookup: {
    from: '_store.orders',
    localField: 'order_id',
    foreignField: '_id',
    as: 'order'
  }
},
{
  $addFields: {
    order: { $arrayElemAt: ['$order', 0] }
  }
}
Stage 3: Filter Managed Subscriptions
{
  $match: {
    order: { $exists: true, $ne: null },
    'order.metadata.product_type': { $in: VALID_MANAGED_SUBSCRIPTIONS }
  }
}
Stage 4: Lookup Active Subscription
{
  $lookup: {
    from: '_store.subscriptions',
    let: { subscriptionId: '$order.subscription' },
    pipeline: [
      {
        $match: {
          $expr: { $eq: ['$_id', '$$subscriptionId'] },
          status: { $in: ['active', 'past_due'] }
        }
      }
    ],
    as: 'subscription'
  }
}
Stage 5: Lookup Last Communication
{
  $lookup: {
    from: 'communications',
    localField: 'last_communication_id',
    foreignField: '_id',
    as: 'last_communication'
  }
}
Stage 6: Filter Inactive Communications
{
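  $match: {
    $or: [
      // No communication found. Note: $lookup yields an empty array rather than a
      // missing field, so this branch only matches if the array is flattened first.
      { last_communication: { $exists: false } },
      // Last communication is older than the cutoff (seconds converted to milliseconds)
      { 'last_communication.createdAt': { $lt: new Date(sevenDaysAgo * 1000) } },
    ]
  }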
}
Stage 7: Check Existing Pulse
{
  $lookup: {
    from: 'projects.pulse',
    let: { orderId: '$order_id', taskId: '$_id' },
    pipeline: [
      {
        $match: {
          $expr: {
            $and: [
              { $eq: ['$order_id', '$$orderId'] },
              { $eq: ['$type', 'inactive_task'] },
              { $in: ['$$taskId', '$metadata.tasks'] }
            ]
          }
        }
      }
    ],
    as: 'existing_inactive_task'
  }
},
{
  $match: {
    existing_inactive_task: { $size: 0 }
  }
}
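The duplicate check in Stage 7 keeps a task only when no `inactive_task` pulse on the same order already lists it. As a rough in-memory equivalent (a sketch, not the service's code), the same rule can be written as below. Ids are plain strings here for simplicity, and `hasInactiveTaskPulse`, the sample ids, and the `other_type` pulse type are hypothetical.

```javascript
// Ids are plain strings in this sketch; real ObjectIds would need
// .equals() or string conversion before comparison.
function hasInactiveTaskPulse(task, pulses) {
  return pulses.some(
    (pulse) =>
      pulse.order_id === task.order_id &&
      pulse.type === 'inactive_task' &&
      Array.isArray(pulse.metadata?.tasks) &&
      pulse.metadata.tasks.includes(task._id),
  );
}

const existingPulses = [
  { order_id: 'order-1', type: 'inactive_task', metadata: { tasks: ['task-1'] } },
  { order_id: 'order-1', type: 'other_type', metadata: { tasks: ['task-2'] } },
];
const candidateTasks = [
  { _id: 'task-1', order_id: 'order-1' },
  { _id: 'task-2', order_id: 'order-1' },
];

// Equivalent of the final $match on { $size: 0 }
const withoutPulse = candidateTasks.filter(
  (task) => !hasInactiveTaskPulse(task, existingPulses),
);
console.log(withoutPulse.map((task) => task._id)); // [ 'task-2' ]
```

Because the check never expires a pulse, a task that already has an `inactive_task` pulse is not picked up again on later runs.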
Queue Processing
Adding to Queue
const queue = await inactiveTaskPulse.start();
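// Enqueue one job per inactive task
for (const inactivetask of pendingResponseTasks) {
  await queue.add(inactivetask, {
    removeOnComplete: true, // drop the job record once it succeeds
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 }, // delay in milliseconds
  });
}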
Queue Options
- Remove on Complete: true
- Attempts: 3
- Backoff: Exponential with 2s delay
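To give a feel for what these options imply, the sketch below computes a retry schedule. It assumes that `attempts` counts the first run, so 3 attempts means at most 2 retries, and that the delay doubles on each retry. The exact exponential formula differs between queue libraries and is not stated in this document, so treat the numbers as illustrative only; `retryDelays` is a hypothetical helper.

```javascript
// Assumed formula: delay * 2^(retry - 1). Check the queue library's
// documentation for the formula it actually applies.
function retryDelays({ attempts, delay }) {
  const delays = [];
  // The first attempt runs immediately; only the retries are delayed.
  for (let retry = 1; retry < attempts; retry += 1) {
    delays.push(delay * 2 ** (retry - 1));
  }
  return delays;
}

console.log(retryDelays({ attempts: 3, delay: 2000 })); // [ 2000, 4000 ]
```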
Pulse Structure
{
  order_id: ObjectId,
  type: 'inactive_task',
  status: 'pending',
  metadata: {
    tasks: [taskId],
    task_type: 'request',
    last_communication_date: null,
    inactive_days: 7
  }
}
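As an illustration of how a document with this shape could be assembled from a task, consider the sketch below. It is not the service's implementation: `buildInactiveTaskPulse`, the `lastCommunicationAt` field, and the way `inactive_days` is derived (whole days since the last communication, falling back to the 7-day threshold when there is none) are assumptions.

```javascript
const DAY_MS = 24 * 60 * 60 * 1000;

// Hypothetical builder for a pulse document matching the structure above.
function buildInactiveTaskPulse(task, nowMs = Date.now()) {
  const last = task.lastCommunicationAt || null;
  return {
    order_id: task.order_id,
    type: 'inactive_task',
    status: 'pending',
    metadata: {
      tasks: [task._id],
      task_type: task.type,
      last_communication_date: last,
      // Assumption: report the threshold itself when no communication exists.
      inactive_days: last ? Math.floor((nowMs - last.getTime()) / DAY_MS) : 7,
    },
  };
}

const samplePulse = buildInactiveTaskPulse(
  {
    _id: 'task-1',
    order_id: 'order-1',
    type: 'approval',
    lastCommunicationAt: new Date(Date.UTC(2024, 0, 1)),
  },
  Date.UTC(2024, 0, 15),
);
console.log(samplePulse.metadata.inactive_days); // 14
```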
Collections Used
Input: projects-tasks
- Queries open request/approval tasks
- Joins with orders and subscriptions
Lookup: _store.orders
- Product type verification
Lookup: _store.subscriptions
- Subscription status verification (active or past_due)
Lookup: communications
- Last communication check
Check: projects.pulse
- Prevents duplicate pulses for same task
Output: projects.pulse (via queue)
- Creates inactive task pulses
Error Handling
try {
  // Aggregation and queue processing
} catch (error) {
  logger.error({
    initiator: 'QM/projects/inactiveTaskPulse',
    error: error,
  });
  throw error;
}
Performance
Execution Time: 3-8 seconds
Frequency: Every 5 minutes
Typical Results: 0-50 tasks per run
Peak: 100-200 tasks (Monday mornings)
Complexity: MEDIUM-HIGH
Lines of Code: 260