Skip to main content

Projects Triggers - Project Entity Synchronization

Overview

File: queue-manager/triggers/projects.js
Functions: 3 separate triggers for project entity synchronization
Purpose: Maintain task and pulse metadata consistency with order data and communication timestamps

This module contains three independent triggers that ensure projects tasks and pulses stay synchronized with their related order metadata and communication records.

Trigger 1: updateTaskWithOrderInfo()

Configuration

Watched Collection: Projects Tasks
Event: Insert only
Purpose: Copy order metadata to newly created tasks

Match Conditions

const matchConditions = {
operationType: 'insert',
};

Data Flow

sequenceDiagram
participant API as Internal API
participant TASK as Projects Tasks
participant TRIGGER as projects.js Trigger
participant ORDER as Orders Collection

API->>TASK: Create new task
TASK->>TRIGGER: Insert event fired
TRIGGER->>ORDER: Query order by order_id
ORDER-->>TRIGGER: Return order with metadata
TRIGGER->>TASK: Update task.order_metadata

Processing Logic

When a new task is inserted, immediately fetch and copy the order metadata:

exports.updateTaskWithOrderInfo = () => {
const matchConditions = {
operationType: 'insert',
};

startChangeStream(ProjectTask, matchConditions, async changeEvent => {
const fullDocument = changeEvent.data;
const order = await Order.findOne({ _id: fullDocument.order_id }).lean().exec();

if (order) {
await ProjectTask.updateOne(
{ _id: fullDocument._id },
{ $set: { order_metadata: order.metadata } },
);
}
});
};

Use Case

Scenario: Order manager assigns task to team member

Flow:

  1. Task created with order_id reference
  2. Trigger fires on insert
  3. Order queried for metadata
  4. Task updated with complete order metadata (seller, buyer, team)
  5. Task now has all context without additional queries

Collections Updated

  • Projects Tasks - order_metadata field (full copy of order.metadata)

Trigger 2: updatePulseWithOrderInfo()

Configuration

Watched Collection: Projects Pulses
Event: Insert only
Purpose: Copy order metadata to newly created pulses, or build from account/user data if no order

Match Conditions

const matchConditions = {
operationType: 'insert',
};

Data Flow

sequenceDiagram
participant SERVICE as Service/Cron
participant PULSE as Projects Pulses
participant TRIGGER as projects.js Trigger
participant ORDER as Orders Collection
participant ACCOUNT as Accounts Collection
participant USER as Users Collection

SERVICE->>PULSE: Create new pulse
PULSE->>TRIGGER: Insert event fired

alt Pulse has order_id
TRIGGER->>ORDER: Query order
ORDER-->>TRIGGER: Return order.metadata
TRIGGER->>PULSE: Set order_metadata from order
else No order_id
TRIGGER->>ACCOUNT: Query parent account
TRIGGER->>USER: Query owner users
TRIGGER->>USER: Query manager
ACCOUNT-->>TRIGGER: Return account data
USER-->>TRIGGER: Return users data
TRIGGER->>PULSE: Build metadata from account/users
end

Processing Logic

Path 1: Pulse with Order

If the pulse has an order_id, copy order metadata:

const order = await Order.findOne({ _id: fullDocument.order_id }).lean().exec();

if (order) {
await ProjectPulse.updateOne(
{ _id: fullDocument._id },
{ $set: { order_metadata: order.metadata } },
);
}

Path 2: Pulse without Order

If no order exists, build metadata from account and users:

else {
const accountProjection = {
id: '$_id',
_id: 0,
name: 1,
email: 1,
phone: 1,
image: '$logo',
manager: 1,
};

const userProjection = {
id: '$_id',
_id: 0,
name: 1,
email: 1,
phone: 1,
image: 1,
is_owner: 1,
account: 1,
};

const metadataUpdate = {};
const findQueries = [];

let mainAcc;

// Get main account if not in metadata
if (!fullDocument?.metadata?.main_account) {
mainAcc = await Account.findOne(
{ _id: fullDocument.parent_account },
accountProjection
).lean().exec();

if (mainAcc) {
metadataUpdate['metadata.main_account'] = mainAcc;
}
}

// Get responsible users (owners)
if (!fullDocument.metadata.responsible) {
findQueries.push(
User.find(
{ account: fullDocument.parent_account, is_owner: true },
userProjection
)
.lean()
.then(users => users && users.length > 0 && {
type: 'responsible',
data: users
})
);
}

// Get manager
if (!fullDocument.metadata.dashclicks_team?.manager) {
findQueries.push(
User.findOne({ _id: mainAcc.manager }, userProjection)
.lean()
.then(user => user && {
type: 'dashclicks_team.manager',
data: user
})
);
}

// Process queries
if (findQueries.length > 0) {
const results = await Promise.allSettled(findQueries);
results.forEach(result => {
if (result.status === 'fulfilled' && result.value) {
metadataUpdate[`metadata.${result.value.type}`] = result.value.data;
}
});
}

if (!Object.keys(metadataUpdate).length) return;

await ProjectPulse.updateOne(
{ _id: fullDocument._id },
{ $set: metadataUpdate }
);
}

