Skip to main content

Inactive Task Pulse Processor

Overview

The Inactive Task Pulse processor identifies open tasks (request or approval type) that have had no communication for 7+ days. It creates pulses to re-engage teams and prevent stalled work on active managed subscriptions.

Source File: queue-manager/services/projects/inactiveTaskPulse.js
Queue File: queue-manager/queues/projects/inactiveTaskPulse.js
Execution: Part of projects cron (every 5 minutes)
Business Impact: HIGH - Task engagement

Processing Flow

sequenceDiagram
participant CRON as Projects Cron
participant SVC as Inactive Task Service
participant TASKS as Projects Tasks
participant ORDERS as Store Orders
participant SUBS as Subscriptions
participant COMM as Communications
participant PULSE as Projects Pulse
participant QUEUE as Task Queue

CRON->>SVC: createInactiveTaskPulse()
SVC->>TASKS: Find open tasks<br/>(request/approval)
TASKS-->>SVC: Task list

SVC->>ORDERS: Lookup orders
SVC->>SUBS: Check subscription status<br/>(active/past_due only)
SVC->>COMM: Check last communication<br/>(>7 days or none)

SVC->>PULSE: Check existing pulse<br/>for this task

loop For Each Inactive Task
SVC->>QUEUE: Add to queue
QUEUE->>PULSE: Create pulse
end

Key Logic

Inactivity Threshold

sevenDaysAgo = Math.floor(Date.now() / 1000) - 7 * 24 * 60 * 60;

Task Filtering

Tasks must meet ALL criteria:

  • Status: NOT completed
  • Type: request OR approval
  • Removed: NOT true
  • Order: Exists and linked
  • Product Type: Managed subscriptions (excluding sites, listings, phone_number, software)
  • Subscription Status: active OR past_due
  • Last Communication: >7 days ago OR none exists
  • Existing Pulse: None for this task

Product Type Filter

const UNMANAGED_PRODUCTS = ['site', 'listings', 'phone_number', 'software'];
const VALID_MANAGED_SUBSCRIPTIONS = MANAGED_SUBSCRIPTIONS.filter(
sub => !UNMANAGED_PRODUCTS.includes(sub),
);

MongoDB Aggregation Pipeline

Stage 1: Match Open Tasks

{
$match: {
status: { $ne: 'completed' },
type: { $in: ['request', 'approval'] },
removed: { $ne: true }
}
}

Stage 2: Lookup Order

{
$lookup: {
from: '_store.orders',
localField: 'order_id',
foreignField: '_id',
as: 'order'
}
},
{
$addFields: {
order: { $arrayElemAt: ['$order', 0] }
}
}

Stage 3: Filter Managed Subscriptions

{
$match: {
order: { $exists: true, $ne: null },
'order.metadata.product_type': { $in: VALID_MANAGED_SUBSCRIPTIONS }
}
}

Stage 4: Lookup Active Subscription

{
$lookup: {
from: '_store.subscriptions',
let: { subscriptionId: '$order.subscription' },
pipeline: [
{
$match: {
$expr: { $eq: ['$_id', '$$subscriptionId'] },
status: { $in: ['active', 'past_due'] }
}
}
],
as: 'subscription'
}
}

Stage 5: Lookup Last Communication

{
$lookup: {
from: 'communications',
localField: 'last_communication_id',
foreignField: '_id',
as: 'last_communication'
}
}

Stage 6: Filter Inactive Communications

{
$match: {
$or: [
{ last_communication: { $exists: false } },
{ 'last_communication.createdAt': { $lt: new Date(sevenDaysAgo * 1000) } },
];
}
}

Stage 7: Check Existing Pulse

{
$lookup: {
from: 'projects.pulse',
let: { orderId: '$order_id', taskId: '$_id' },
pipeline: [
{
$match: {
$expr: {
$and: [
{ $eq: ['$order_id', '$$orderId'] },
{ $eq: ['$type', 'inactive_task'] },
{ $in: ['$$taskId', '$metadata.tasks'] }
]
}
}
}
],
as: 'existing_inactive_task'
}
},
{
$match: {
existing_inactive_task: { $size: 0 }
}
}

Queue Processing

Adding to Queue

const queue = await inactiveTaskPulse.start();

for (const inactivetask of pendingResponseTasks) {
await queue.add(inactivetask, {
removeOnComplete: true,
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
}

Queue Options

  • Remove on Complete: true
  • Attempts: 3
  • Backoff: Exponential with 2s delay

Pulse Structure

{
order_id: ObjectId,
type: 'inactive_task',
status: 'pending',
metadata: {
tasks: [taskId],
task_type: 'request',
last_communication_date: null,
inactive_days: 7
}
}

Collections Used

Input: projects-tasks

  • Queries open request/approval tasks
  • Joins with orders and subscriptions

Lookup: _store.orders

  • Product type verification

Lookup: _store.subscriptions

  • Active status verification

Lookup: communications

  • Last communication check

Check: projects-pulse

  • Prevents duplicate pulses for same task

Output: projects-pulse (via queue)

  • Creates inactive task pulses

Error Handling

try {
// Aggregation and queue processing
} catch (error) {
logger.error({
initiator: 'QM/projects/inactiveTaskPulse',
error: error,
});
throw error;
}

Performance

Execution Time: 3-8 seconds
Frequency: Every 5 minutes
Typical Results: 0-50 tasks per run
Peak: 100-200 tasks (Monday mornings)


Complexity: MEDIUM-HIGH
Lines of Code: 260

💬

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM