๐ Funnel Clone
๐ Overviewโ
The Funnel Clone job implements a sophisticated two-tier cloning system that duplicates complete funnels including all steps, page components, images, forms, and chat plugins. It runs every 5 minutes, identifies funnels flagged for cloning, creates placeholder steps, then queues each step for detailed processing including asset duplication and ID replacement. The system supports resumable cloning with stale job recovery and real-time progress notifications via Socket.IO.
Complete Flow:
- Cron Initialization:
queue-manager/crons/funnels/funnelClone.js - Service Processing:
queue-manager/services/funnels/funnelClone.js - Funnel Queue:
queue-manager/queues/funnels/funnelClone.js - Step Queue:
queue-manager/queues/funnels/funnelStepClone.js
Execution Pattern: Frequent polling (every 5 minutes) with two-level queue hierarchy
Queue Names: funnels_clone (parent), funnels_clone_step (child)
Environment Flag: QM_FUNNELS_CLONE=true (in index.js)
๐ Complete Processing Flowโ
sequenceDiagram
participant CRON as Cron Schedule<br/>(every 5 min)
participant SERVICE as Clone Service
participant FUNNEL_DB as Funnels DB
participant FUNNEL_QUEUE as Funnel Clone<br/>Queue
participant STEP_DB as Funnel Steps DB
participant STEP_QUEUE as Step Clone<br/>Queue
participant WASABI as Wasabi S3
participant SOCKET as Socket.IO<br/>Service
CRON->>SERVICE: funnelClone()
SERVICE->>SERVICE: Calculate stale threshold:<br/>10 minutes ago
SERVICE->>FUNNEL_DB: Aggregate query:<br/>- pending = true AND in_progress โ true<br/>- OR in_progress + updatedAt < 10 min<br/>- Lookup accounts<br/>- Lookup steps<br/>- Check all steps completed<br/>- Limit 3 funnels
FUNNEL_DB-->>SERVICE: Pending funnels (max 3)
loop Each pending funnel
alt All steps already completed
SERVICE->>FUNNEL_DB: Clear flags:<br/>pending=false, in_progress=false
SERVICE->>FUNNEL_DB: Find active users<br/>for socket emit
SERVICE->>SOCKET: Emit 'funnel_cloning_completed'<br/>{funnel_id, status:'completed'}
else Steps incomplete
SERVICE->>FUNNEL_DB: Set in_progress=true
SERVICE->>FUNNEL_QUEUE: Add job: {newFunnel}
end
end
loop Each funnel job
FUNNEL_QUEUE->>FUNNEL_QUEUE: Fetch original funnel data
alt Original funnel not found
FUNNEL_QUEUE->>FUNNEL_DB: Set error + flags:<br/>original_funnel_deleted=true
else Original funnel found
FUNNEL_QUEUE->>STEP_DB: Find original steps
loop Each original step
alt Step already exists (stale recovery)
FUNNEL_QUEUE->>STEP_QUEUE: Re-queue existing step
else New step needed
FUNNEL_QUEUE->>STEP_DB: Create step placeholder:<br/>raw_html=null, pending=true
FUNNEL_QUEUE->>STEP_DB: Create components placeholder:<br/>components='{}'
FUNNEL_QUEUE->>FUNNEL_DB: Add step._id to funnel.steps[]
FUNNEL_QUEUE->>STEP_QUEUE: Queue step for processing
end
end
end
end
loop Each step job (concurrency: 5)
STEP_QUEUE->>STEP_DB: Set in_progress=true
STEP_QUEUE->>STEP_DB: Fetch original step + components
STEP_QUEUE->>STEP_QUEUE: Replace IDs:<br/>originalFunnelId โ newFunnelId<br/>originalStepId โ newStepId
alt Images not processed
STEP_QUEUE->>STEP_DB: Find existing cloned images
STEP_QUEUE->>WASABI: Copy images from original funnel<br/>(batch Promise.all)
STEP_QUEUE->>STEP_DB: Create SitesImages records
STEP_QUEUE->>STEP_QUEUE: Replace image keys in HTML/components
end
alt Forms not processed
STEP_QUEUE->>STEP_QUEUE: Extract form IDs from components<br/>(regex: /userform/([^?/]+))
loop Each form ID
STEP_QUEUE->>STEP_DB: Find original form
STEP_QUEUE->>STEP_DB: Create new form with new UUID
STEP_QUEUE->>STEP_QUEUE: Replace form IDs in HTML/components
end
end
alt Chat plugin not processed
STEP_QUEUE->>STEP_DB: Check if live_chat_widget enabled
STEP_QUEUE->>STEP_DB: Fetch support token for preview domain
STEP_QUEUE->>STEP_QUEUE: Replace chat plugin API key in HTML
end
STEP_QUEUE->>STEP_DB: Update components with replaced values
STEP_QUEUE->>STEP_DB: Update step:<br/>raw_html, pending=false, in_progress=false
STEP_QUEUE->>STEP_QUEUE: Check if all steps completed
alt All steps completed
STEP_QUEUE->>FUNNEL_DB: Clear funnel flags
STEP_QUEUE->>SOCKET: Emit 'funnel_cloning_completed'
end
end
๐ Source Filesโ
1. Cron Initializationโ
File: queue-manager/crons/funnels/funnelClone.js
Purpose: Schedule funnel clone processing every 5 minutes
Cron Pattern: */5 * * * * (every 5 minutes)
Initialization:
const funnelClone = require('../../services/funnels/funnelClone');
const cron = require('node-cron');
const logger = require('../../utilities/logger');
let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/5 * * * *', async () => {
if (!inProgress) {
inProgress = true;
await funnelClone();
inProgress = false;
}
});
} catch (err) {
logger.error({ initiator: 'QM/funnels/funnel-clone', error: err });
}
};
In-Progress Lock: Prevents overlapping executions during heavy cloning operations.
2. Service Processing (THE COMPLEX AGGREGATION LOGIC)โ
File: queue-manager/services/funnels/funnelClone.js
Purpose: Find funnels flagged for cloning, check completion status, queue incomplete funnels
Key Functions:
- Calculate 10-minute stale threshold for stuck jobs
- Complex MongoDB aggregation with lookups and projections
- Check if all funnel steps are completed
- Queue incomplete funnels for processing
- Emit Socket.IO events for completed funnels
Main Service Function (Simplified - full code ~200 lines):
const FunnelClone = require('../../queues/funnels/funnelClone');
const Funnels = require('../../models/funnels');
const User = require('../../models/user');
const logger = require('../../utilities/logger');
const { socketEmit } = require('../../utilities');
module.exports = async () => {
try {
const WAIT_TIME = 10; // minutes
const TEN_MINUTES = WAIT_TIME * 60 * 1000;
const FunnelInProgressReaddQueueTime = new Date(new Date().getTime() - TEN_MINUTES);
// Complex aggregation query
let query = [
{ $sort: { _id: 1 } },
{
$match: {
$or: [
{ $and: [{ pending: true }, { in_progress: { $ne: true } }] },
{
$and: [
{ in_progress: true },
{ updatedAt: { $lte: FunnelInProgressReaddQueueTime } },
],
},
],
},
},
{
$lookup: {
from: '_accounts',
localField: 'account_id',
foreignField: '_id',
as: 'account',
},
},
{ $set: { account: { $first: '$account' } } },
{
$lookup: {
from: 'funnel.step',
localField: '_id',
foreignField: 'funnel_id',
as: 'steps',
},
},
{
$project: {
// Calculate if all steps have pending=false AND in_progress=false
hasNoPendingAndInProgress: {
$map: {
input: '$steps',
as: 'step',
in: {
$cond: [
{
$and: [
{ $eq: ['$$step.pending', false] },
{ $eq: ['$$step.in_progress', false] },
{ $ne: ['$$step', null] },
],
},
true,
false,
],
},
},
},
// ... other fields
},
},
{
$project: {
// Calculate if all steps are processed
allStepsProcessed: {
$cond: {
if: {
$or: [
{ $eq: [{ $size: '$hasNoPendingAndInProgress' }, 0] },
{ $eq: ['$hasNoPendingAndInProgress', []] },
],
},
then: false,
else: {
$cond: {
if: { $eq: [{ $allElementsTrue: '$hasNoPendingAndInProgress' }, true] },
then: true,
else: false,
},
},
},
},
},
},
{ $limit: 3 }, // Process max 3 funnels per run
];
const pendingFunnels = await Funnels.aggregate(query);
if (!pendingFunnels?.length) {
logger.log({ message: 'No funnel to be cloned found ! Returning !!' });
} else {
const funnelClone = await FunnelClone.start();
for (const pendingFunnel of pendingFunnels) {
if (pendingFunnel.allStepsProcessed) {
// All steps completed - finalize
await Funnels.findByIdAndUpdate(pendingFunnel._id, {
pending: false,
in_progress: false,
});
// Emit socket notification
const user = await User.find({ account: pendingFunnel.account_id, active: true });
const userIds = user.map(u => u._id.toString());
await socketEmit('funnel_cloning_completed', userIds, {
funnel_id: pendingFunnel._id,
status: 'completed',
});
} else {
// Queue for processing
const newFunnel = await Funnels.findByIdAndUpdate(
pendingFunnel._id,
{ in_progress: true },
{ new: true },
);
await funnelClone.add({ newFunnel }, { attempts: 3 });
}
}
}
} catch (err) {
logger.error({ message: 'Error occured while cloning funnel.', error: err });
}
};
Note: Full service code is ~230 lines with complex aggregation logic.
3. Funnel Clone Queue (THE STEP CREATION LOGIC)โ
File: queue-manager/queues/funnels/funnelClone.js
Purpose: Create placeholder steps and queue for detailed processing
Key Functions:
- Fetch original funnel and steps
- Handle missing original funnels
- Create new step records with
pending=true - Create placeholder component records
- Queue steps for detailed processing
- Support stale job recovery (re-queue incomplete steps)
Main Processor (Simplified - full code ~270 lines):
const processCb = async (job, done) => {
try {
let { newFunnel } = job.data;
const originalFunnelId = newFunnel.original_funnel_id;
// Fetch original funnel
const funnel = await Funnels.findOne({ _id: originalFunnelId }).lean().exec();
if (!funnel) {
// Original funnel deleted - mark error
await Funnels.updateOne(
{ _id: newFunnel._id },
{
$addToSet: { error: 'Original Funnel not found, id:' + originalFunnelId },
pending: false,
in_progress: false,
original_funnel_deleted: true,
},
);
done();
return;
}
// Fetch original steps
const oldSteps = await FunnelSteps.find({ funnel_id: funnel._id }).lean().exec();
const newSteps = [];
const stepQueue = await FunnelStepClone.start();
const WAIT_TIME = 5; // minutes for step stale recovery
const stepInProgressReaddQueueTime = new Date(Date.now() - WAIT_TIME * 60 * 1000);
for (let oldStepData of oldSteps) {
// Check if step already exists (stale recovery)
let isStepAlreadyExist = await FunnelSteps.find({
funnel_id: newFunnel._id,
original_step_id: oldStepData._id,
})
.lean()
.exec();
if (isStepAlreadyExist && isStepAlreadyExist.length > 0) {
// Filter stale steps (pending or in_progress > 5 min)
isStepAlreadyExist = isStepAlreadyExist.filter(step => {
return (
step.pending === true ||
(step.in_progress === true && step.updatedAt <= stepInProgressReaddQueueTime)
);
});
if (isStepAlreadyExist?.[0]) {
// Re-queue stale step
await stepQueue.add(
{
newStepId: isStepAlreadyExist._id,
originalStepId: isStepAlreadyExist.original_step_id,
newFunnelId: isStepAlreadyExist.funnel_id,
originalFunnelId,
account_id: newFunnel.account_id,
parentAccountId: parentAccount,
},
{ attempts: 3 },
);
continue;
}
} else {
// Create new step placeholder
newStep = {
...oldStepData,
funnel_id: newFunnel._id,
original_step_id: oldStepData._id,
raw_html: null,
domain_id: null,
header_tracking_code: null,
footer_tracking_code: null,
version: new Date().getTime(),
pending: true,
processed: {
images_processed: false,
forms_processed: false,
chat_plugin_processed: false,
},
};
delete newStep._id;
newSteps.push(newStep);
}
}
if (newSteps.length > 0) {
// Insert new steps
const savedSteps = await FunnelSteps.insertMany(newSteps);
// Create placeholder components
let newComponents = savedSteps.map(savedStep => ({
account_id: newFunnel.account_id,
funnel_id: newFunnel._id,
step_id: savedStep._id,
components: '{}',
}));
await FunnelStepComponents.insertMany(newComponents);
// Update funnel with step IDs
await Funnels.findByIdAndUpdate(newFunnel._id, {
$addToSet: { steps: savedSteps.map(s => s._id) },
});
// Queue steps for detailed processing
for (const step of savedSteps) {
await stepQueue.add(
{
newStepId: step._id,
originalStepId: step._doc.original_step_id,
newFunnelId: step._doc.funnel_id,
originalFunnelId,
account_id: newFunnel.account_id,
parentAccountId: parentAccount,
},
{ attempts: 3 },
);
}
}
done();
} catch (err) {
done(err);
}
};
4. Step Clone Queue (THE DETAILED ASSET PROCESSING)โ
File: queue-manager/queues/funnels/funnelStepClone.js
Purpose: Clone step content including images, forms, and chat plugins
Key Functions:
- Process images (copy from Wasabi S3, create new records)
- Process forms (duplicate with new UUIDs)
- Process chat plugins (replace API keys)
- Replace all IDs in HTML and components
- Check for all steps completed
- Emit Socket.IO completion events
Main Processor (Simplified - full code ~700 lines):
const processCb = async (job, done) => {
let { newStepId, originalStepId, newFunnelId, originalFunnelId, account_id, parentAccountId } =
job.data;
try {
// Set in_progress flag
await FunnelSteps.findByIdAndUpdate(newStepId, { in_progress: true });
// Fetch original step and components
const originalComponents = await FunnelStepComponents.findOne({ step_id: originalStepId })
.lean()
.exec();
const originalStepInfo = await FunnelSteps.findOne({ _id: originalStepId }).lean().exec();
let originalHtmlContent = originalStepInfo?.raw_html || '';
// Replace funnel and step IDs
let keyValuePairObject = {
[originalFunnelId.toString()]: newFunnelId.toString(),
[originalStepId.toString()]: newStepId.toString(),
};
replaceObjectComponents = await replaceKeysInString(
keyValuePairObject,
originalComponents.components,
);
replaceObjectHtml = await replaceKeysInString(keyValuePairObject, originalHtmlContent);
let processedObj = {};
let errors = [];
// 1. Process Images
if (!newStepInfo?.processed?.images_processed) {
const { processed, error, imageReplaceObj, componentsReplaceObj } = await processImage({
originalFunnelId,
originalComponents,
parentAccountId,
originalHtmlContent,
account_id,
newFunnelId,
newStepId,
});
if (processed) {
processedObj.images_processed = true;
replaceObjectHtml = imageReplaceObj;
replaceObjectComponents = componentsReplaceObj;
}
errors.push(error);
}
// 2. Process Forms
if (!newStepInfo?.processed?.forms_processed) {
const { processed, error, formsReplaceObj, componentsReplaceObj } = await processForms({
originalComponents: replaceObjectComponents,
originalStepId,
originalHtmlContent: replaceObjectHtml,
account_id,
newFunnelId,
});
if (processed) {
processedObj.forms_processed = true;
replaceObjectHtml = formsReplaceObj;
replaceObjectComponents = componentsReplaceObj;
}
errors.push(error);
}
// 3. Process Chat Plugin
if (!newStepInfo?.processed?.chat_plugin_processed) {
const { processed, error, chatPluginReplaceObj } = await replaceChatPlugin({
originalFunnelId,
originalStepId,
newStepId,
originalHtmlContent,
});
if (processed) {
processedObj.chat_plugin_processed = true;
if (chatPluginReplaceObj) replaceObjectHtml = chatPluginReplaceObj;
}
errors.push(error);
}
// Update components and step
await FunnelStepComponents.updateOne(
{ step_id: newStepId },
{ $addToSet: { errors }, components: replaceObjectComponents.components },
);
if (allProcessed) {
await FunnelSteps.findByIdAndUpdate(newStepId, {
in_progress: false,
pending: false,
raw_html: replaceObjectHtml,
processed: processedObj,
});
}
// Check if all steps completed
await checkForAllStepCompleted({ newFunnelId });
done();
} catch (err) {
done(err);
}
};
Helper Functions:
- processImage: Copy images from Wasabi S3, create new records
- processForms: Duplicate forms with new UUIDs
- replaceChatPlugin: Update chat plugin API keys
- checkForAllStepCompleted: Check if all steps done, emit socket event
๐๏ธ Collections Usedโ
funnelsโ
- Operations: Find, Update, Aggregate
- Model:
shared/models/funnels.js - Usage Context: Track funnel cloning status
Key Fields:
pending: Boolean - funnel awaiting cloningin_progress: Boolean - funnel currently being clonedoriginal_funnel_id: Reference to source funneloriginal_funnel_deleted: Boolean - source no longer existssteps: Array of step ObjectIdslive_chat_widget: Boolean - enable chat pluginerror: Array of error messages
funnel.stepโ
- Operations: Find, Insert, Update
- Model:
shared/models/funnel.step.js - Usage Context: Clone funnel pages/steps
Key Fields:
funnel_id: Parent funnel referenceoriginal_step_id: Source step referenceraw_html: Page HTML contentpending: Boolean - step awaiting processingin_progress: Boolean - step currently processingprocessed: Object - track sub-processing statusimages_processed: Booleanforms_processed: Booleanchat_plugin_processed: Boolean
funnel.step.componentsโ
- Operations: Find, Insert, Update
- Model:
shared/models/funnel.step.components.js - Usage Context: Store step component definitions
Key Fields:
step_id: Parent step referencefunnel_id: Parent funnel referenceaccount_id: Account ownercomponents: JSON string - component definitionserrors: Array of processing errors
sites_imagesโ
- Operations: Find, Insert
- Model:
shared/models/sites-images.js - Usage Context: Track cloned images
Key Fields:
ref_id: Funnel ID referenceaccount_id: Account ownerparent_account: Parent account (for sub-accounts)source_key: Original S3 keykey: New S3 key (UUID-based)origin: 'funnels'additionalInfo: Bucket, ContentType, S3 URL
formsโ
- Operations: Find, Insert
- Model:
shared/models/forms.js - Usage Context: Duplicate embedded forms
Key Fields:
form_secret_id: UUID identifieraccount_id: Account ownerform_name: Form name
support_tokensโ
- Operations: Find
- Model:
shared/models/support.tokens.js - Usage Context: Chat plugin API keys
Key Fields:
domain: Preview domaintoken: API key for chat plugin
_accountsโ
- Operations: Lookup (aggregation)
- Model:
shared/models/account.js - Usage Context: Account data for socket emissions
usersโ
- Operations: Find
- Model:
shared/models/user.js - Usage Context: Socket emission targets
๐ง Job Configurationโ
Cron Scheduleโ
'*/5 * * * *'; // Every 5 minutes
Stale Job Thresholdsโ
Funnel Level:
const WAIT_TIME = 10; // minutes
const TEN_MINUTES = WAIT_TIME * 60 * 1000;
const FunnelInProgressReaddQueueTime = new Date(new Date().getTime() - TEN_MINUTES);
Step Level:
const WAIT_TIME = 5; // minutes
const stepInProgressReaddQueueTime = new Date(Date.now() - WAIT_TIME * 60 * 1000);
Queue Settingsโ
Funnel Queue:
QueueWrapper('funnels_clone', 'global', {
processCb,
failedCb,
completedCb,
settings: { maxStalledCount: 10 },
});
Step Queue:
QueueWrapper('funnels_clone_step', 'global', {
processCb,
failedCb,
completedCb,
settings: {
maxStalledCount: 10,
stalledInterval: 60 * 60 * 1000, // 1 hour (default is 30 sec)
},
concurrency: 5, // Process 5 steps in parallel
});
Job Retry Configurationโ
Funnel Jobs: 3 attempts Step Jobs: 3 attempts
Batch Limitsโ
Funnels Per Run: 3 (hardcoded limit) Steps Per Funnel: Unlimited (all steps queued) Concurrent Step Processing: 5
๐ Processing Logic - Detailed Flowโ
1. Service Aggregation Queryโ
Complex MongoDB Aggregation Pipeline:
Stage 1 - Sort: { $sort: { _id: 1 } }
- Process oldest funnels first
Stage 2 - Match: Find pending or stale funnels
{
$match: {
$or: [
{ $and: [{ pending: true }, { in_progress: { $ne: true } }] }, // Pending
{
$and: [
{ in_progress: true },
{ updatedAt: { $lte: FunnelInProgressReaddQueueTime } }, // Stale (>10 min)
],
},
];
}
}
Stage 3-4 - Lookup Account: Join with _accounts collection
Stage 5 - Lookup Steps: Join with funnel.step collection
Stage 6 - Calculate Step Status: Map steps to check if pending=false AND in_progress=false
Stage 7 - Check All Steps Processed:
{
allStepsProcessed: {
$cond: {
if: { /* empty array check */ },
then: false,
else: {
$cond: {
if: { $allElementsTrue: '$hasNoPendingAndInProgress' },
then: true,
else: false
}
}
}
}
}
Stage 8 - Limit: { $limit: 3 } - Process max 3 funnels per run
2. Stale Job Recoveryโ
Funnel Level (10-minute threshold):
- Funnels with
in_progress=truefor >10 minutes - Re-queued for processing
- Prevents stuck funnels from blocking clone queue
Step Level (5-minute threshold):
- Steps with
in_progress=truefor >5 minutes - Re-queued for processing
- Faster recovery for granular step operations
3. Image Processingโ
Steps:
- Find existing cloned images (avoid duplicates):
const existingImages = await SitesImages.find({
$or: [{ ref_id: newFunnelId }, { account_id: account_id }],
});
- Find original funnel images not yet cloned:
const images = await SitesImages.find({
ref_id: originalFunnelId,
fileName: { $nin: existingKeys },
});
- Copy images in Wasabi S3 (batch operation):
const destinationKey = `${uuidv4()}/${image.fileName}`;
await wasabiObj.copyObject(sourceKey, destinationKey, bucketName);
- Create new image records:
const newImage = new SitesImages({
uploaded: true,
fileName: image.fileName,
fileSize: image.fileSize,
account_id: account_id,
parent_account: parentAccountId,
source_key: sourceKey,
origin: 'funnels',
key: destinationKey,
additionalInfo: { ... },
ref_id: newFunnelId
});
- Replace image keys in HTML/components:
keyValuePairObject[sourceKey] = destinationKey;
replacedHtml = await replaceKeysInString(keyValuePairObject, originalHtml);
4. Form Processingโ
Steps:
- Extract form IDs from components using regex:
const regex = /\/userform\/([^?/]+)/g;
const matches = originalComponents?.components?.match(regex);
const extractedParts = [...new Set(matches.map(match => match.split('/userform/')[1]))];
- For each form ID, find original form:
const matchingForm = await Forms.findOne({ form_secret_id: extractedForm });
- Clone form with new UUID:
matchingForm.form_secret_id = uuidv4();
delete matchingForm._id;
let newForm = new Forms(matchingForm);
await newForm.save();
- Replace form IDs in HTML/components:
keyValuePairObject[extractedForm] = newForm.form_secret_id;
5. Chat Plugin Processingโ
Steps:
- Check if live chat enabled:
const isChatPluginEnabled = await Funnels.findOne({
_id: originalFunnelId,
live_chat_widget: true,
});
- Fetch support token for preview domain:
const supportToken = await SupportToken.findOne({ domain: previewDomain });
- Replace API key in HTML:
const apiKeyRegex = /(<script[^>]*?custom-chat-widget="true"[^>]*?api-key=")[^"]*?(")/g;
replacedHtml = originalHtml.replace(apiKeyRegex, `$1${supportToken?.token}$2`);
6. Completion Checkโ
Aggregation Query (runs after each step completion):
const checkforAllStepsCompleted = [
{ $match: { _id: newFunnelId } },
{ $lookup: { from: 'funnel.step', ... } },
{
$project: {
allStepsProcessed: {
$cond: {
if: { $allElementsTrue: '$hasNoPendingAndInProgress' },
then: true,
else: false
}
}
}
}
];
If All Steps Completed:
- Clear funnel flags:
pending=false, in_progress=false - Find active users for account
- Emit Socket.IO event:
'funnel_cloning_completed'
7. Socket.IO Notificationsโ
Event: funnel_cloning_completed
Payload:
{
funnel_id: newFunnelId,
status: 'completed'
}
Recipients: All active users in the account
Triggered:
- When all steps completed (from step queue)
- When funnel already completed (from service)
๐จ Error Handlingโ
Common Error Scenariosโ
Original Funnel Deletedโ
Detection: Original funnel not found in database
Handling:
await Funnels.updateOne(
{ _id: newFunnel._id },
{
$addToSet: { error: 'Original Funnel not found' },
pending: false,
in_progress: false,
original_funnel_deleted: true,
},
);
Image Copy Failureโ
Scenario: Wasabi S3 copy operation fails
Handling: Error logged, accumulated in errors array, processing continues
Impact: Some images may not be cloned, but funnel remains functional
Form Duplication Failureโ
Scenario: Form not found or save fails
Handling: Error logged, processing continues with other forms
Impact: Form links may be broken in cloned funnel
Component Insertion Failureโ
Scenario: MongoDB duplicate key error
Handling: Error thrown, job retries (up to 3 attempts)
Impact: Step creation fails, entire funnel retry needed
Failed Job Callbacksโ
Funnel Level:
const failedCb = async (job, err) => {
const id = job?.data?._id;
if (job.attemptsMade >= job.opts.attempts) {
await Funnels.updateOne({ _id: id }, { pending: true, in_progress: false });
}
};
Step Level:
const failedCb = async (job, err) => {
const id = job.data._id;
if (job.attemptsMade >= job.opts.attempts) {
await FunnelSteps.updateOne({ _id: id }, { pending: true, in_progress: false });
}
};
Note: Failed jobs reset to pending=true for retry on next service run.
๐ Monitoring & Loggingโ
Success Loggingโ
Service Level:
- No funnels found
- Funnel added to queue
- Stalled funnel re-queued
- All steps already completed
Funnel Queue:
- Original funnel not found
- Steps created and queued
- Step re-queued (stale recovery)
Step Queue:
- Image processing completed
- Form processing completed
- Chat plugin replaced
- Step cloning completed
- Socket emitted
Error Loggingโ
Service Level:
- Aggregation query failure
- Socket emit failure
Funnel Queue:
- Missing newFunnel in job data
- Original funnel not found
- Component insertion failure
Step Queue:
- Missing originalComponents
- Image processing errors
- Form processing errors
- Chat plugin errors
Performance Metricsโ
- Service Query Time: 1-3 seconds (complex aggregation)
- Funnel Job Time: 5-30 seconds (depends on step count)
- Step Job Time: 10-60 seconds (depends on image/form count)
- Typical Funnel: 5-10 steps, 20-50 images, 1-3 forms
- Concurrent Step Processing: 5 steps in parallel
๐ Integration Pointsโ
Triggers This Jobโ
- Internal API: Sets
pending=trueon funnel clone request - Manual Trigger: Via API endpoint (if QM_HOOKS=true)
External Dependenciesโ
- Wasabi S3: Image storage and copying
- Socket.IO Service: Real-time completion notifications
- Preview Domain: Chat plugin configuration
Jobs That Depend On Thisโ
- Funnel Publishing: May trigger after successful clone
- Analytics Tracking: Tracks clone operations
Utilities Usedโ
- replaceKeysInString: Replace IDs in HTML/components
- wasabi: S3 operations
- socketEmit: Real-time notifications
โ ๏ธ Important Notesโ
Side Effectsโ
- โ ๏ธ Image Duplication: Creates new S3 objects (storage costs)
- โ ๏ธ Form Duplication: Creates new form records
- โ ๏ธ Database Load: Complex aggregations every 5 minutes
- โ ๏ธ Socket Emissions: Real-time notifications to all account users
Performance Considerationsโ
- Batch Limit: 3 funnels per run prevents overload
- Concurrent Steps: 5 parallel step processing
- Stale Recovery: Prevents stuck jobs from blocking queue
- Image Deduplication: Checks existing images before copying
- Promise.all: Parallel S3 operations for performance
Business Logicโ
Why Two-Tier Queue System?
- Funnel queue creates structure quickly
- Step queue handles detailed processing in parallel
- Enables granular progress tracking
- Supports resumable cloning
Why Stale Job Recovery?
- Long-running operations may timeout
- Server restarts mid-clone
- Network failures during S3 operations
- Prevents permanent stuck state
Why Placeholder Components?
- Ensures step records exist immediately
- Prevents foreign key errors
- Enables incremental processing
- Supports retry logic
Why Socket.IO Notifications?
- Real-time progress updates
- Better UX (no polling needed)
- Immediate notification on completion
- Multiple users notified simultaneously
Maintenance Notesโ
- Stale Thresholds: 10 min (funnel), 5 min (step) hardcoded
- Batch Limit: 3 funnels per run hardcoded
- Step Concurrency: 5 hardcoded
- Image Deduplication: Critical for multi-step funnels
- Form UUID: Ensures uniqueness across clones
Code Quality Issuesโ
Issue 1: Missing Return After Error Log
if (!domain.cf_id) {
logger.error({ message: 'Incorrect domain provided' });
// Missing: done(); return;
}
Issue 2: Complex Aggregation Could Be Optimized
- Consider caching account lookups
- Index on
pending,in_progress,updatedAt
Issue 3: Error Accumulation
- Errors pushed to array but not always checked
- Some errors silently ignored
๐งช Testingโ
Manual Triggerโ
# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/funnels/funnelClone
Simulate Funnel Cloneโ
// Flag funnel for cloning
const originalFunnel = await Funnels.findOne({});
const newFunnel = await Funnels.create({
name: originalFunnel.name + ' (Clone)',
account_id: originalFunnel.account_id,
original_funnel_id: originalFunnel._id,
pending: true,
});
// Wait for processing (5 minutes) or trigger manually
// Verify completion
const clonedFunnel = await Funnels.findById(newFunnel._id);
console.log('Cloning complete:', !clonedFunnel.pending && !clonedFunnel.in_progress);
console.log('Steps created:', clonedFunnel.steps.length);
Monitor Clone Progressโ
// Check funnel status
const funnel = await Funnels.findById(newFunnelId).populate('steps');
console.log('Funnel pending:', funnel.pending);
console.log('Funnel in progress:', funnel.in_progress);
console.log('Total steps:', funnel.steps.length);
// Check step status
const steps = await FunnelSteps.find({ funnel_id: newFunnelId });
const completedSteps = steps.filter(s => !s.pending && !s.in_progress);
console.log(`Completed steps: ${completedSteps.length}/${steps.length}`);
// Check processing flags
steps.forEach(step => {
console.log(`Step ${step.name}:`, {
pending: step.pending,
in_progress: step.in_progress,
processed: step.processed,
});
});
Test Stale Recoveryโ
// Create stale funnel (manually set old timestamp)
await Funnels.updateOne(
{ _id: newFunnelId },
{
pending: false,
in_progress: true,
updatedAt: new Date(Date.now() - 15 * 60 * 1000), // 15 minutes ago
},
);
// Trigger service
await funnelClone();
// Verify re-queued
setTimeout(async () => {
const funnel = await Funnels.findById(newFunnelId);
console.log('Re-queued:', funnel.in_progress); // Should be true (fresh timestamp)
}, 5000);
Verify Image Cloningโ
// Find original images
const originalImages = await SitesImages.find({ ref_id: originalFunnelId });
console.log('Original images:', originalImages.length);
// Find cloned images
const clonedImages = await SitesImages.find({ ref_id: newFunnelId });
console.log('Cloned images:', clonedImages.length);
// Verify source_key mapping
clonedImages.forEach(img => {
console.log(`${img.source_key} โ ${img.key}`);
});
Test Socket Emissionโ
// Mock socket service
const mockSocket = {
emit: (event, userIds, data) => {
console.log('Socket event:', event);
console.log('User IDs:', userIds);
console.log('Data:', data);
},
};
// Monitor socket emissions
// Expected event: 'funnel_cloning_completed'
// Expected data: { funnel_id, status: 'completed' }
Job Type: Scheduled + Two-Tier Queued System
Execution Frequency: Every 5 minutes
Average Duration: 30-300 seconds per funnel (depends on complexity)
Status: Active