Use Cases

Use Case 1: Order-Based Pulse

Scenario: Churn risk pulse created for active subscription

Flow:

  1. Cron creates pulse with order_id
  2. Trigger fires on insert
  3. Order metadata copied to pulse
  4. Pulse has complete team and account context

Use Case 2: Non-Order Pulse

Scenario: Quarterly check-in pulse for account without active orders

Flow:

  1. Cron creates pulse with parent_account only
  2. Trigger fires on insert
  3. Account and users queried
  4. Metadata built from account owner and manager
  5. Pulse has sufficient context for processing

Collections Updated

  • Projects Pulses - order_metadata field (from order) OR metadata fields (built from account/users)

Trigger 3: monitorLastCommunicationId()

Configuration

Watched Collection: Projects Tasks
Event: Update of last_communication_id field
Purpose: Sync communication timestamp to task

Match Conditions

const matchConditions = {
operationType: 'update',
'updateDescription.updatedFields.last_communication_id': { $exists: true },
};

Data Flow

sequenceDiagram
participant API as Internal API
participant TASK as Projects Tasks
participant TRIGGER as projects.js Trigger
participant COMM as Communications

API->>TASK: Update last_communication_id
TASK->>TRIGGER: Update event fired
TRIGGER->>COMM: Query communication by _id
COMM-->>TRIGGER: Return communication.updatedAt
TRIGGER->>TASK: Set last_communication_time

Processing Logic

When last_communication_id changes, fetch the communication timestamp:

exports.monitorLastCommunicationId = () => {
const matchConditions = {
operationType: 'update',
'updateDescription.updatedFields.last_communication_id': { $exists: true },
};

startChangeStream(ProjectTask, matchConditions, async changeEvent => {
const taskId = changeEvent.data._id;
const lastCommunicationId = changeEvent.updatedFields.last_communication_id;

// Fetch the communication document to get updatedAt
const communication = await Communication.findOne({ _id: lastCommunicationId }).lean().exec();

if (communication && communication.updatedAt) {
// Update the ProjectTask with last_communication_time
await ProjectTask.updateOne(
{ _id: taskId },
{ $set: { last_communication_time: communication.updatedAt } },
);
}
});
};

Use Case

Scenario: Team member sends email to client via conversation system

Flow:

  1. Email sent and stored in Communications collection
  2. Task updated with last_communication_id pointing to email
  3. Trigger fires on field update
  4. Communication document queried for timestamp
  5. Task's last_communication_time updated
  6. Last contact time now visible on task without joining collections

Collections Updated

  • Projects Tasks - last_communication_time field (Date from communication.updatedAt)

Why This Matters

The last_communication_time field enables:

  • Inactive Task Detection: Identify tasks with no recent communication
  • Response Time Metrics: Calculate time between communications
  • Client Engagement Tracking: Monitor communication frequency
  • Dashboard Displays: Show "Last Contact" without complex queries

Trigger Comparison

TriggerWatched EventComplexityPurpose
updateTaskWithOrderInfoTask insertSimpleCopy order metadata
updatePulseWithOrderInfoPulse insertMediumCopy metadata or build from scratch
monitorLastCommunicationIdTask field updateSimpleSync communication timestamp

Performance Considerations

Optimizations

  1. Insert-Only Watching: Two triggers only watch inserts (lower event volume)
  2. Specific Field Watch: Communication trigger only fires on one field change
  3. Lean Queries: All queries use .lean() for raw JSON
  4. Early Exit: Pulse trigger exits early if no metadata updates needed
  5. Parallel Queries: Pulse trigger uses Promise.allSettled() for user queries

Query Volume

