Skip to main content

Triggers System Overview

Introduction

The Triggers System is a real-time event-driven architecture in Queue Manager that uses MongoDB Change Streams to automatically respond to database changes. Unlike cron-based jobs that run on schedules, triggers execute immediately when data changes occur.

Location: queue-manager/triggers/

Architecture Pattern

Core Concept: Event-Driven Synchronization

Triggers provide real-time data synchronization across collections without manual intervention or scheduled jobs.

graph TB
subgraph "External Sources"
API[Internal API]
USER[User Actions]
EXT[External Integrations]
end

subgraph "MongoDB Collections"
CONTACT[Contacts]
USER_COL[Users]
ORDER[Orders]
ACCOUNT[Accounts]
TASK[Projects Tasks]
PULSE[Projects Pulses]
FUNNEL[Funnel Steps]
end

subgraph "Change Stream Infrastructure"
STREAM[MongoDB Change Stream]
MONITOR[changeStream.js]
end

subgraph "Trigger Handlers"
TRIG_INDEX[triggers/index.js]
CONTACT_T[contacts.js]
USER_T[users.js]
ORDER_T[order.js]
ACCOUNT_T[accounts.js]
PROJECT_T[projects.js]
FUNNEL_T[funnels.js]
end

API --> CONTACT
USER --> USER_COL
EXT --> ORDER

CONTACT --> STREAM
USER_COL --> STREAM
ORDER --> STREAM
ACCOUNT --> STREAM
TASK --> STREAM
PULSE --> STREAM
FUNNEL --> STREAM

STREAM --> MONITOR

MONITOR --> TRIG_INDEX
TRIG_INDEX --> CONTACT_T
TRIG_INDEX --> USER_T
TRIG_INDEX --> ORDER_T
TRIG_INDEX --> ACCOUNT_T
TRIG_INDEX --> PROJECT_T
TRIG_INDEX --> FUNNEL_T

CONTACT_T --> ACCOUNT
CONTACT_T --> ORDER
CONTACT_T --> TASK
USER_T --> ACCOUNT
USER_T --> CONTACT
USER_T --> ORDER
ORDER_T --> ACCOUNT
ORDER_T --> TASK
ORDER_T --> PULSE
ACCOUNT_T --> PULSE
PROJECT_T --> PULSE
PROJECT_T --> TASK
FUNNEL_T --> FUNNEL

Trigger Modules

1. contacts.js - CRM Synchronization

Purpose: Syncs contact changes to accounts, orders, tasks, and sites

Trigger: syncAccountWithCRM()

  • Watches: Contacts collection
  • Events: Insert, Update (name, email, phone, address, social, website, image)
  • Propagates To:
    • Account (name, email, phone, address, social, website, logo)
    • Orders (seller/buyer metadata)
    • Projects Tasks (order_metadata)
    • Projects Pulses (order_metadata)
    • Users (owner email)
    • Agency Websites (business_details_updated flag)
    • Instasites (business_details_updated flag)

Key Logic:

  • Maintains data consistency across 7+ collections
  • Updates metadata fields in orders and subscriptions
  • Handles main account vs sub-account distinctions
  • Syncs website/instasite business details

2. users.js - User Account Sync

Purpose: Syncs user profile changes to accounts and orders

Trigger: syncUserWithAccount()

  • Watches: Users collection
  • Events: Insert, Update (name, email, phone, image)
  • Propagates To:
    • Account (owner object for is_owner: true users)
    • Contacts (business contact email)
    • Orders (manager, specialist, responsible metadata)
    • Projects Tasks (order_metadata)

Key Logic:

  • Owner users update account owner details
  • Syncs owner email to business contact
  • Updates order metadata for managers/specialists
  • Rebuilds responsible arrays when users change

3. order.js - Order Metadata Management

Purpose: Maintains comprehensive order metadata and account relationships

Trigger: updateAccountsWithOrderInfo()

  • Watches: Orders collection
  • Events: Insert, Update (seller, buyer, assigned_user_ids, manager, specialist, managers[], specialists[], metadata fields)
  • Propagates To:
    • Accounts (has_orders flag, became_customer_on date)
    • Stripe Subscriptions (manager field)
    • Projects Tasks (order_metadata)
    • Projects Pulses (order_metadata)

Key Logic:

  • Sets has_orders: true on accounts
  • Calculates became_customer_on from oldest subscription
  • Builds comprehensive metadata object with:
    • Seller/buyer account info
    • Responsible users array
    • DashClicks team (manager, specialist, content_specialist)
    • Managers/specialists arrays with start/end dates
  • Syncs metadata to tasks and pulses

4. projects.js - Project Entity Sync

Purpose: Maintains project tasks and pulses with order/account metadata

Three Triggers:

A. updateTaskWithOrderInfo()

  • Watches: Projects Tasks (insert)
  • Copies: Order metadata to new tasks

B. updatePulseWithOrderInfo()

  • Watches: Projects Pulses (insert)
  • Copies: Order metadata or builds from account/user data if no order

C. monitorLastCommunicationId()

  • Watches: Projects Tasks (last_communication_id updates)
  • Copies: Communication timestamp to last_communication_time

5. accounts.js - Account Manager Tracking

Purpose: Updates project pulses when account manager changes

Trigger: monitorAccountManagerChange()

  • Watches: Accounts collection (manager field updates)
  • Propagates To: Projects Pulses (metadata.dashclicks_team.manager)
  • Updates: 6 pulse types with new manager info

Pulse Types Affected:

  • account_churn_risk
  • new_subscription_inquiry
  • quarterly_checkin
  • schedule_onboarding
  • loyalty_program_milestone
  • review_request

6. funnels.js - Funnel Thumbnail Generation

Purpose: Flags funnel steps for thumbnail regeneration

Trigger: buildFunnelThumbnails()

  • Watches: Funnel Steps (raw_html changes)
  • Sets Flags:
    • step_changed: true
    • Unsets: processing_queued, thumbnail_build_in_progress, thumbnail_process_started_at
  • Downstream: Cron job picks up flagged steps for processing

Change Stream Infrastructure

Core Module: common/changeStream.js

const startChangeStream = (model, matchConditions, handleChange) => {
const stream = model.watch([{ $match: matchConditions }], { fullDocument: 'updateLookup' });

stream.on('change', async data => {
await handleChange({
data: data.fullDocument,
op: data.operationType,
updatedFields: data?.updateDescription?.updatedFields,
});
});
};

Features:

  • fullDocument: 'updateLookup': Retrieves complete document after update
  • Match Conditions: Filters events using MongoDB aggregation pipeline
  • Error Handling: Logs errors without crashing stream
  • Persistent: Streams remain open for lifetime of QM process

Initialization

Startup Sequence

// queue-manager/triggers/index.js
exports.start = () => {
try {
syncAccountWithCRM(); // contacts.js
syncUserWithAccount(); // users.js
updateAccountsWithOrderInfo(); // order.js
updatePulseWithOrderInfo(); // projects.js
updateTaskWithOrderInfo(); // projects.js
monitorLastCommunicationId(); // projects.js
monitorAccountManagerChange(); // accounts.js
buildFunnelThumbnails(); // funnels.js
} catch (err) {
logger.error({ initiator: 'QM/triggers', error: err });
}
};

Called from main queue-manager/index.js during startup.

Match Condition Patterns

Pattern 1: Insert or Specific Field Updates

const matchConditions = {
$or: [
{ operationType: { $in: ['insert', 'replace'] } },
{
$and: [
{ operationType: 'update' },
{
$or: [
{ 'updateDescription.updatedFields.name': { $exists: true } },
{ 'updateDescription.updatedFields.email': { $exists: true } },
],
},
],
},
],
};

Used By: contacts.js, users.js

Pattern 2: Insert Only

const matchConditions = {
operationType: 'insert',
};

Used By: projects.js (task/pulse creation)

Pattern 3: Specific Field Update Only

const matchConditions = {
$and: [
{ operationType: 'update' },
{ 'updateDescription.updatedFields.manager': { $exists: true } },
],
};

Used By: accounts.js (manager changes)

Pattern 4: Complex Multi-Field Logic

const matchConditions = {
$or: [
{ operationType: 'insert' },
{
$and: [
{ operationType: 'update' },
{
$or: [
{ 'updateDescription.updatedFields.seller_account': { $exists: true } },
{ 'updateDescription.updatedFields.trigger_metadata_sync': { $exists: true } },
{
$expr: {
$ne: [
{
$type: {
$getField: {
field: 'metadata.content_specialist',
input: '$updateDescription.updatedFields',
},
},
},
'missing',
],
},
},
],
},
],
},
],
};

Used By: order.js (comprehensive order tracking)

Correlation with Queue Manager

Relationship with Cron Jobs

Triggers complement cron jobs but serve different purposes:

AspectTriggersCron Jobs
TimingImmediate (event-driven)Scheduled (time-driven)
PurposeData synchronizationBusiness logic processing
ScaleSingle recordBatch processing
LatencyMillisecondsMinutes to hours
Use CaseKeep metadata in syncGenerate reports, send emails

Integration Points

1. Trigger → Flag → Cron Pattern

Example: Funnel thumbnails

sequenceDiagram
participant USER as User
participant FUNNEL as Funnel Steps
participant TRIGGER as Trigger: funnels.js
participant CRON as Cron: funnels.js
participant QUEUE as Queue: build-thumbnail.js

USER->>FUNNEL: Update raw_html
FUNNEL->>TRIGGER: Change stream event
TRIGGER->>FUNNEL: Set step_changed: true

Note over CRON: */5 * * * * *
CRON->>FUNNEL: Query step_changed: true
CRON->>QUEUE: Add thumbnail job
QUEUE->>FUNNEL: Generate thumbnail
FUNNEL->>FUNNEL: Clear step_changed flag

2. Trigger → Direct Update Pattern

Example: Contact updates

sequenceDiagram
participant API as Internal API
participant CONTACT as Contacts
participant TRIGGER as Trigger: contacts.js
participant ACCOUNT as Accounts
participant ORDER as Orders
participant TASK as Tasks

API->>CONTACT: Update contact email
CONTACT->>TRIGGER: Change stream event
TRIGGER->>ACCOUNT: Update email
TRIGGER->>ORDER: Update metadata.seller.email
TRIGGER->>TASK: Update order_metadata.seller.email

Note over TRIGGER,TASK: All updates happen immediately

Data Flow Examples

Example 1: Order Creation Cascade

1. New Order Created (Internal API)

2. order.js Trigger Fires

3. Parallel Actions:
- Set Account.has_orders = true
- Set Account.became_customer_on (if first order)
- Build metadata object (seller, buyer, manager, specialist)
- Update order with metadata

4. Task Created for Order

5. projects.js Trigger Fires (updateTaskWithOrderInfo)

6. Copy order.metadata → task.order_metadata

Example 2: Contact Email Change Cascade

1. Contact Email Updated (CRM)

2. contacts.js Trigger Fires

3. Parallel Updates:
- Account email updated
- Owner user email updated (if owner)
- All orders metadata updated (seller/buyer)
- All tasks order_metadata updated
- All pulses order_metadata updated
- Sites flagged for business_details_updated

Performance Considerations

Efficient Query Patterns

1. Parallel Queries with Promise.allSettled()

const findQueries = [
Account.findOne({ _id: sellerId }, projection).lean(),
Account.findOne({ _id: buyerId }, projection).lean(),
User.find({ _id: { $in: userIds } }, projection).lean(),
];

const results = await Promise.allSettled(findQueries);

2. Lean Queries (No Mongoose Overhead)

const account = await Account.findOne({ _id: accountId })
.select('name email phone logo')
.lean()
.exec();

3. Early Exit for Irrelevant Updates

if (op === 'update' && Object.keys(updates).length === 1 && updates.updated_at) {
return; // Skip if only timestamp changed
}

Avoiding Infinite Loops

Triggers can potentially create update loops. Mitigation strategies:

  1. Check for Meaningful Changes: Only update if data actually differs
  2. Use Flags: trigger_metadata_sync flag prevents re-triggering
  3. Unset Flags: Clear flags after processing ($unset: { trigger_metadata_sync: '' })
  4. Match Conditions: Only watch specific fields, not all updates

Error Handling

Stream-Level Error Handling

stream.on('change', async data => {
try {
await handleChange({
/* ... */
});
} catch (error) {
logger.error({
initiator: 'QM/common/change-stream',
error,
data,
});
// Stream continues despite errors
}
});

Handler-Level Error Handling

try {
await Account.updateOne({ _id: accountId }, { $set: updatePayload });
} catch (error) {
logger.error({
initiator: 'queue-manager/triggers/contacts',
error,
message: 'Error updating account with CRM data',
});
// Logs error but doesn't throw (prevents stream crash)
}

Monitoring and Debugging

Logging Strategy

logger.log({
initiator: 'queue-manager/triggers/order',
message: 'Processing order change event',
changeEvent: data,
});

