Triggers System Overview
Introduction
The Triggers System is a real-time event-driven architecture in Queue Manager that uses MongoDB Change Streams to automatically respond to database changes. Unlike cron-based jobs that run on schedules, triggers execute immediately when data changes occur.
Location: queue-manager/triggers/
Architecture Pattern
Core Concept: Event-Driven Synchronization
Triggers provide real-time data synchronization across collections without manual intervention or scheduled jobs.
graph TB
subgraph "External Sources"
API[Internal API]
USER[User Actions]
EXT[External Integrations]
end
subgraph "MongoDB Collections"
CONTACT[Contacts]
USER_COL[Users]
ORDER[Orders]
ACCOUNT[Accounts]
TASK[Projects Tasks]
PULSE[Projects Pulses]
FUNNEL[Funnel Steps]
end
subgraph "Change Stream Infrastructure"
STREAM[MongoDB Change Stream]
MONITOR[changeStream.js]
end
subgraph "Trigger Handlers"
TRIG_INDEX[triggers/index.js]
CONTACT_T[contacts.js]
USER_T[users.js]
ORDER_T[order.js]
ACCOUNT_T[accounts.js]
PROJECT_T[projects.js]
FUNNEL_T[funnels.js]
end
API --> CONTACT
USER --> USER_COL
EXT --> ORDER
CONTACT --> STREAM
USER_COL --> STREAM
ORDER --> STREAM
ACCOUNT --> STREAM
TASK --> STREAM
PULSE --> STREAM
FUNNEL --> STREAM
STREAM --> MONITOR
MONITOR --> TRIG_INDEX
TRIG_INDEX --> CONTACT_T
TRIG_INDEX --> USER_T
TRIG_INDEX --> ORDER_T
TRIG_INDEX --> ACCOUNT_T
TRIG_INDEX --> PROJECT_T
TRIG_INDEX --> FUNNEL_T
CONTACT_T --> ACCOUNT
CONTACT_T --> ORDER
CONTACT_T --> TASK
USER_T --> ACCOUNT
USER_T --> CONTACT
USER_T --> ORDER
ORDER_T --> ACCOUNT
ORDER_T --> TASK
ORDER_T --> PULSE
ACCOUNT_T --> PULSE
PROJECT_T --> PULSE
PROJECT_T --> TASK
FUNNEL_T --> FUNNEL
Trigger Modules
1. contacts.js - CRM Synchronization
Purpose: Syncs contact changes to accounts, orders, tasks, and sites
Trigger: syncAccountWithCRM()
- Watches: Contacts collection
- Events: Insert, Update (name, email, phone, address, social, website, image)
- Propagates To:
- Account (name, email, phone, address, social, website, logo)
- Orders (seller/buyer metadata)
- Projects Tasks (order_metadata)
- Projects Pulses (order_metadata)
- Users (owner email)
- Agency Websites (business_details_updated flag)
- Instasites (business_details_updated flag)
Key Logic:
- Maintains data consistency across 7+ collections
- Updates metadata fields in orders and subscriptions
- Handles main account vs sub-account distinctions
- Syncs website/instasite business details
2. users.js - User Account Sync
Purpose: Syncs user profile changes to accounts and orders
Trigger: syncUserWithAccount()
- Watches: Users collection
- Events: Insert, Update (name, email, phone, image)
- Propagates To:
- Account (owner object for
is_owner: trueusers) - Contacts (business contact email)
- Orders (manager, specialist, responsible metadata)
- Projects Tasks (order_metadata)
- Account (owner object for
Key Logic:
- Owner users update account owner details
- Syncs owner email to business contact
- Updates order metadata for managers/specialists
- Rebuilds responsible arrays when users change
3. order.js - Order Metadata Management
Purpose: Maintains comprehensive order metadata and account relationships
Trigger: updateAccountsWithOrderInfo()
- Watches: Orders collection
- Events: Insert, Update (seller, buyer, assigned_user_ids, manager, specialist, managers[], specialists[], metadata fields)
- Propagates To:
- Accounts (has_orders flag, became_customer_on date)
- Stripe Subscriptions (manager field)
- Projects Tasks (order_metadata)
- Projects Pulses (order_metadata)
Key Logic:
- Sets
has_orders: trueon accounts - Calculates
became_customer_onfrom oldest subscription - Builds comprehensive metadata object with:
- Seller/buyer account info
- Responsible users array
- DashClicks team (manager, specialist, content_specialist)
- Managers/specialists arrays with start/end dates
- Syncs metadata to tasks and pulses
4. projects.js - Project Entity Sync
Purpose: Maintains project tasks and pulses with order/account metadata
Three Triggers:
A. updateTaskWithOrderInfo()
- Watches: Projects Tasks (insert)
- Copies: Order metadata to new tasks
B. updatePulseWithOrderInfo()
- Watches: Projects Pulses (insert)
- Copies: Order metadata or builds from account/user data if no order
C. monitorLastCommunicationId()
- Watches: Projects Tasks (last_communication_id updates)
- Copies: Communication timestamp to
last_communication_time
5. accounts.js - Account Manager Tracking
Purpose: Updates project pulses when account manager changes
Trigger: monitorAccountManagerChange()
- Watches: Accounts collection (manager field updates)
- Propagates To: Projects Pulses (metadata.dashclicks_team.manager)
- Updates: 6 pulse types with new manager info
Pulse Types Affected:
- account_churn_risk
- new_subscription_inquiry
- quarterly_checkin
- schedule_onboarding
- loyalty_program_milestone
- review_request
6. funnels.js - Funnel Thumbnail Generation
Purpose: Flags funnel steps for thumbnail regeneration
Trigger: buildFunnelThumbnails()
- Watches: Funnel Steps (raw_html changes)
- Sets Flags:
step_changed: true- Unsets:
processing_queued,thumbnail_build_in_progress,thumbnail_process_started_at
- Downstream: Cron job picks up flagged steps for processing
Change Stream Infrastructure
Core Module: common/changeStream.js
const startChangeStream = (model, matchConditions, handleChange) => {
const stream = model.watch([{ $match: matchConditions }], { fullDocument: 'updateLookup' });
stream.on('change', async data => {
await handleChange({
data: data.fullDocument,
op: data.operationType,
updatedFields: data?.updateDescription?.updatedFields,
});
});
};
Features:
- fullDocument: 'updateLookup': Retrieves complete document after update
- Match Conditions: Filters events using MongoDB aggregation pipeline
- Error Handling: Logs errors without crashing stream
- Persistent: Streams remain open for lifetime of QM process
Initialization
Startup Sequence
// queue-manager/triggers/index.js
exports.start = () => {
try {
syncAccountWithCRM(); // contacts.js
syncUserWithAccount(); // users.js
updateAccountsWithOrderInfo(); // order.js
updatePulseWithOrderInfo(); // projects.js
updateTaskWithOrderInfo(); // projects.js
monitorLastCommunicationId(); // projects.js
monitorAccountManagerChange(); // accounts.js
buildFunnelThumbnails(); // funnels.js
} catch (err) {
logger.error({ initiator: 'QM/triggers', error: err });
}
};
Called from main queue-manager/index.js during startup.
Match Condition Patterns
Pattern 1: Insert or Specific Field Updates
const matchConditions = {
$or: [
{ operationType: { $in: ['insert', 'replace'] } },
{
$and: [
{ operationType: 'update' },
{
$or: [
{ 'updateDescription.updatedFields.name': { $exists: true } },
{ 'updateDescription.updatedFields.email': { $exists: true } },
],
},
],
},
],
};
Used By: contacts.js, users.js
Pattern 2: Insert Only
const matchConditions = {
operationType: 'insert',
};
Used By: projects.js (task/pulse creation)
Pattern 3: Specific Field Update Only
const matchConditions = {
$and: [
{ operationType: 'update' },
{ 'updateDescription.updatedFields.manager': { $exists: true } },
],
};
Used By: accounts.js (manager changes)
Pattern 4: Complex Multi-Field Logic
const matchConditions = {
$or: [
{ operationType: 'insert' },
{
$and: [
{ operationType: 'update' },
{
$or: [
{ 'updateDescription.updatedFields.seller_account': { $exists: true } },
{ 'updateDescription.updatedFields.trigger_metadata_sync': { $exists: true } },
{
$expr: {
$ne: [
{
$type: {
$getField: {
field: 'metadata.content_specialist',
input: '$updateDescription.updatedFields',
},
},
},
'missing',
],
},
},
],
},
],
},
],
};
Used By: order.js (comprehensive order tracking)
Correlation with Queue Manager
Relationship with Cron Jobs
Triggers complement cron jobs but serve different purposes:
| Aspect | Triggers | Cron Jobs |
|---|---|---|
| Timing | Immediate (event-driven) | Scheduled (time-driven) |
| Purpose | Data synchronization | Business logic processing |
| Scale | Single record | Batch processing |
| Latency | Milliseconds | Minutes to hours |
| Use Case | Keep metadata in sync | Generate reports, send emails |
Integration Points
1. Trigger → Flag → Cron Pattern
Example: Funnel thumbnails
sequenceDiagram
participant USER as User
participant FUNNEL as Funnel Steps
participant TRIGGER as Trigger: funnels.js
participant CRON as Cron: funnels.js
participant QUEUE as Queue: build-thumbnail.js
USER->>FUNNEL: Update raw_html
FUNNEL->>TRIGGER: Change stream event
TRIGGER->>FUNNEL: Set step_changed: true
Note over CRON: */5 * * * * *
CRON->>FUNNEL: Query step_changed: true
CRON->>QUEUE: Add thumbnail job
QUEUE->>FUNNEL: Generate thumbnail
FUNNEL->>FUNNEL: Clear step_changed flag
2. Trigger → Direct Update Pattern
Example: Contact updates
sequenceDiagram
participant API as Internal API
participant CONTACT as Contacts
participant TRIGGER as Trigger: contacts.js
participant ACCOUNT as Accounts
participant ORDER as Orders
participant TASK as Tasks
API->>CONTACT: Update contact email
CONTACT->>TRIGGER: Change stream event
TRIGGER->>ACCOUNT: Update email
TRIGGER->>ORDER: Update metadata.seller.email
TRIGGER->>TASK: Update order_metadata.seller.email
Note over TRIGGER,TASK: All updates happen immediately
Data Flow Examples
Example 1: Order Creation Cascade
1. New Order Created (Internal API)
↓
2. order.js Trigger Fires
↓
3. Parallel Actions:
- Set Account.has_orders = true
- Set Account.became_customer_on (if first order)
- Build metadata object (seller, buyer, manager, specialist)
- Update order with metadata
↓
4. Task Created for Order
↓
5. projects.js Trigger Fires (updateTaskWithOrderInfo)
↓
6. Copy order.metadata → task.order_metadata
Example 2: Contact Email Change Cascade
1. Contact Email Updated (CRM)
↓
2. contacts.js Trigger Fires
↓
3. Parallel Updates:
- Account email updated
- Owner user email updated (if owner)
- All orders metadata updated (seller/buyer)
- All tasks order_metadata updated
- All pulses order_metadata updated
- Sites flagged for business_details_updated
Performance Considerations
Efficient Query Patterns
1. Parallel Queries with Promise.allSettled()
const findQueries = [
Account.findOne({ _id: sellerId }, projection).lean(),
Account.findOne({ _id: buyerId }, projection).lean(),
User.find({ _id: { $in: userIds } }, projection).lean(),
];
const results = await Promise.allSettled(findQueries);
2. Lean Queries (No Mongoose Overhead)
const account = await Account.findOne({ _id: accountId })
.select('name email phone logo')
.lean()
.exec();
3. Early Exit for Irrelevant Updates
if (op === 'update' && Object.keys(updates).length === 1 && updates.updated_at) {
return; // Skip if only timestamp changed
}
Avoiding Infinite Loops
Triggers can potentially create update loops. Mitigation strategies:
- Check for Meaningful Changes: Only update if data actually differs
- Use Flags:
trigger_metadata_syncflag prevents re-triggering - Unset Flags: Clear flags after processing (
$unset: { trigger_metadata_sync: '' }) - Match Conditions: Only watch specific fields, not all updates
Error Handling
Stream-Level Error Handling
stream.on('change', async data => {
try {
await handleChange({
/* ... */
});
} catch (error) {
logger.error({
initiator: 'QM/common/change-stream',
error,
data,
});
// Stream continues despite errors
}
});
Handler-Level Error Handling
try {
await Account.updateOne({ _id: accountId }, { $set: updatePayload });
} catch (error) {
logger.error({
initiator: 'queue-manager/triggers/contacts',
error,
message: 'Error updating account with CRM data',
});
// Logs error but doesn't throw (prevents stream crash)
}
Monitoring and Debugging
Logging Strategy
logger.log({
initiator: 'queue-manager/triggers/order',
message: 'Processing order change event',
changeEvent: data,
});
Key Metrics to Monitor
- Change Stream Lag: Time between DB change and trigger execution
- Error Rate: Frequency of errors in trigger handlers
- Update Volume: Number of cascading updates per trigger event
- Query Performance: Time spent in database queries
Configuration
Environment Variables
None directly used by triggers (always active).
Triggers run whenever Queue Manager starts, regardless of QM_* flags.
Disabling Triggers
To disable triggers, modify queue-manager/triggers/index.js:
exports.start = () => {
try {
// Comment out specific triggers to disable
// syncAccountWithCRM();
syncUserWithAccount();
updateAccountsWithOrderInfo();
// ... etc
} catch (err) {
logger.error({ initiator: 'QM/triggers', error: err });
}
};
Collections Involved
Source Collections (Watched)
- Contacts - CRM contact changes
- Users - User profile changes
- Orders - Order creation and updates
- Accounts - Account manager changes
- Projects Tasks - Task creation and communication updates
- Projects Pulses - Pulse creation
- Funnel Steps - Funnel HTML changes
Target Collections (Updated)
- Accounts - Metadata and flags
- Orders - Comprehensive metadata
- Stripe Subscriptions - Manager field
- Projects Tasks - Order metadata
- Projects Pulses - Order/account metadata
- Contacts - Email sync
- Agency Websites - Update flags
- Instasites - Update flags
Use Cases
1. Real-Time CRM Sync
Scenario: Sales rep updates client phone number in CRM
Trigger Flow:
- Contact phone updated
syncAccountWithCRM()fires- Account phone updated immediately
- All orders updated with new metadata
- All tasks/pulses reflect new phone number
2. Team Member Assignment
Scenario: Manager assigned to order
Trigger Flow:
- Order manager field updated
updateAccountsWithOrderInfo()fires- User details fetched
- Order metadata.dashclicks_team.manager updated
- Tasks/pulses get updated metadata
- Stripe subscription manager updated
3. Account Churn Risk Updates
Scenario: Account manager changes
Trigger Flow:
- Account manager updated
monitorAccountManagerChange()fires- All related pulses updated with new manager
- Churn risk notifications go to correct manager
4. Funnel Content Changes
Scenario: User edits funnel page HTML
Trigger Flow:
- Funnel step raw_html updated
buildFunnelThumbnails()firesstep_changed: trueflag set- Cron job picks up flagged step
- Thumbnail regenerated
Best Practices
1. Match Conditions Specificity
✅ Good: Watch specific fields that matter
{ 'updateDescription.updatedFields.email': { $exists: true } }
❌ Bad: Watch all updates indiscriminately
{
operationType: 'update';
}
2. Lean Queries
✅ Good: Use lean() for read-only data
Account.findOne({ _id }).select('name email').lean();
❌ Bad: Load full Mongoose documents
Account.findOne({ _id });
3. Parallel Execution
✅ Good: Run independent queries in parallel
await Promise.allSettled([query1, query2, query3]);
❌ Bad: Sequential queries
await query1;
await query2;
await query3;
4. Error Isolation
✅ Good: Catch errors at handler level
try {
await update();
} catch (err) {
logger.error(err);
}
❌ Bad: Let errors crash the stream
await update(); // Uncaught error kills stream
Future Enhancements
Potential Improvements
- Retry Logic: Add retry mechanism for failed updates
- Batch Updates: Group multiple related updates into transactions
- Performance Metrics: Track trigger execution times
- Circuit Breaker: Pause triggers during high-load periods
- Selective Activation: Environment flags for individual triggers
- Change Stream Resume Tokens: Handle stream disconnections gracefully
Related Documentation
- Architecture (documentation unavailable) - Overall Queue Manager patterns
- Orders Module (link removed - file does not exist) - Order processing details
- Projects Module - Task and pulse management
- Accounts Module - Account lifecycle
Summary
The Triggers System provides real-time, event-driven data synchronization across DashClicks collections. By leveraging MongoDB Change Streams, triggers ensure metadata consistency without manual intervention or scheduled jobs, enabling immediate propagation of changes throughout the system.
Key Characteristics:
- ⚡ Immediate: Millisecond response to data changes
- 🔄 Cascading: Single change updates multiple collections
- 🎯 Targeted: Specific field watches for efficiency
- 🛡️ Resilient: Error handling prevents stream crashes
- 🔗 Integrated: Complements cron-based processing