# Change Stream Utility

## Overview
The `changeStream.js` utility provides a simplified wrapper around MongoDB's Change Streams API. It enables real-time monitoring of document changes with automatic error handling, `fullDocument` lookups, and a consistent event-handling pattern. It is used by Queue Manager's stream processors to respond immediately to database changes.

**Source File:** `queue-manager/common/changeStream.js`
## Purpose
- Real-time Monitoring: Watch for document insert/update/delete operations
- Filtered Watching: Apply match conditions to watch specific changes
- Full Document Retrieval: Automatically fetch complete updated documents
- Error Handling: Consistent error logging for stream failures
- Event Processing: Route change events to custom handlers
## Function Signature

### `startChangeStream(model, matchConditions, handleChange)`

```js
const startChangeStream = (model, matchConditions, handleChange) => {
  // Implementation
};
```
#### Parameters

- `model` (`Mongoose.Model`) - Mongoose model to watch
- `matchConditions` (`Array<Object>`) - MongoDB aggregation pipeline match stages
- `handleChange` (`Function`) - Callback to handle change events
  - Receives: `change` (MongoDB change event object)
  - Returns: `void` or `Promise<void>`

#### Returns

`void` - Starts the change stream (non-blocking)
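Because the call is non-blocking, it is typically invoked once at service startup. A minimal usage sketch (the `Queue` model path is an assumption):

```js
const { startChangeStream } = require('./common/changeStream');
const Queue = require('./models/queues'); // assumed model path

// Log every insert into the watched collection; the process keeps running
// and the handler fires as events arrive.
startChangeStream(Queue, [{ $match: { operationType: 'insert' } }], change => {
  console.log('New queue document:', change.documentKey._id);
});
```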
## Implementation Details

### Complete Source Code

```js
const { logger } = require('../utilities');

/**
 * Starts a MongoDB change stream with error handling
 * @param {Mongoose.Model} model - Mongoose model to watch
 * @param {Array} matchConditions - Match conditions for filtering changes
 * @param {Function} handleChange - Function to handle each change event
 */
const startChangeStream = (model, matchConditions, handleChange) => {
  const changeStream = model.watch(matchConditions, { fullDocument: 'updateLookup' });

  changeStream.on('change', change => {
    handleChange(change);
  });

  changeStream.on('error', err => {
    logger.error({ initiator: 'QM/change-stream', error: err });
  });
};

module.exports = {
  startChangeStream,
};
```
## Detailed Component Breakdown

### MongoDB Change Stream Creation

```js
const changeStream = model.watch(matchConditions, { fullDocument: 'updateLookup' });
```

**Components:**

- `model.watch()`: Mongoose wrapper for MongoDB's Change Streams API
- `matchConditions`: Aggregation pipeline to filter events
- `fullDocument: 'updateLookup'`: Ensures the complete document is returned
### `fullDocument: 'updateLookup'` Behavior

| Operation Type | Default Behavior | With `'updateLookup'` |
|---|---|---|
| `insert` | Returns full document | Returns full document |
| `update` | Returns only the delta (changed fields) | Performs an additional query to return the full document |
| `replace` | Returns full document | Returns full document |
| `delete` | No document | No document |
**Why `'updateLookup'`?**

```js
// Without 'updateLookup'
{
  operationType: 'update',
  updateDescription: {
    updatedFields: { status: 'completed' }
  }
  // Only the delta - other fields are missing!
}

// With 'updateLookup'
{
  operationType: 'update',
  fullDocument: {
    _id: ObjectId,
    status: 'completed',   // Updated field
    account_id: ObjectId,  // All other fields included
    user_id: ObjectId,
    createdAt: Date
  }
}
```
**Performance Cost:**

- Adds one additional database query per update event
- Trade-off: convenience vs. a slight latency increase
- Acceptable for most Queue Manager use cases (low-frequency events); see the sketch below for a newer alternative
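On MongoDB 6.0+, the extra lookup can be avoided by enabling document post-images on the collection. This is not what the current utility does; a hedged alternative sketch:

```js
// One-time setup (run in mongosh): store post-images for change events.
// db.runCommand({ collMod: 'queues', changeStreamPreAndPostImages: { enabled: true } });

// 'whenAvailable' returns the stored post-image without a second query;
// if the image has expired, the event simply arrives without fullDocument.
const changeStream = model.watch(matchConditions, { fullDocument: 'whenAvailable' });
```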
### Match Conditions (Pipeline Filtering)

```js
matchConditions = [
  {
    $match: {
      // Filter criteria
    },
  },
];
```
**Match Condition Examples:**

#### Watch Specific Operation Types

```js
[
  {
    $match: {
      operationType: { $in: ['insert', 'update'] },
    },
  },
];
```
#### Watch Specific Field Changes

```js
[
  {
    $match: {
      $or: [
        { operationType: 'insert' },
        { 'updateDescription.updatedFields.status': { $exists: true } },
      ],
    },
  },
];
```
#### Watch Specific Documents

```js
[
  {
    $match: {
      'fullDocument.account_id': ObjectId('account123'),
    },
  },
];
```
### Change Event Handler

```js
changeStream.on('change', change => {
  handleChange(change);
});
```
**Change Event Structure:**

```js
{
  _id: {
    _data: 'resumeToken' // For resuming the stream after a disconnect
  },
  operationType: 'insert' | 'update' | 'replace' | 'delete',
  clusterTime: Timestamp,
  fullDocument: { // With the 'updateLookup' option
    // Complete document
  },
  ns: {
    db: 'dashclicks',
    coll: 'queues'
  },
  documentKey: {
    _id: ObjectId
  },
  updateDescription: { // Only for 'update' operations
    updatedFields: { /* changed fields */ },
    removedFields: [ /* removed field names */ ]
  }
}
```
### Error Handler

```js
changeStream.on('error', err => {
  logger.error({ initiator: 'QM/change-stream', error: err });
});
```
**Common Errors:**

- **Connection Errors:**

  ```js
  {
    name: 'MongoNetworkError',
    message: 'connection closed'
  }
  ```

- **Invalid Resume Token** (after a long disconnect):

  ```js
  {
    name: 'MongoError',
    code: 286,
    message: 'Resume token not found'
  }
  ```

- **Unauthorized** (missing changeStream privileges):

  ```js
  {
    name: 'MongoError',
    code: 13,
    message: 'not authorized on database to execute command'
  }
  ```
**Error Handling Strategy:**

- Logs the error with the `QM/change-stream` initiator
- Does not attempt reconnection (the stream ends)
- Relies on a Queue Manager restart to re-establish the stream (one way to force that restart is sketched below)
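If the strategy is to rely on a restart, the stream's closure must actually surface to the process. One option (a sketch, not part of the current utility) is to exit on `close` so a supervisor brings the service back up:

```js
changeStream.on('close', () => {
  logger.error({ initiator: 'QM/change-stream', message: 'Stream closed; exiting for supervisor restart' });
  process.exit(1); // assumes a process manager (e.g. PM2, Kubernetes) restarts the service
});
```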
## Complete Flow Diagram

```mermaid
sequenceDiagram
    participant APP as Application
    participant UTIL as changeStream.startChangeStream()
    participant MONGO as MongoDB Change Stream
    participant HANDLER as handleChange()
    participant LOG as Logger

    APP->>UTIL: startChangeStream(Model, filters, handler)
    UTIL->>MONGO: model.watch(filters, {fullDocument: 'updateLookup'})
    MONGO-->>UTIL: Change Stream instance

    loop Document Changes
        MONGO->>MONGO: Document updated in collection
        MONGO->>MONGO: Apply match filters
        alt Filter matches
            MONGO->>MONGO: Lookup full document
            MONGO->>UTIL: Emit 'change' event
            UTIL->>HANDLER: handleChange(changeEvent)
            HANDLER->>HANDLER: Process change
        else Filter doesn't match
            MONGO->>MONGO: Skip event
        end
    end

    alt Error occurs
        MONGO->>UTIL: Emit 'error' event
        UTIL->>LOG: Log error with 'QM/change-stream'
        MONGO->>MONGO: Close stream
    end
```
## Usage Patterns

### Pattern 1: Watch Queue Status Changes

```js
const { startChangeStream } = require('./common/changeStream');
const Queue = require('./models/queues');

// Start watching for completed queues
startChangeStream(
  Queue,
  [
    {
      $match: {
        $or: [
          { operationType: 'insert' },
          { 'updateDescription.updatedFields.status': { $exists: true } },
        ],
      },
    },
  ],
  async change => {
    const queue = change.fullDocument;
    if (queue.status === 'completed') {
      console.log(`Queue ${queue._id} completed`);
      // Send notification, update related records, etc.
    }
  },
);
```
### Pattern 2: Watch Specific Account Changes

```js
const { Types: { ObjectId } } = require('mongoose');
const InstaReportsQueue = require('./models/instareports-queue');

// Watch for report builds for a specific account
const accountId = new ObjectId('account123');

startChangeStream(
  InstaReportsQueue,
  [
    {
      $match: {
        'fullDocument.account_id': accountId,
        operationType: { $in: ['insert', 'update'] },
      },
    },
  ],
  async change => {
    const report = change.fullDocument;
    if (change.operationType === 'insert') {
      console.log(`New report queued: ${report._id}`);
      // Trigger immediate processing
    } else if (change.operationType === 'update') {
      console.log(`Report updated: ${report._id}`);
      // Update cache, notify frontend
    }
  },
);
```
### Pattern 3: Multiple Stream Coordination

```js
// Watch multiple collections with consistent handling
const models = [
  { model: InstaReportsQueue, name: 'InstaReports' },
  { model: InstaSitesQueue, name: 'InstaSites' },
  { model: ContactQueue, name: 'Contacts' },
];

models.forEach(({ model, name }) => {
  startChangeStream(model, [{ $match: { operationType: 'insert' } }], async change => {
    console.log(`New ${name} job:`, change.documentKey._id);
    // Unified job processing
    await processJob(name, change.fullDocument);
  });
});
```
## Real-World Examples from Queue Manager

### Example 1: Store Downgrade Stream (Hypothetical)

```js
const { startChangeStream } = require('../common/changeStream');
const StoreSubscription = require('../models/store-subscription');

// Watch for tier downgrades
startChangeStream(
  StoreSubscription,
  [
    {
      $match: {
        $and: [
          { operationType: 'update' },
          { 'updateDescription.updatedFields.needs_downgrade': true },
        ],
      },
    },
  ],
  async change => {
    const subscription = change.fullDocument;
    console.log(`Downgrade needed for ${subscription.account_id}`);
    // Trigger immediate downgrade processing
    await addToDowngradeQueue(subscription);
  },
);
```
### Example 2: InstaReports Build Trigger

```js
const { startChangeStream } = require('../common/changeStream');
const Site = require('../models/site');
const InstaReportsQueue = require('../models/instareports-queue');

// Watch for sites requesting report updates
startChangeStream(
  Site,
  [
    {
      $match: {
        $and: [
          { operationType: 'update' },
          { 'updateDescription.updatedFields.needsReportUpdate': true },
        ],
      },
    },
  ],
  async change => {
    const site = change.fullDocument;
    if (site.needsReportUpdate) {
      console.log(`Report update requested for site ${site._id}`);
      // Create an InstaReportsQueue entry
      await InstaReportsQueue.create({
        site_id: site._id,
        account_id: site.account_id,
        status: 'pending',
      });
      // Reset the flag
      await Site.updateOne({ _id: site._id }, { $set: { needsReportUpdate: false } });
    }
  },
);
```
### Example 3: Billing Stream (Stripe Key Initialization)

```js
const { startChangeStream } = require('../common/changeStream');
const StripeKey = require('../models/stripe-key');
const Queue = require('../models/queues');

// Watch for new Stripe key connections
startChangeStream(
  StripeKey,
  [
    {
      $match: {
        operationType: 'insert',
      },
    },
  ],
  async change => {
    const stripeKey = change.fullDocument;
    console.log(`New Stripe key for account ${stripeKey.account_id}`);
    // Start billing data initialization
    await Queue.create({
      source: 'billing-initialization',
      account_id: stripeKey.account_id,
      user_id: stripeKey.created_by,
    });
  },
);
```
## Configuration

### MongoDB Requirements

Change Streams require:

- MongoDB 3.6+ running as a replica set or sharded cluster
- They cannot be used with a standalone MongoDB server

**Replica Set Setup:**

```yaml
# docker-compose.yml
services:
  mongodb:
    image: mongo:5.0
    command: mongod --replSet rs0
```

```bash
# Initialize the replica set
docker exec mongodb mongo --eval "rs.initiate()"
```
### Required Permissions

```js
// The MongoDB user must have changeStream privileges
{
  role: 'readWrite',
  db: 'dashclicks',
  privileges: [
    {
      resource: { db: 'dashclicks', collection: '' },
      actions: ['changeStream']
    }
  ]
}
```
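A hedged example of how such a role could be created in `mongosh` (the role name is illustrative; note that opening a stream requires the `find` action alongside `changeStream`):

```js
// Run in mongosh as an admin user
db.getSiblingDB('dashclicks').createRole({
  role: 'changeStreamWatcher', // illustrative role name
  privileges: [
    {
      resource: { db: 'dashclicks', collection: '' }, // '' applies to all collections in the db
      actions: ['changeStream', 'find'],
    },
  ],
  roles: [],
});
```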
### Environment Configuration

No specific environment variables are required, but the connection string must support replica sets:

```bash
# .env
MONGODB_URI=mongodb://host1:27017,host2:27017,host3:27017/dashclicks?replicaSet=rs0
```
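Connecting with Mongoose then works as usual; change streams become available once the client is connected to the replica set (a minimal sketch using the URI above):

```js
const mongoose = require('mongoose');

// The replica-set URI enables model.watch() on any model.
mongoose.connect(process.env.MONGODB_URI);
```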
## Performance Considerations

### Resource Usage

**Memory:**

- A change stream uses minimal memory (~1-2 MB per stream)
- Holds the resume token and current cursor position
- No document caching (documents are processed immediately)

**Network:**

- Only transmits changed documents (not the entire collection)
- With filters, only matching events are transmitted
- `fullDocument: 'updateLookup'` adds one additional query per update

**Database:**

- Change streams are backed by the MongoDB oplog
- The oplog size determines how far back a stream can resume
- Default oplog: ~5% of disk space (configurable); see the check below
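To see how much resume headroom the oplog currently provides, `mongosh` can report the replication window (a quick check, unrelated to the utility itself):

```js
// Run in mongosh against the replica set
rs.printReplicationInfo();
// "log length start to end" is the window inside which a resume token stays valid
```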
### Scaling Considerations

**Multiple Instances:**

```js
// Each Queue Manager instance watches the same collections.
// MongoDB broadcasts changes to ALL watching clients, so
// handlers must be idempotent (the same event may be processed multiple times).

// One solution: distributed locking (redlock v4-style lock/unlock API,
// assuming a configured Redlock instance)
const handleChange = async change => {
  const lock = await redlock.lock(`process-${change.documentKey._id}`, 5000);
  try {
    // Process the change
    await doWork(change.fullDocument);
  } finally {
    await lock.unlock();
  }
};
```
**Filtering Performance:**

```js
// Efficient: filter at the stream level (less network traffic)
[
  {
    $match: {
      'fullDocument.status': 'pending',
    },
  },
];

// Inefficient: filter in the handler (receives all events)
startChangeStream(Model, [], change => {
  if (change.fullDocument.status === 'pending') {
    // Process
  }
});
```
### Resume Token Strategy

```js
// The current implementation does NOT persist resume tokens.
// If Queue Manager restarts, the stream starts from the current time
// and may miss events that occurred during the downtime.

// Improved: persist the resume token
let lastResumeToken;

const changeStream = model.watch(matchConditions, {
  fullDocument: 'updateLookup',
  resumeAfter: lastResumeToken, // Resume from the last position
});

changeStream.on('change', async change => {
  await handleChange(change);
  lastResumeToken = change._id; // Save for the next restart
  await saveResumeToken(model.modelName, lastResumeToken); // persistence helper, sketched below
});
```
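The snippet above assumes a `saveResumeToken` helper. A minimal persistence sketch using a dedicated collection (the `ResumeToken` model and its path are hypothetical):

```js
// Hypothetical Mongoose model with schema { stream: String, token: Object }
const ResumeToken = require('../models/resume-token');

// Upsert the latest token for a named stream
const saveResumeToken = (stream, token) =>
  ResumeToken.updateOne({ stream }, { $set: { token } }, { upsert: true });

// Load the last saved token; null starts the stream from "now"
const loadResumeToken = async stream => {
  const doc = await ResumeToken.findOne({ stream }).lean();
  return doc ? doc.token : null;
};
```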
## Error Handling & Recovery

### Current Error Handling

```js
changeStream.on('error', err => {
  logger.error({ initiator: 'QM/change-stream', error: err });
  // Stream ends - no reconnection
});
```

**Limitations:**

- The stream closes on error
- No automatic reconnection
- Requires a Queue Manager restart
### Improved Error Handling

```js
const startChangeStream = (model, matchConditions, handleChange, options = {}) => {
  const { maxRetries = 3, retryDelay = 5000, resumeToken = null } = options;
  let retryCount = 0;

  const start = (resumeAfter = resumeToken) => {
    const changeStream = model.watch(matchConditions, {
      fullDocument: 'updateLookup',
      resumeAfter,
    });

    let lastToken = resumeAfter;

    changeStream.on('change', async change => {
      try {
        await handleChange(change);
        lastToken = change._id;
        retryCount = 0; // Reset on success
      } catch (error) {
        logger.error({
          initiator: 'QM/change-stream-handler',
          error,
          documentKey: change.documentKey,
        });
      }
    });

    changeStream.on('error', err => {
      logger.error({
        initiator: 'QM/change-stream',
        error: err,
        retryCount,
      });

      if (retryCount < maxRetries) {
        retryCount++;
        setTimeout(() => {
          logger.info({
            initiator: 'QM/change-stream',
            message: 'Reconnecting...',
            attempt: retryCount,
          });
          start(lastToken);
        }, retryDelay * retryCount); // Linear backoff (delay grows with each retry)
      } else {
        logger.fatal({
          initiator: 'QM/change-stream',
          message: 'Max retries reached, stream terminated',
        });
      }
    });
  };

  start();
};
```
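Callers would use it the same way as before, with optional retry tuning (a sketch against the improved signature above; `handleNewQueue` is a placeholder handler):

```js
startChangeStream(
  Queue,
  [{ $match: { operationType: 'insert' } }],
  handleNewQueue, // existing handler
  { maxRetries: 5, retryDelay: 2000 }, // retries at 2s, 4s, 6s, ... (linear backoff)
);
```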
## Testing Considerations

### Mocking Change Streams

```js
// Mock Mongoose model.watch()
const mockChangeStream = {
  on: jest.fn((event, handler) => {
    mockChangeStream.handlers = mockChangeStream.handlers || {};
    mockChangeStream.handlers[event] = handler;
  }),
  emit: function (event, data) {
    if (this.handlers && this.handlers[event]) {
      this.handlers[event](data);
    }
  },
};

const Model = {
  watch: jest.fn().mockReturnValue(mockChangeStream),
};
```
### Test Cases

```js
describe('startChangeStream', () => {
  test('Calls handleChange on document insert', () => {
    const handleChange = jest.fn();
    startChangeStream(Model, [], handleChange);

    const changeEvent = {
      operationType: 'insert',
      fullDocument: { _id: 'doc123', name: 'Test' },
    };
    mockChangeStream.emit('change', changeEvent);

    expect(handleChange).toHaveBeenCalledWith(changeEvent);
  });

  test('Logs errors on stream error', () => {
    const logger = require('../utilities').logger;
    jest.spyOn(logger, 'error');

    startChangeStream(Model, [], jest.fn());

    const error = new Error('Connection lost');
    mockChangeStream.emit('error', error);

    expect(logger.error).toHaveBeenCalledWith({
      initiator: 'QM/change-stream',
      error,
    });
  });

  test('Applies match conditions to watch', () => {
    const matchConditions = [{ $match: { operationType: 'insert' } }];
    startChangeStream(Model, matchConditions, jest.fn());

    expect(Model.watch).toHaveBeenCalledWith(matchConditions, { fullDocument: 'updateLookup' });
  });
});
```
## Related Documentation

- Common Utilities Overview
- Stream Processors (link removed - file does not exist) - real-world usage of change streams
- Trigger Handlers - an alternative to change streams for some use cases
- MongoDB Change Streams Official Docs
## Notes

### Change Streams vs. Polling

**Change Streams (Current):**

- ✅ Real-time (instant notification)
- ✅ Low latency (milliseconds)
- ✅ Efficient (no wasted queries)
- ❌ Requires a replica set
- ❌ More complex setup
**Polling (Alternative):**

```js
setInterval(async () => {
  const pending = await Queue.find({ status: 'pending' });
  pending.forEach(processQueue);
}, 5000); // Check every 5 seconds
```

- ✅ Works with standalone MongoDB
- ✅ Simple setup
- ❌ Higher latency (up to a 5-second delay)
- ❌ Inefficient (queries even when there are no changes)
### When to Use Change Streams

**Good Use Cases:**

- Immediate response to database changes
- Low-frequency events (< 100/sec)
- Event-driven architecture
- Cross-service coordination

**Not Ideal For:**

- High-frequency updates (> 1,000/sec)
- Batch processing (better to query in bulk)
- Standalone MongoDB deployments
### Alternative: Triggers in Application Code

```js
// Instead of watching the Site collection for needsReportUpdate,
// trigger directly in application code.

// In the Internal API
async function requestReportUpdate(siteId) {
  await Site.updateOne({ _id: siteId }, { $set: { needsReportUpdate: true } });

  // Direct trigger (no change stream needed)
  await InstaReportsQueue.create({
    site_id: siteId,
    status: 'pending',
  });
}
```

**Trade-offs:**

- ✅ Simpler (no change stream setup)
- ✅ More reliable (no missed events)
- ❌ Tight coupling (the Internal API knows about Queue Manager)
- ❌ Requires code in multiple places
---

**Complexity:** Medium
**Business Impact:** High - enables real-time processing
**Dependencies:** MongoDB replica set, logger utility
**Used By:** Stream processors, trigger handlers
**Last Updated:** 2025-01-10