Task Insert:

  • 1 order query
  • 1 task update
  • Total: 2 operations

Pulse Insert (with order):

  • 1 order query
  • 1 pulse update
  • Total: 2 operations

Pulse Insert (without order):

  • 1 account query
  • 1-2 user queries (parallel)
  • 1 pulse update
  • Total: 3-4 operations

Communication Update:

  • 1 communication query
  • 1 task update
  • Total: 2 operations

Error Handling

All three triggers use the common change stream error handler:

stream.on('change', async data => {
try {
await handleChange({
/* ... */
});
} catch (error) {
logger.error({
initiator: 'QM/common/change-stream',
error,
data,
});
}
});

No explicit error handling within trigger functions - errors bubble up to change stream handler.

Upstream Triggers

  • order.js: Order metadata built here is copied by task/pulse triggers
  • users.js: User updates propagate through order metadata to tasks/pulses
  • contacts.js: Contact updates propagate through order metadata

Downstream Effects

  • Tasks/pulses with updated metadata may trigger:
    • Dashboard updates
    • Notification generation
    • Analytics calculations

Metadata Inheritance Chain

graph LR
CONTACT[Contacts] -->|contacts.js| ACCOUNT[Accounts]
USER[Users] -->|users.js| ACCOUNT

ACCOUNT -->|order.js| ORDER[Orders]
USER -->|order.js| ORDER

ORDER -->|projects.js: updateTaskWithOrderInfo| TASK[Tasks]
ORDER -->|projects.js: updatePulseWithOrderInfo| PULSE[Pulses]

ACCOUNT -.->|projects.js: updatePulseWithOrderInfo| PULSE
USER -.->|projects.js: updatePulseWithOrderInfo| PULSE

style ORDER fill:#f9f,stroke:#333
style TASK fill:#9ff,stroke:#333
style PULSE fill:#9ff,stroke:#333

Key:

  • Solid lines: Primary metadata flow
  • Dashed lines: Fallback metadata flow (when no order)

Monitoring

Key Metrics

  • Task Insert Rate: Tasks created per minute
  • Pulse Insert Rate: Pulses created per minute
  • Order Match Rate: % of pulses with vs without orders
  • Communication Update Rate: Frequency of last_communication_id changes

Logging

Currently uses implicit logging from common/changeStream.js:

logger.error({
initiator: 'QM/common/change-stream',
error,
data,
});

Best Practices

1. Insert-Only Watching

Only watching inserts prevents update loops:

const matchConditions = {
operationType: 'insert',
};

This ensures triggers only fire once per entity creation.

2. Null-Safe Queries

Check for existence before querying:

if (communication && communication.updatedAt) {
await ProjectTask.updateOne(/* ... */);
}

3. Early Exit for No-Op Updates

if (!Object.keys(metadataUpdate).length) return;

Avoids unnecessary database updates when no changes needed.

4. Parallel Query Execution

const results = await Promise.allSettled(findQueries);

Executes independent queries concurrently for performance.

Use Case: Complete Flow

Scenario: New Client Order with Task Assignment

Step-by-Step:

  1. Order Created (Internal API)

    • Seller, buyer, manager, specialist assigned
    • order.js trigger fires
    • Order metadata built with all team members
  2. Task Created (Projects System)

    • Task assigned with order_id reference
    • updateTaskWithOrderInfo() trigger fires
    • Order metadata copied to task
  3. Email Sent (Conversation System)

    • Communication record created
    • Task's last_communication_id updated
    • monitorLastCommunicationId() trigger fires
    • last_communication_time set on task
  4. Pulse Created (Cron Job)

    • Inactive task pulse generated
    • updatePulseWithOrderInfo() trigger fires
    • Order metadata copied to pulse

Result: All entities (order, task, pulse) have consistent metadata without manual synchronization.

Summary

The projects triggers provide three specialized synchronization mechanisms: task metadata inheritance from orders, pulse metadata building (with fallback logic), and communication timestamp tracking. These triggers ensure that project entities maintain consistent context information, enabling efficient querying and display without complex joins.

Key Characteristics:

  • 📋 Three independent triggers for different purposes
  • ⚡ Insert-only watching for tasks and pulses
  • 🔄 Fallback metadata building for non-order pulses
  • ⏱️ Communication timestamp synchronization
  • 🎯 Simple, focused functionality per trigger
  • 🔗 Part of larger metadata inheritance chain
💬

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM