
🔄 Change Stream Utility

📖 Overview

The changeStream.js utility provides a simplified wrapper around MongoDB's Change Streams API. It enables real-time monitoring of document changes with automatic error handling, fullDocument lookups, and consistent event handling patterns. It is used by Queue Manager's stream processors to respond immediately to database changes.

Source File: queue-manager/common/changeStream.js

🎯 Purpose

  • Real-time Monitoring: Watch for document insert/update/delete operations
  • Filtered Watching: Apply match conditions to watch specific changes
  • Full Document Retrieval: Automatically fetch complete updated documents
  • Error Handling: Consistent error logging for stream failures
  • Event Processing: Route change events to custom handlers

📘 Function Signature

startChangeStream(model, matchConditions, handleChange)

const startChangeStream = (model, matchConditions, handleChange) => {
  // Implementation
};

Parameters

  • model (Mongoose.Model) - Mongoose model to watch
  • matchConditions (Array<Object>) - MongoDB aggregation pipeline match stages
  • handleChange (Function) - Callback to handle change events
    • Receives: change (MongoDB change event object)
    • Return: void or Promise<void>

Returns

  • void - Starts the change stream (non-blocking)

🔧 Implementation Details

Complete Source Code

const { logger } = require('../utilities');

/**
 * Starts a MongoDB change stream with error handling
 * @param {Mongoose.Model} model - Mongoose model to watch
 * @param {Array} matchConditions - Match conditions for filtering changes
 * @param {Function} handleChange - Function to handle each change event
 */
const startChangeStream = (model, matchConditions, handleChange) => {
  const changeStream = model.watch(matchConditions, { fullDocument: 'updateLookup' });

  changeStream.on('change', change => {
    handleChange(change);
  });

  changeStream.on('error', err => {
    logger.error({ initiator: 'QM/change-stream', error: err });
  });
};

module.exports = {
  startChangeStream,
};

📋 Detailed Component Breakdown

MongoDB Change Stream Creation

const changeStream = model.watch(matchConditions, { fullDocument: 'updateLookup' });

Components:

  1. model.watch(): Mongoose wrapper for MongoDB's Change Streams API
  2. matchConditions: Aggregation pipeline to filter events
  3. fullDocument: 'updateLookup': Ensures complete document is returned

fullDocument: 'updateLookup' Behavior

| Operation Type | Default Behavior | With 'updateLookup' |
| --- | --- | --- |
| insert | Returns full document | Returns full document |
| update | Returns only delta (changed fields) | Performs additional query to return full document |
| replace | Returns full document | Returns full document |
| delete | No document | No document |

Why 'updateLookup'?

// Without 'updateLookup'
{
  operationType: 'update',
  updateDescription: {
    updatedFields: { status: 'completed' }
  }
  // Only delta - missing other fields!
}

// With 'updateLookup'
{
  operationType: 'update',
  fullDocument: {
    _id: ObjectId,
    status: 'completed',  // Updated field
    account_id: ObjectId, // All other fields included
    user_id: ObjectId,
    createdAt: Date
  }
}

Performance Cost:

  • Additional database query per update event
  • Trade-off: Convenience vs. slight latency increase
  • Acceptable for most Queue Manager use cases (low-frequency events)

Match Conditions (Pipeline Filtering)

matchConditions = [
  {
    $match: {
      // Filter criteria
    },
  },
];

Match Condition Examples:

Watch Specific Operation Types

[
  {
    $match: {
      operationType: { $in: ['insert', 'update'] },
    },
  },
];

Watch Specific Field Changes

[
  {
    $match: {
      $or: [
        { operationType: 'insert' },
        { 'updateDescription.updatedFields.status': { $exists: true } },
      ],
    },
  },
];

Watch Specific Documents

[
  {
    $match: {
      'fullDocument.account_id': ObjectId('account123'),
    },
  },
];

Change Event Handler

changeStream.on('change', change => {
  handleChange(change);
});

Change Event Structure:

{
  _id: {
    _data: 'resumeToken' // For resuming stream after disconnect
  },
  operationType: 'insert' | 'update' | 'replace' | 'delete',
  clusterTime: Timestamp,
  fullDocument: { // With 'updateLookup' option
    // Complete document
  },
  ns: {
    db: 'dashclicks',
    coll: 'queues'
  },
  documentKey: {
    _id: ObjectId
  },
  updateDescription: { // Only for 'update' operations
    updatedFields: { /* changed fields */ },
    removedFields: [ /* removed field names */ ]
  }
}
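A handler typically branches on operationType to decide which of these fields to read. The sketch below is a hypothetical handler (not from the source) showing which fields each operation type reliably provides:

```javascript
// Hypothetical handler: which change-event fields exist per operation type.
const describeChange = change => {
  switch (change.operationType) {
    case 'insert':
    case 'replace':
      // Inserts and replaces always carry the full document.
      return `full document ${change.fullDocument._id}`;
    case 'update':
      // updateDescription is only present on updates.
      return `updated fields: ${Object.keys(change.updateDescription.updatedFields).join(', ')}`;
    case 'delete':
      // No fullDocument on deletes; only the documentKey survives.
      return `deleted ${change.documentKey._id}`;
    default:
      return `unhandled: ${change.operationType}`;
  }
};

console.log(describeChange({
  operationType: 'update',
  documentKey: { _id: 'doc1' },
  fullDocument: { _id: 'doc1', status: 'completed' },
  updateDescription: { updatedFields: { status: 'completed' }, removedFields: [] },
})); // updated fields: status
```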

Error Handler

changeStream.on('error', err => {
  logger.error({ initiator: 'QM/change-stream', error: err });
});

Common Errors:

  1. Connection Errors:

     {
       name: 'MongoNetworkError',
       message: 'connection closed'
     }

  2. Invalid Resume Token (after long disconnect):

     {
       name: 'MongoError',
       code: 286,
       message: 'Resume token not found'
     }

  3. Unauthorized (missing changeStream privileges):

     {
       name: 'MongoError',
       code: 13,
       message: 'not authorized on database to execute command'
     }

Error Handling Strategy:

  • Logs error with QM/change-stream initiator
  • Does not attempt reconnection (stream ends)
  • Relies on Queue Manager restart to re-establish stream

🔄 Complete Flow Diagram

sequenceDiagram
  participant APP as Application
  participant UTIL as changeStream.startChangeStream()
  participant MONGO as MongoDB Change Stream
  participant HANDLER as handleChange()
  participant LOG as Logger

  APP->>UTIL: startChangeStream(Model, filters, handler)
  UTIL->>MONGO: model.watch(filters, {fullDocument: 'updateLookup'})
  MONGO-->>UTIL: Change Stream instance

  loop Document Changes
    MONGO->>MONGO: Document updated in collection
    MONGO->>MONGO: Apply match filters
    alt Filter matches
      MONGO->>MONGO: Lookup full document
      MONGO->>UTIL: Emit 'change' event
      UTIL->>HANDLER: handleChange(changeEvent)
      HANDLER->>HANDLER: Process change
    else Filter doesn't match
      MONGO->>MONGO: Skip event
    end
  end

  alt Error occurs
    MONGO->>UTIL: Emit 'error' event
    UTIL->>LOG: Log error with 'QM/change-stream'
    MONGO->>MONGO: Close stream
  end

🎨 Usage Patterns

Pattern 1: Watch Queue Status Changes

const { startChangeStream } = require('./common/changeStream');
const Queue = require('./models/queues');

// Start watching for completed queues
startChangeStream(
  Queue,
  [
    {
      $match: {
        $or: [
          { operationType: 'insert' },
          { 'updateDescription.updatedFields.status': { $exists: true } },
        ],
      },
    },
  ],
  async change => {
    const queue = change.fullDocument;

    if (queue.status === 'completed') {
      console.log(`Queue ${queue._id} completed`);
      // Send notification, update related records, etc.
    }
  },
);

Pattern 2: Watch Specific Account Changes

const { ObjectId } = require('mongoose').Types;
const { startChangeStream } = require('./common/changeStream');
const InstaReportsQueue = require('./models/instareports-queue');

// Watch for report builds for a specific account
const accountId = new ObjectId('account123');

startChangeStream(
  InstaReportsQueue,
  [
    {
      $match: {
        'fullDocument.account_id': accountId,
        operationType: { $in: ['insert', 'update'] },
      },
    },
  ],
  async change => {
    const report = change.fullDocument;

    if (change.operationType === 'insert') {
      console.log(`New report queued: ${report._id}`);
      // Trigger immediate processing
    } else if (change.operationType === 'update') {
      console.log(`Report updated: ${report._id}`);
      // Update cache, notify frontend
    }
  },
);

Pattern 3: Multiple Stream Coordination

// Watch multiple collections with consistent handling
const models = [
  { model: InstaReportsQueue, name: 'InstaReports' },
  { model: InstaSitesQueue, name: 'InstaSites' },
  { model: ContactQueue, name: 'Contacts' },
];

models.forEach(({ model, name }) => {
  startChangeStream(model, [{ $match: { operationType: 'insert' } }], async change => {
    console.log(`New ${name} job:`, change.documentKey._id);
    // Unified job processing
    await processJob(name, change.fullDocument);
  });
});

📊 Real-World Examples from Queue Manager

Example 1: Store Downgrade Stream (Hypothetical)

const { startChangeStream } = require('../common/changeStream');
const StoreSubscription = require('../models/store-subscription');

// Watch for tier downgrades
startChangeStream(
  StoreSubscription,
  [
    {
      $match: {
        $and: [
          { operationType: 'update' },
          { 'updateDescription.updatedFields.needs_downgrade': true },
        ],
      },
    },
  ],
  async change => {
    const subscription = change.fullDocument;

    console.log(`Downgrade needed for ${subscription.account_id}`);

    // Trigger immediate downgrade processing
    await addToDowngradeQueue(subscription);
  },
);

Example 2: InstaReports Build Trigger

const { startChangeStream } = require('../common/changeStream');
const Site = require('../models/site');
const InstaReportsQueue = require('../models/instareports-queue');

// Watch for sites requesting report updates
startChangeStream(
  Site,
  [
    {
      $match: {
        $and: [
          { operationType: 'update' },
          { 'updateDescription.updatedFields.needsReportUpdate': true },
        ],
      },
    },
  ],
  async change => {
    const site = change.fullDocument;

    if (site.needsReportUpdate) {
      console.log(`Report update requested for site ${site._id}`);

      // Create InstaReportsQueue entry
      await InstaReportsQueue.create({
        site_id: site._id,
        account_id: site.account_id,
        status: 'pending',
      });

      // Reset flag
      await Site.updateOne({ _id: site._id }, { $set: { needsReportUpdate: false } });
    }
  },
);

Example 3: Billing Stream (Stripe Key Initialization)

const { startChangeStream } = require('../common/changeStream');
const StripeKey = require('../models/stripe-key');

// Watch for new Stripe key connections
startChangeStream(
StripeKey,
[
{
$match: {
operationType: 'insert',
},
},
],
async change => {
const stripeKey = change.fullDocument;

console.log(`New Stripe key for account ${stripeKey.account_id}`);

// Start billing data initialization
await Queue.create({
source: 'billing-initialization',
account_id: stripeKey.account_id,
user_id: stripeKey.created_by,
});
},
);

โš™๏ธ Configurationโ€‹

MongoDB Requirementsโ€‹

Change Streams require:

  • MongoDB 3.6+ (replica set or sharded cluster)
  • Cannot use with standalone MongoDB server

Replica Set Setup:

# docker-compose.yml
services:
  mongodb:
    image: mongo:5.0
    command: mongod --replSet rs0

# Initialize replica set (one-time)
docker exec mongodb mongo --eval "rs.initiate()"

Required Permissions

// MongoDB user must have changeStream privileges
{
  role: 'readWrite',
  db: 'dashclicks',
  privileges: [
    {
      resource: { db: 'dashclicks', collection: '' },
      actions: ['changeStream']
    }
  ]
}
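One way to grant this is via mongosh's standard `db.createRole` / `db.createUser` helpers. This is a sketch, not the project's actual provisioning script; the role name and user name are illustrative:

```javascript
// In mongosh, against the dashclicks database (names are illustrative):
use dashclicks

db.createRole({
  role: 'changeStreamWatcher',
  privileges: [
    // Empty collection string applies the privilege to all collections in the db.
    { resource: { db: 'dashclicks', collection: '' }, actions: ['changeStream', 'find'] },
  ],
  roles: [],
});

db.createUser({
  user: 'queue-manager',
  pwd: passwordPrompt(), // Prompts instead of embedding the password in history
  roles: ['readWrite', 'changeStreamWatcher'],
});
```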

Environment Configuration

No specific environment variables required, but connection string must support replica sets:

# .env
MONGODB_URI=mongodb://host1:27017,host2:27017,host3:27017/dashclicks?replicaSet=rs0

📈 Performance Considerations

Resource Usage

Memory:

  • Change stream uses minimal memory (~1-2MB per stream)
  • Holds resume token and current cursor position
  • No document caching (documents processed immediately)

Network:

  • Only transmits changed documents (not entire collection)
  • With filters, only matching events transmitted
  • fullDocument: 'updateLookup' adds one additional query per update

Database:

  • Change streams use MongoDB oplog
  • Oplog size determines how far back stream can resume
  • Default oplog: ~5% of disk space (configurable)
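Since the oplog window bounds how long a disconnected stream can still resume, it is worth checking in practice. mongosh's built-in replica-set helper reports it:

```javascript
// In mongosh, connected to a replica-set member:
rs.printReplicationInfo();
// Prints the configured oplog size and the first/last event times it
// currently covers - i.e. how far back a change stream can resume.
```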

Scaling Considerations

Multiple Instances:

// Each Queue Manager instance watches the same collections.
// MongoDB broadcasts changes to ALL watching clients, so
// handlers must be idempotent (may process the same event multiple times).

// Solution: use distributed locking
const handleChange = async change => {
  const lock = await redlock.lock(`process-${change.documentKey._id}`, 5000);

  try {
    // Process change
    await doWork(change.fullDocument);
  } finally {
    await lock.unlock();
  }
};

Filtering Performance:

// Efficient: filter at the stream level (less network traffic)
[
  {
    $match: {
      'fullDocument.status': 'pending',
    },
  },
];

// Inefficient: filter in the handler (receives all events)
startChangeStream(Model, [], change => {
  if (change.fullDocument.status === 'pending') {
    // Process
  }
});

Resume Token Strategy

// Current implementation does NOT persist resume tokens.
// If Queue Manager restarts, the stream starts from the current time
// and may miss events that occurred during downtime.

// Improved: persist the resume token
let lastResumeToken;

const changeStream = model.watch(matchConditions, {
  fullDocument: 'updateLookup',
  resumeAfter: lastResumeToken, // Resume from last position
});

changeStream.on('change', async change => {
  await handleChange(change);
  lastResumeToken = change._id; // Save for next restart
  await saveResumeToken(modelName, lastResumeToken);
});

🚨 Error Handling & Recovery

Current Error Handling

changeStream.on('error', err => {
  logger.error({ initiator: 'QM/change-stream', error: err });
  // Stream ends - no reconnection
});

Limitations:

  • Stream closes on error
  • No automatic reconnection
  • Requires Queue Manager restart

Improved Error Handling

const startChangeStream = (model, matchConditions, handleChange, options = {}) => {
  const { maxRetries = 3, retryDelay = 5000, resumeToken = null } = options;

  let retryCount = 0;

  const start = (resumeAfter = resumeToken) => {
    const changeStream = model.watch(matchConditions, {
      fullDocument: 'updateLookup',
      resumeAfter,
    });

    let lastToken = resumeAfter;

    changeStream.on('change', async change => {
      try {
        await handleChange(change);
        lastToken = change._id;
        retryCount = 0; // Reset on success
      } catch (error) {
        logger.error({
          initiator: 'QM/change-stream-handler',
          error,
          documentKey: change.documentKey,
        });
      }
    });

    changeStream.on('error', err => {
      logger.error({
        initiator: 'QM/change-stream',
        error: err,
        retryCount,
      });

      if (retryCount < maxRetries) {
        retryCount++;
        setTimeout(() => {
          logger.info({
            initiator: 'QM/change-stream',
            message: 'Reconnecting...',
            attempt: retryCount,
          });
          start(lastToken);
        }, retryDelay * retryCount); // Linearly increasing backoff
      } else {
        logger.fatal({
          initiator: 'QM/change-stream',
          message: 'Max retries reached, stream terminated',
        });
      }
    });
  };

  start();
};

🧪 Testing Considerations

Mocking Change Streams

// Mock Mongoose model.watch()
const mockChangeStream = {
  on: jest.fn((event, handler) => {
    mockChangeStream.handlers = mockChangeStream.handlers || {};
    mockChangeStream.handlers[event] = handler;
  }),
  emit: function (event, data) {
    if (this.handlers && this.handlers[event]) {
      this.handlers[event](data);
    }
  },
};

const Model = {
  watch: jest.fn().mockReturnValue(mockChangeStream),
};

Test Cases

describe('startChangeStream', () => {
  test('Calls handleChange on document insert', () => {
    const handleChange = jest.fn();

    startChangeStream(Model, [], handleChange);

    const changeEvent = {
      operationType: 'insert',
      fullDocument: { _id: 'doc123', name: 'Test' },
    };

    mockChangeStream.emit('change', changeEvent);

    expect(handleChange).toHaveBeenCalledWith(changeEvent);
  });

  test('Logs errors on stream error', () => {
    const logger = require('../utilities').logger;
    jest.spyOn(logger, 'error');

    startChangeStream(Model, [], jest.fn());

    const error = new Error('Connection lost');
    mockChangeStream.emit('error', error);

    expect(logger.error).toHaveBeenCalledWith({
      initiator: 'QM/change-stream',
      error,
    });
  });

  test('Applies match conditions to watch', () => {
    const matchConditions = [{ $match: { operationType: 'insert' } }];

    startChangeStream(Model, matchConditions, jest.fn());

    expect(Model.watch).toHaveBeenCalledWith(matchConditions, { fullDocument: 'updateLookup' });
  });
});

๐Ÿ“ Notesโ€‹

Change Streams vs Pollingโ€‹

Change Streams (Current):

  • ✅ Real-time (instant notification)
  • ✅ Low latency (milliseconds)
  • ✅ Efficient (no wasted queries)
  • ❌ Requires replica set
  • ❌ More complex setup

Polling (Alternative):

setInterval(async () => {
  const pending = await Queue.find({ status: 'pending' });
  pending.forEach(processQueue);
}, 5000); // Check every 5 seconds

  • ✅ Works with standalone MongoDB
  • ✅ Simple setup
  • ❌ Higher latency (up to the 5-second polling interval)
  • ❌ Inefficient (queries even when nothing has changed)

When to Use Change Streams

Good Use Cases:

  • Immediate response to database changes
  • Low-frequency events (< 100/sec)
  • Event-driven architecture
  • Cross-service coordination

Not Ideal For:

  • High-frequency updates (> 1000/sec)
  • Batch processing (better to query in bulk)
  • Standalone MongoDB deployments

Alternative: Triggers in Application Code

// Instead of watching the Site collection for needsReportUpdate,
// trigger directly in application code.

// In Internal API
async function requestReportUpdate(siteId) {
  await Site.updateOne({ _id: siteId }, { $set: { needsReportUpdate: true } });

  // Direct trigger (no change stream needed)
  await InstaReportsQueue.create({
    site_id: siteId,
    status: 'pending',
  });
}

Trade-offs:

  • ✅ Simpler (no change stream setup)
  • ✅ More reliable (no missed events)
  • ❌ Tight coupling (Internal API knows about Queue Manager)
  • ❌ Requires code in multiple places

Complexity: Medium
Business Impact: High - Enables real-time processing
Dependencies: MongoDB replica set, logger utility
Used By: Stream processors, trigger handlers
Last Updated: 2025-01-10
