Projects Triggers - Project Entity Synchronization
Overview
File: queue-manager/triggers/projects.js
Functions: 3 separate triggers for project entity synchronization
Purpose: Maintain task and pulse metadata consistency with order data and communication timestamps
This module contains three independent triggers that keep project tasks and pulses synchronized with their related order metadata and communication records.
Trigger 1: updateTaskWithOrderInfo()
Configuration
Watched Collection: Projects Tasks
Event: Insert only
Purpose: Copy order metadata to newly created tasks
Match Conditions
const matchConditions = {
  operationType: 'insert',
};
Data Flow
sequenceDiagram
participant API as Internal API
participant TASK as Projects Tasks
participant TRIGGER as projects.js Trigger
participant ORDER as Orders Collection
API->>TASK: Create new task
TASK->>TRIGGER: Insert event fired
TRIGGER->>ORDER: Query order by order_id
ORDER-->>TRIGGER: Return order with metadata
TRIGGER->>TASK: Update task.order_metadata
Processing Logic
When a new task is inserted, immediately fetch and copy the order metadata:
exports.updateTaskWithOrderInfo = () => {
  const matchConditions = {
    operationType: 'insert',
  };
  startChangeStream(ProjectTask, matchConditions, async changeEvent => {
    const fullDocument = changeEvent.data;
    const order = await Order.findOne({ _id: fullDocument.order_id }).lean().exec();
    if (order) {
      await ProjectTask.updateOne(
        { _id: fullDocument._id },
        { $set: { order_metadata: order.metadata } },
      );
    }
  });
};
Use Case
Scenario: Order manager assigns task to team member
Flow:
- Task created with order_id reference
- Trigger fires on insert
- Order queried for metadata
- Task updated with complete order metadata (seller, buyer, team)
- Task now has all context without additional queries
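The flow above can be exercised without a database. The sketch below is only an illustration, not the module's actual code: `makeTaskInsertHandler`, `fakeOrders`, and `fakeTasks` are hypothetical stand-ins for the Mongoose models, and only the "copy metadata when the order exists" step of the documented handler is reproduced.

```javascript
// Hypothetical in-memory stand-ins for the Order and ProjectTask collections.
const fakeOrders = new Map([
  ['order-1', { _id: 'order-1', metadata: { seller: 'S', buyer: 'B', team: ['T1'] } }],
]);
const fakeTasks = new Map([
  ['task-1', { _id: 'task-1', order_id: 'order-1' }],
  ['task-2', { _id: 'task-2', order_id: 'missing-order' }],
]);

// Builds a handler shaped like the documented callback: look up the order
// referenced by the inserted task and copy its metadata only if it exists.
function makeTaskInsertHandler(orders, tasks) {
  return async function handleTaskInsert(changeEvent) {
    const fullDocument = changeEvent.data;
    const order = orders.get(fullDocument.order_id);
    if (order) {
      // The real trigger persists this with ProjectTask.updateOne and $set.
      tasks.get(fullDocument._id).order_metadata = order.metadata;
    }
  };
}

const handleTaskInsert = makeTaskInsertHandler(fakeOrders, fakeTasks);
```

Calling `handleTaskInsert({ data: fakeTasks.get('task-1') })` fills in `order_metadata` on the first task, while a task whose order cannot be found is left unchanged.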
Collections Updated
- Projects Tasks - order_metadata field (full copy of order.metadata)
Trigger 2: updatePulseWithOrderInfo()
Configuration
Watched Collection: Projects Pulses
Event: Insert only
Purpose: Copy order metadata to newly created pulses, or build from account/user data if no order
Match Conditions
const matchConditions = {
  operationType: 'insert',
};
Data Flow
sequenceDiagram
participant SERVICE as Service/Cron
participant PULSE as Projects Pulses
participant TRIGGER as projects.js Trigger
participant ORDER as Orders Collection
participant ACCOUNT as Accounts Collection
participant USER as Users Collection
SERVICE->>PULSE: Create new pulse
PULSE->>TRIGGER: Insert event fired
alt Pulse has order_id
TRIGGER->>ORDER: Query order
ORDER-->>TRIGGER: Return order.metadata
TRIGGER->>PULSE: Set order_metadata from order
else No order_id
TRIGGER->>ACCOUNT: Query parent account
TRIGGER->>USER: Query owner users
TRIGGER->>USER: Query manager
ACCOUNT-->>TRIGGER: Return account data
USER-->>TRIGGER: Return users data
TRIGGER->>PULSE: Build metadata from account/users
end
Processing Logic
Path 1: Pulse with Order
If the pulse has an order_id, copy order metadata:
const order = await Order.findOne({ _id: fullDocument.order_id }).lean().exec();
if (order) {
  await ProjectPulse.updateOne(
    { _id: fullDocument._id },
    { $set: { order_metadata: order.metadata } },
  );
}
Path 2: Pulse without Order
If no order exists, build metadata from account and users:
else {
  const accountProjection = {
    id: '$_id',
    _id: 0,
    name: 1,
    email: 1,
    phone: 1,
    image: '$logo',
    manager: 1,
  };
  const userProjection = {
    id: '$_id',
    _id: 0,
    name: 1,
    email: 1,
    phone: 1,
    image: 1,
    is_owner: 1,
    account: 1,
  };
  const metadataUpdate = {};
  const findQueries = [];
  let mainAcc;
  // Get main account if not in metadata
  if (!fullDocument?.metadata?.main_account) {
    mainAcc = await Account.findOne(
      { _id: fullDocument.parent_account },
      accountProjection
    ).lean().exec();
    if (mainAcc) {
      metadataUpdate['metadata.main_account'] = mainAcc;
    }
  }
  // Get responsible users (owners)
  // Note: unlike the check above, this access is not optional-chained and
  // throws if fullDocument.metadata is undefined.
  if (!fullDocument.metadata.responsible) {
    findQueries.push(
      User.find(
        { account: fullDocument.parent_account, is_owner: true },
        userProjection
      )
        .lean()
        .then(users => users && users.length > 0 && {
          type: 'responsible',
          data: users
        })
    );
  }
  // Get manager
  // Note: mainAcc is only assigned when metadata.main_account was missing and
  // the account was found; otherwise mainAcc.manager throws a TypeError.
  if (!fullDocument.metadata.dashclicks_team?.manager) {
    findQueries.push(
      User.findOne({ _id: mainAcc.manager }, userProjection)
        .lean()
        .then(user => user && {
          type: 'dashclicks_team.manager',
          data: user
        })
    );
  }
  // Process queries; rejected or empty results are skipped
  if (findQueries.length > 0) {
    const results = await Promise.allSettled(findQueries);
    results.forEach(result => {
      if (result.status === 'fulfilled' && result.value) {
        metadataUpdate[`metadata.${result.value.type}`] = result.value.data;
      }
    });
  }
  // Nothing to write: skip the update
  if (!Object.keys(metadataUpdate).length) return;
  await ProjectPulse.updateOne(
    { _id: fullDocument._id },
    { $set: metadataUpdate }
  );
}
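In the fragment above, `mainAcc` is only assigned when `metadata.main_account` is missing, so the manager lookup can encounter an undefined account. One possible guard is sketched below. Both helpers are hypothetical and not part of projects.js, and the fallback assumes that a stored `metadata.main_account` still carries the `manager` field selected by `accountProjection`.

```javascript
// Hypothetical helper: prefer the freshly loaded account, then fall back to
// the account snapshot already stored on the pulse; null if neither has one.
function resolveManagerId(fullDocument, mainAcc) {
  return mainAcc?.manager ?? fullDocument?.metadata?.main_account?.manager ?? null;
}

// Hypothetical helper: run the manager lookup only when the pulse has no
// manager yet and a manager id can actually be resolved.
function shouldQueryManager(fullDocument, mainAcc) {
  const alreadySet = Boolean(fullDocument?.metadata?.dashclicks_team?.manager);
  return !alreadySet && resolveManagerId(fullDocument, mainAcc) !== null;
}
```

With such a guard, a pulse that already has `metadata.main_account` but no loaded `mainAcc` would skip or redirect the lookup instead of throwing.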
Use Cases
Use Case 1: Order-Based Pulse
Scenario: Churn risk pulse created for active subscription
Flow:
- Cron creates pulse with order_id
- Trigger fires on insert
- Order metadata copied to pulse
- Pulse has complete team and account context
Use Case 2: Non-Order Pulse
Scenario: Quarterly check-in pulse for account without active orders
Flow:
- Cron creates pulse with parent_account only
- Trigger fires on insert
- Account and users queried
- Metadata built from account owner and manager
- Pulse has sufficient context for processing
Collections Updated
- Projects Pulses - order_metadata field (from order) OR metadata fields (built from account/users)
Trigger 3: monitorLastCommunicationId()
Configuration
Watched Collection: Projects Tasks
Event: Update of last_communication_id field
Purpose: Sync communication timestamp to task
Match Conditions
const matchConditions = {
  operationType: 'update',
  'updateDescription.updatedFields.last_communication_id': { $exists: true },
};
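The internals of `startChangeStream` are not shown in this document. The field names in the match conditions correspond to fields of a MongoDB change event, so one plausible reading is that the conditions are forwarded as a `$match` stage. The sketch below assumes exactly that; `buildChangeStreamPipeline` is a hypothetical helper.

```javascript
// Hypothetical helper: wrap the match conditions in a single $match stage,
// the pipeline form accepted by MongoDB change streams.
function buildChangeStreamPipeline(matchConditions) {
  return [{ $match: { ...matchConditions } }];
}

const pipeline = buildChangeStreamPipeline({
  operationType: 'update',
  'updateDescription.updatedFields.last_communication_id': { $exists: true },
});
```

With Mongoose, a pipeline of this shape could be passed to `Model.watch(pipeline)`; whether the common helper does so is an assumption.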
Data Flow
sequenceDiagram
participant API as Internal API
participant TASK as Projects Tasks
participant TRIGGER as projects.js Trigger
participant COMM as Communications
API->>TASK: Update last_communication_id
TASK->>TRIGGER: Update event fired
TRIGGER->>COMM: Query communication by _id
COMM-->>TRIGGER: Return communication.updatedAt
TRIGGER->>TASK: Set last_communication_time
Processing Logic
When last_communication_id changes, fetch the communication timestamp:
exports.monitorLastCommunicationId = () => {
  const matchConditions = {
    operationType: 'update',
    'updateDescription.updatedFields.last_communication_id': { $exists: true },
  };
  startChangeStream(ProjectTask, matchConditions, async changeEvent => {
    const taskId = changeEvent.data._id;
    const lastCommunicationId = changeEvent.updatedFields.last_communication_id;
    // Fetch the communication document to get updatedAt
    const communication = await Communication.findOne({ _id: lastCommunicationId }).lean().exec();
    if (communication && communication.updatedAt) {
      // Update the ProjectTask with last_communication_time
      await ProjectTask.updateOne(
        { _id: taskId },
        { $set: { last_communication_time: communication.updatedAt } },
      );
    }
  });
};
Use Case
Scenario: Team member sends email to client via conversation system
Flow:
- Email sent and stored in Communications collection
- Task updated with last_communication_id pointing to email
- Trigger fires on field update
- Communication document queried for timestamp
- Task's last_communication_time updated
- Last contact time now visible on task without joining collections
Collections Updated
- Projects Tasks - last_communication_time field (Date from communication.updatedAt)
Why This Matters
The last_communication_time field enables:
- Inactive Task Detection: Identify tasks with no recent communication
- Response Time Metrics: Calculate time between communications
- Client Engagement Tracking: Monitor communication frequency
- Dashboard Displays: Show "Last Contact" without complex queries
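As an illustration of the first point, inactive tasks could be found with a single filter on the denormalized field. This is a sketch under assumptions: `inactiveTaskFilter` and the 14-day threshold are hypothetical and not taken from the codebase.

```javascript
// Hypothetical helper: build a query filter for tasks whose last
// communication is older than `days` days, measured back from `now`.
function inactiveTaskFilter(days, now = new Date()) {
  const cutoff = new Date(now.getTime() - days * 24 * 60 * 60 * 1000);
  return { last_communication_time: { $lt: cutoff } };
}

// Fixed reference date so the cutoff is reproducible:
// 14 days before 2024-03-15T00:00:00Z is 2024-03-01T00:00:00Z.
const filter = inactiveTaskFilter(14, new Date('2024-03-15T00:00:00Z'));
```

A filter like this could be passed to `ProjectTask.find(filter)`. Note that a `$lt` comparison against a Date only matches documents where the field holds a date, so tasks that never received a `last_communication_time` would need a separate condition.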
Trigger Comparison
| Trigger | Watched Event | Complexity | Purpose |
|---|---|---|---|
| updateTaskWithOrderInfo | Task insert | Simple | Copy order metadata |
| updatePulseWithOrderInfo | Pulse insert | Medium | Copy metadata or build from scratch |
| monitorLastCommunicationId | Task field update | Simple | Sync communication timestamp |
Performance Considerations
Optimizations
- Insert-Only Watching: Two triggers only watch inserts (lower event volume)
- Specific Field Watch: Communication trigger only fires on one field change
- Lean Queries: All queries use .lean() to return plain JavaScript objects instead of full Mongoose documents
- Early Exit: Pulse trigger exits early if no metadata updates are needed
- Parallel Queries: Pulse trigger uses Promise.allSettled() for user queries
Query Volume
Task Insert:
- 1 order query
- 1 task update
- Total: 2 operations
Pulse Insert (with order):
- 1 order query
- 1 pulse update
- Total: 2 operations
Pulse Insert (without order):
- 1 account query
- 1-2 user queries (parallel)
- 1 pulse update
- Total: 3-4 operations
Communication Update:
- 1 communication query
- 1 task update
- Total: 2 operations
Error Handling
All three triggers use the common change stream error handler:
stream.on('change', async data => {
  try {
    await handleChange({
      /* ... */
    });
  } catch (error) {
    logger.error({
      initiator: 'QM/common/change-stream',
      error,
      data,
    });
  }
});
The trigger functions contain no explicit error handling of their own; any error thrown inside a trigger callback bubbles up to the change stream handler shown above.
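Because every trigger shares one log entry format, it can be hard to tell which trigger failed. One possible refinement is sketched below, assuming the common handler keeps logging whatever error it receives; `withTriggerContext` is a hypothetical wrapper and not part of the codebase.

```javascript
// Hypothetical wrapper: annotate errors with the trigger name and document id,
// then rethrow so the common change stream handler still logs them.
function withTriggerContext(triggerName, handler) {
  return async changeEvent => {
    try {
      return await handler(changeEvent);
    } catch (error) {
      error.trigger = triggerName;
      error.documentId = changeEvent?.data?._id;
      throw error;
    }
  };
}
```

A trigger callback could then be passed as `withTriggerContext('updateTaskWithOrderInfo', async changeEvent => { /* ... */ })` without changing where errors are logged.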
Related Triggers
Upstream Triggers
- order.js: Order metadata built here is copied by task/pulse triggers
- users.js: User updates propagate through order metadata to tasks/pulses
- contacts.js: Contact updates propagate through order metadata
Downstream Effects
- Tasks/pulses with updated metadata may trigger:
- Dashboard updates
- Notification generation
- Analytics calculations
Metadata Inheritance Chain
graph LR
CONTACT[Contacts] -->|contacts.js| ACCOUNT[Accounts]
USER[Users] -->|users.js| ACCOUNT
ACCOUNT -->|order.js| ORDER[Orders]
USER -->|order.js| ORDER
ORDER -->|projects.js: updateTaskWithOrderInfo| TASK[Tasks]
ORDER -->|projects.js: updatePulseWithOrderInfo| PULSE[Pulses]
ACCOUNT -.->|projects.js: updatePulseWithOrderInfo| PULSE
USER -.->|projects.js: updatePulseWithOrderInfo| PULSE
style ORDER fill:#f9f,stroke:#333
style TASK fill:#9ff,stroke:#333
style PULSE fill:#9ff,stroke:#333
Key:
- Solid lines: Primary metadata flow
- Dashed lines: Fallback metadata flow (when no order)
Monitoring
Key Metrics
- Task Insert Rate: Tasks created per minute
- Pulse Insert Rate: Pulses created per minute
- Order Match Rate: % of pulses with vs without orders
- Communication Update Rate: Frequency of last_communication_id changes
Logging
Currently uses implicit logging from common/changeStream.js:
logger.error({
  initiator: 'QM/common/change-stream',
  error,
  data,
});
Best Practices
1. Insert-Only Watching
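Watching only inserts prevents update loops. Each trigger writes back to the collection it watches, but those writes are updates, so the insert filter ignores them:
const matchConditions = {
  operationType: 'insert',
};
This ensures each insert trigger fires only once per entity creation.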
2. Null-Safe Queries
Check that the lookup returned a document, and that the needed field is present, before updating:
if (communication && communication.updatedAt) {
  await ProjectTask.updateOne(/* ... */);
}
3. Early Exit for No-Op Updates
if (!Object.keys(metadataUpdate).length) return;
Avoids unnecessary database updates when no changes needed.
4. Parallel Query Execution
const results = await Promise.allSettled(findQueries);
Executes independent queries concurrently for performance.
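The merge step can be tried in isolation. In the sketch below, `collectMetadata` and `makeDemoQueries` are hypothetical names; the loop body mirrors the one in the pulse trigger, and plain promises stand in for the Mongoose queries.

```javascript
// Merge settled lookup results into a dotted-path $set object, skipping
// rejected promises and falsy values, as the pulse trigger's loop does.
async function collectMetadata(findQueries) {
  const metadataUpdate = {};
  const results = await Promise.allSettled(findQueries);
  results.forEach(result => {
    if (result.status === 'fulfilled' && result.value) {
      metadataUpdate[`metadata.${result.value.type}`] = result.value.data;
    }
  });
  return metadataUpdate;
}

// Stand-in lookups: one succeeds, one rejects, one resolves to null.
const makeDemoQueries = () => [
  Promise.resolve({ type: 'responsible', data: [{ id: 'u1' }] }),
  Promise.reject(new Error('lookup failed')),
  Promise.resolve(null),
];

// Only the successful lookup contributes a key to the update.
collectMetadata(makeDemoQueries()).then(update => console.log(Object.keys(update)));
```

One consequence of this design is that a rejected lookup does not abort the others, but it is also dropped silently: it never reaches the change stream error handler.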
Use Case: Complete Flow
Scenario: New Client Order with Task Assignment
Step-by-Step:
1. Order Created (Internal API)
   - Seller, buyer, manager, specialist assigned
   - order.js trigger fires
   - Order metadata built with all team members
2. Task Created (Projects System)
   - Task assigned with order_id reference
   - updateTaskWithOrderInfo() trigger fires
   - Order metadata copied to task
3. Email Sent (Conversation System)
   - Communication record created
   - Task's last_communication_id updated
   - monitorLastCommunicationId() trigger fires
   - last_communication_time set on task
4. Pulse Created (Cron Job)
   - Inactive task pulse generated
   - updatePulseWithOrderInfo() trigger fires
   - Order metadata copied to pulse
Result: All entities (order, task, pulse) have consistent metadata without manual synchronization.
Summary
The projects triggers provide three specialized synchronization mechanisms: task metadata inheritance from orders, pulse metadata building (with fallback logic), and communication timestamp tracking. These triggers ensure that project entities maintain consistent context information, enabling efficient querying and display without complex joins.
Key Characteristics:
- 📋 Three independent triggers for different purposes
- ⚡ Insert-only watching for tasks and pulses
- 🔄 Fallback metadata building for non-order pulses
- ⏱️ Communication timestamp synchronization
- 🎯 Simple, focused functionality per trigger
- 🔗 Part of larger metadata inheritance chain