Key Metrics to Monitor

  1. Change Stream Lag: Time between DB change and trigger execution
  2. Error Rate: Frequency of errors in trigger handlers
  3. Update Volume: Number of cascading updates per trigger event
  4. Query Performance: Time spent in database queries

Configuration

Environment Variables

None directly used by triggers (always active).

Triggers run whenever Queue Manager starts, regardless of QM_* flags.

Disabling Triggers

To disable triggers, modify queue-manager/triggers/index.js:

exports.start = () => {
try {
// Comment out specific triggers to disable
// syncAccountWithCRM();
syncUserWithAccount();
updateAccountsWithOrderInfo();
// ... etc
} catch (err) {
logger.error({ initiator: 'QM/triggers', error: err });
}
};

Collections Involved

Source Collections (Watched)

  • Contacts - CRM contact changes
  • Users - User profile changes
  • Orders - Order creation and updates
  • Accounts - Account manager changes
  • Projects Tasks - Task creation and communication updates
  • Projects Pulses - Pulse creation
  • Funnel Steps - Funnel HTML changes

Target Collections (Updated)

  • Accounts - Metadata and flags
  • Orders - Comprehensive metadata
  • Stripe Subscriptions - Manager field
  • Projects Tasks - Order metadata
  • Projects Pulses - Order/account metadata
  • Contacts - Email sync
  • Agency Websites - Update flags
  • Instasites - Update flags

Use Cases

1. Real-Time CRM Sync

Scenario: Sales rep updates client phone number in CRM

Trigger Flow:

  1. Contact phone updated
  2. syncAccountWithCRM() fires
  3. Account phone updated immediately
  4. All orders updated with new metadata
  5. All tasks/pulses reflect new phone number

2. Team Member Assignment

Scenario: Manager assigned to order

Trigger Flow:

  1. Order manager field updated
  2. updateAccountsWithOrderInfo() fires
  3. User details fetched
  4. Order metadata.dashclicks_team.manager updated
  5. Tasks/pulses get updated metadata
  6. Stripe subscription manager updated

3. Account Churn Risk Updates

Scenario: Account manager changes

Trigger Flow:

  1. Account manager updated
  2. monitorAccountManagerChange() fires
  3. All related pulses updated with new manager
  4. Churn risk notifications go to correct manager

4. Funnel Content Changes

Scenario: User edits funnel page HTML

Trigger Flow:

  1. Funnel step raw_html updated
  2. buildFunnelThumbnails() fires
  3. step_changed: true flag set
  4. Cron job picks up flagged step
  5. Thumbnail regenerated

Best Practices

1. Match Conditions Specificity

Good: Watch specific fields that matter

{ 'updateDescription.updatedFields.email': { $exists: true } }

Bad: Watch all updates indiscriminately

{
operationType: 'update';
}

2. Lean Queries

Good: Use lean() for read-only data

Account.findOne({ _id }).select('name email').lean();

Bad: Load full Mongoose documents

Account.findOne({ _id });

3. Parallel Execution

Good: Run independent queries in parallel

await Promise.allSettled([query1, query2, query3]);

Bad: Sequential queries

await query1;
await query2;
await query3;

4. Error Isolation

Good: Catch errors at handler level

try {
await update();
} catch (err) {
logger.error(err);
}

Bad: Let errors crash the stream

await update(); // Uncaught error kills stream

Future Enhancements

Potential Improvements

  1. Retry Logic: Add retry mechanism for failed updates
  2. Batch Updates: Group multiple related updates into transactions
  3. Performance Metrics: Track trigger execution times
  4. Circuit Breaker: Pause triggers during high-load periods
  5. Selective Activation: Environment flags for individual triggers
  6. Change Stream Resume Tokens: Handle stream disconnections gracefully
  • Architecture (documentation unavailable) - Overall Queue Manager patterns
  • Orders Module (link removed - file does not exist) - Order processing details
  • Projects Module - Task and pulse management
  • Accounts Module - Account lifecycle

Summary

The Triggers System provides real-time, event-driven data synchronization across DashClicks collections. By leveraging MongoDB Change Streams, triggers ensure metadata consistency without manual intervention or scheduled jobs, enabling immediate propagation of changes throughout the system.

Key Characteristics:

  • Immediate: Millisecond response to data changes
  • 🔄 Cascading: Single change updates multiple collections
  • 🎯 Targeted: Specific field watches for efficiency
  • 🛡️ Resilient: Error handling prevents stream crashes
  • 🔗 Integrated: Complements cron-based processing
💬

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM