Skip to main content

๐Ÿ“‹ Funnel Clone

๐Ÿ“– Overviewโ€‹

The Funnel Clone job implements a sophisticated two-tier cloning system that duplicates complete funnels including all steps, page components, images, forms, and chat plugins. It runs every 5 minutes, identifies funnels flagged for cloning, creates placeholder steps, then queues each step for detailed processing including asset duplication and ID replacement. The system supports resumable cloning with stale job recovery and real-time progress notifications via Socket.IO.

Complete Flow:

  1. Cron Initialization: queue-manager/crons/funnels/funnelClone.js
  2. Service Processing: queue-manager/services/funnels/funnelClone.js
  3. Funnel Queue: queue-manager/queues/funnels/funnelClone.js
  4. Step Queue: queue-manager/queues/funnels/funnelStepClone.js

Execution Pattern: Frequent polling (every 5 minutes) with two-level queue hierarchy

Queue Names: funnels_clone (parent), funnels_clone_step (child)

Environment Flag: QM_FUNNELS_CLONE=true (in index.js)

๐Ÿ”„ Complete Processing Flowโ€‹

sequenceDiagram
participant CRON as Cron Schedule<br/>(every 5 min)
participant SERVICE as Clone Service
participant FUNNEL_DB as Funnels DB
participant FUNNEL_QUEUE as Funnel Clone<br/>Queue
participant STEP_DB as Funnel Steps DB
participant STEP_QUEUE as Step Clone<br/>Queue
participant WASABI as Wasabi S3
participant SOCKET as Socket.IO<br/>Service

CRON->>SERVICE: funnelClone()
SERVICE->>SERVICE: Calculate stale threshold:<br/>10 minutes ago

SERVICE->>FUNNEL_DB: Aggregate query:<br/>- pending = true AND in_progress โ‰  true<br/>- OR in_progress + updatedAt < 10 min<br/>- Lookup accounts<br/>- Lookup steps<br/>- Check all steps completed<br/>- Limit 3 funnels
FUNNEL_DB-->>SERVICE: Pending funnels (max 3)

loop Each pending funnel
alt All steps already completed
SERVICE->>FUNNEL_DB: Clear flags:<br/>pending=false, in_progress=false
SERVICE->>FUNNEL_DB: Find active users<br/>for socket emit
SERVICE->>SOCKET: Emit 'funnel_cloning_completed'<br/>{funnel_id, status:'completed'}
else Steps incomplete
SERVICE->>FUNNEL_DB: Set in_progress=true
SERVICE->>FUNNEL_QUEUE: Add job: {newFunnel}
end
end

loop Each funnel job
FUNNEL_QUEUE->>FUNNEL_QUEUE: Fetch original funnel data

alt Original funnel not found
FUNNEL_QUEUE->>FUNNEL_DB: Set error + flags:<br/>original_funnel_deleted=true
else Original funnel found
FUNNEL_QUEUE->>STEP_DB: Find original steps

loop Each original step
alt Step already exists (stale recovery)
FUNNEL_QUEUE->>STEP_QUEUE: Re-queue existing step
else New step needed
FUNNEL_QUEUE->>STEP_DB: Create step placeholder:<br/>raw_html=null, pending=true
FUNNEL_QUEUE->>STEP_DB: Create components placeholder:<br/>components='{}'
FUNNEL_QUEUE->>FUNNEL_DB: Add step._id to funnel.steps[]
FUNNEL_QUEUE->>STEP_QUEUE: Queue step for processing
end
end
end
end

loop Each step job (concurrency: 5)
STEP_QUEUE->>STEP_DB: Set in_progress=true
STEP_QUEUE->>STEP_DB: Fetch original step + components

STEP_QUEUE->>STEP_QUEUE: Replace IDs:<br/>originalFunnelId โ†’ newFunnelId<br/>originalStepId โ†’ newStepId

alt Images not processed
STEP_QUEUE->>STEP_DB: Find existing cloned images
STEP_QUEUE->>WASABI: Copy images from original funnel<br/>(batch Promise.all)
STEP_QUEUE->>STEP_DB: Create SitesImages records
STEP_QUEUE->>STEP_QUEUE: Replace image keys in HTML/components
end

alt Forms not processed
STEP_QUEUE->>STEP_QUEUE: Extract form IDs from components<br/>(regex: /userform/([^?/]+))
loop Each form ID
STEP_QUEUE->>STEP_DB: Find original form
STEP_QUEUE->>STEP_DB: Create new form with new UUID
STEP_QUEUE->>STEP_QUEUE: Replace form IDs in HTML/components
end
end

alt Chat plugin not processed
STEP_QUEUE->>STEP_DB: Check if live_chat_widget enabled
STEP_QUEUE->>STEP_DB: Fetch support token for preview domain
STEP_QUEUE->>STEP_QUEUE: Replace chat plugin API key in HTML
end

STEP_QUEUE->>STEP_DB: Update components with replaced values
STEP_QUEUE->>STEP_DB: Update step:<br/>raw_html, pending=false, in_progress=false

STEP_QUEUE->>STEP_QUEUE: Check if all steps completed

alt All steps completed
STEP_QUEUE->>FUNNEL_DB: Clear funnel flags
STEP_QUEUE->>SOCKET: Emit 'funnel_cloning_completed'
end
end

๐Ÿ“ Source Filesโ€‹

1. Cron Initializationโ€‹

File: queue-manager/crons/funnels/funnelClone.js

Purpose: Schedule funnel clone processing every 5 minutes

Cron Pattern: */5 * * * * (every 5 minutes)

Initialization:

const funnelClone = require('../../services/funnels/funnelClone');
const cron = require('node-cron');
const logger = require('../../utilities/logger');

let inProgress = false;
exports.start = async () => {
try {
cron.schedule('*/5 * * * *', async () => {
if (!inProgress) {
inProgress = true;
await funnelClone();
inProgress = false;
}
});
} catch (err) {
logger.error({ initiator: 'QM/funnels/funnel-clone', error: err });
}
};

In-Progress Lock: Prevents overlapping executions during heavy cloning operations.

2. Service Processing (THE COMPLEX AGGREGATION LOGIC)โ€‹

File: queue-manager/services/funnels/funnelClone.js

Purpose: Find funnels flagged for cloning, check completion status, queue incomplete funnels

Key Functions:

  • Calculate 10-minute stale threshold for stuck jobs
  • Complex MongoDB aggregation with lookups and projections
  • Check if all funnel steps are completed
  • Queue incomplete funnels for processing
  • Emit Socket.IO events for completed funnels

Main Service Function (Simplified - full code ~200 lines):

const FunnelClone = require('../../queues/funnels/funnelClone');
const Funnels = require('../../models/funnels');
const User = require('../../models/user');
const logger = require('../../utilities/logger');
const { socketEmit } = require('../../utilities');

module.exports = async () => {
try {
const WAIT_TIME = 10; // minutes
const TEN_MINUTES = WAIT_TIME * 60 * 1000;
const FunnelInProgressReaddQueueTime = new Date(new Date().getTime() - TEN_MINUTES);

// Complex aggregation query
let query = [
{ $sort: { _id: 1 } },
{
$match: {
$or: [
{ $and: [{ pending: true }, { in_progress: { $ne: true } }] },
{
$and: [
{ in_progress: true },
{ updatedAt: { $lte: FunnelInProgressReaddQueueTime } },
],
},
],
},
},
{
$lookup: {
from: '_accounts',
localField: 'account_id',
foreignField: '_id',
as: 'account',
},
},
{ $set: { account: { $first: '$account' } } },
{
$lookup: {
from: 'funnel.step',
localField: '_id',
foreignField: 'funnel_id',
as: 'steps',
},
},
{
$project: {
// Calculate if all steps have pending=false AND in_progress=false
hasNoPendingAndInProgress: {
$map: {
input: '$steps',
as: 'step',
in: {
$cond: [
{
$and: [
{ $eq: ['$$step.pending', false] },
{ $eq: ['$$step.in_progress', false] },
{ $ne: ['$$step', null] },
],
},
true,
false,
],
},
},
},
// ... other fields
},
},
{
$project: {
// Calculate if all steps are processed
allStepsProcessed: {
$cond: {
if: {
$or: [
{ $eq: [{ $size: '$hasNoPendingAndInProgress' }, 0] },
{ $eq: ['$hasNoPendingAndInProgress', []] },
],
},
then: false,
else: {
$cond: {
if: { $eq: [{ $allElementsTrue: '$hasNoPendingAndInProgress' }, true] },
then: true,
else: false,
},
},
},
},
},
},
{ $limit: 3 }, // Process max 3 funnels per run
];

const pendingFunnels = await Funnels.aggregate(query);

if (!pendingFunnels?.length) {
logger.log({ message: 'No funnel to be cloned found ! Returning !!' });
} else {
const funnelClone = await FunnelClone.start();

for (const pendingFunnel of pendingFunnels) {
if (pendingFunnel.allStepsProcessed) {
// All steps completed - finalize
await Funnels.findByIdAndUpdate(pendingFunnel._id, {
pending: false,
in_progress: false,
});

// Emit socket notification
const user = await User.find({ account: pendingFunnel.account_id, active: true });
const userIds = user.map(u => u._id.toString());
await socketEmit('funnel_cloning_completed', userIds, {
funnel_id: pendingFunnel._id,
status: 'completed',
});
} else {
// Queue for processing
const newFunnel = await Funnels.findByIdAndUpdate(
pendingFunnel._id,
{ in_progress: true },
{ new: true },
);
await funnelClone.add({ newFunnel }, { attempts: 3 });
}
}
}
} catch (err) {
logger.error({ message: 'Error occured while cloning funnel.', error: err });
}
};

Note: Full service code is ~230 lines with complex aggregation logic.

3. Funnel Clone Queue (THE STEP CREATION LOGIC)โ€‹

File: queue-manager/queues/funnels/funnelClone.js

Purpose: Create placeholder steps and queue for detailed processing

Key Functions:

  • Fetch original funnel and steps
  • Handle missing original funnels
  • Create new step records with pending=true
  • Create placeholder component records
  • Queue steps for detailed processing
  • Support stale job recovery (re-queue incomplete steps)

Main Processor (Simplified - full code ~270 lines):

const processCb = async (job, done) => {
try {
let { newFunnel } = job.data;
const originalFunnelId = newFunnel.original_funnel_id;

// Fetch original funnel
const funnel = await Funnels.findOne({ _id: originalFunnelId }).lean().exec();

if (!funnel) {
// Original funnel deleted - mark error
await Funnels.updateOne(
{ _id: newFunnel._id },
{
$addToSet: { error: 'Original Funnel not found, id:' + originalFunnelId },
pending: false,
in_progress: false,
original_funnel_deleted: true,
},
);
done();
return;
}

// Fetch original steps
const oldSteps = await FunnelSteps.find({ funnel_id: funnel._id }).lean().exec();
const newSteps = [];
const stepQueue = await FunnelStepClone.start();
const WAIT_TIME = 5; // minutes for step stale recovery
const stepInProgressReaddQueueTime = new Date(Date.now() - WAIT_TIME * 60 * 1000);

for (let oldStepData of oldSteps) {
// Check if step already exists (stale recovery)
let isStepAlreadyExist = await FunnelSteps.find({
funnel_id: newFunnel._id,
original_step_id: oldStepData._id,
})
.lean()
.exec();

if (isStepAlreadyExist && isStepAlreadyExist.length > 0) {
// Filter stale steps (pending or in_progress > 5 min)
isStepAlreadyExist = isStepAlreadyExist.filter(step => {
return (
step.pending === true ||
(step.in_progress === true && step.updatedAt <= stepInProgressReaddQueueTime)
);
});

if (isStepAlreadyExist?.[0]) {
// Re-queue stale step
await stepQueue.add(
{
newStepId: isStepAlreadyExist._id,
originalStepId: isStepAlreadyExist.original_step_id,
newFunnelId: isStepAlreadyExist.funnel_id,
originalFunnelId,
account_id: newFunnel.account_id,
parentAccountId: parentAccount,
},
{ attempts: 3 },
);
continue;
}
} else {
// Create new step placeholder
newStep = {
...oldStepData,
funnel_id: newFunnel._id,
original_step_id: oldStepData._id,
raw_html: null,
domain_id: null,
header_tracking_code: null,
footer_tracking_code: null,
version: new Date().getTime(),
pending: true,
processed: {
images_processed: false,
forms_processed: false,
chat_plugin_processed: false,
},
};
delete newStep._id;
newSteps.push(newStep);
}
}

if (newSteps.length > 0) {
// Insert new steps
const savedSteps = await FunnelSteps.insertMany(newSteps);

// Create placeholder components
let newComponents = savedSteps.map(savedStep => ({
account_id: newFunnel.account_id,
funnel_id: newFunnel._id,
step_id: savedStep._id,
components: '{}',
}));
await FunnelStepComponents.insertMany(newComponents);

// Update funnel with step IDs
await Funnels.findByIdAndUpdate(newFunnel._id, {
$addToSet: { steps: savedSteps.map(s => s._id) },
});

// Queue steps for detailed processing
for (const step of savedSteps) {
await stepQueue.add(
{
newStepId: step._id,
originalStepId: step._doc.original_step_id,
newFunnelId: step._doc.funnel_id,
originalFunnelId,
account_id: newFunnel.account_id,
parentAccountId: parentAccount,
},
{ attempts: 3 },
);
}
}

done();
} catch (err) {
done(err);
}
};

4. Step Clone Queue (THE DETAILED ASSET PROCESSING)โ€‹

File: queue-manager/queues/funnels/funnelStepClone.js

Purpose: Clone step content including images, forms, and chat plugins

Key Functions:

  • Process images (copy from Wasabi S3, create new records)
  • Process forms (duplicate with new UUIDs)
  • Process chat plugins (replace API keys)
  • Replace all IDs in HTML and components
  • Check for all steps completed
  • Emit Socket.IO completion events

Main Processor (Simplified - full code ~700 lines):

const processCb = async (job, done) => {
let { newStepId, originalStepId, newFunnelId, originalFunnelId, account_id, parentAccountId } =
job.data;

try {
// Set in_progress flag
await FunnelSteps.findByIdAndUpdate(newStepId, { in_progress: true });

// Fetch original step and components
const originalComponents = await FunnelStepComponents.findOne({ step_id: originalStepId })
.lean()
.exec();
const originalStepInfo = await FunnelSteps.findOne({ _id: originalStepId }).lean().exec();
let originalHtmlContent = originalStepInfo?.raw_html || '';

// Replace funnel and step IDs
let keyValuePairObject = {
[originalFunnelId.toString()]: newFunnelId.toString(),
[originalStepId.toString()]: newStepId.toString(),
};
replaceObjectComponents = await replaceKeysInString(
keyValuePairObject,
originalComponents.components,
);
replaceObjectHtml = await replaceKeysInString(keyValuePairObject, originalHtmlContent);

let processedObj = {};
let errors = [];

// 1. Process Images
if (!newStepInfo?.processed?.images_processed) {
const { processed, error, imageReplaceObj, componentsReplaceObj } = await processImage({
originalFunnelId,
originalComponents,
parentAccountId,
originalHtmlContent,
account_id,
newFunnelId,
newStepId,
});
if (processed) {
processedObj.images_processed = true;
replaceObjectHtml = imageReplaceObj;
replaceObjectComponents = componentsReplaceObj;
}
errors.push(error);
}

// 2. Process Forms
if (!newStepInfo?.processed?.forms_processed) {
const { processed, error, formsReplaceObj, componentsReplaceObj } = await processForms({
originalComponents: replaceObjectComponents,
originalStepId,
originalHtmlContent: replaceObjectHtml,
account_id,
newFunnelId,
});
if (processed) {
processedObj.forms_processed = true;
replaceObjectHtml = formsReplaceObj;
replaceObjectComponents = componentsReplaceObj;
}
errors.push(error);
}

// 3. Process Chat Plugin
if (!newStepInfo?.processed?.chat_plugin_processed) {
const { processed, error, chatPluginReplaceObj } = await replaceChatPlugin({
originalFunnelId,
originalStepId,
newStepId,
originalHtmlContent,
});
if (processed) {
processedObj.chat_plugin_processed = true;
if (chatPluginReplaceObj) replaceObjectHtml = chatPluginReplaceObj;
}
errors.push(error);
}

// Update components and step
await FunnelStepComponents.updateOne(
{ step_id: newStepId },
{ $addToSet: { errors }, components: replaceObjectComponents.components },
);

if (allProcessed) {
await FunnelSteps.findByIdAndUpdate(newStepId, {
in_progress: false,
pending: false,
raw_html: replaceObjectHtml,
processed: processedObj,
});
}

// Check if all steps completed
await checkForAllStepCompleted({ newFunnelId });

done();
} catch (err) {
done(err);
}
};

Helper Functions:

  1. processImage: Copy images from Wasabi S3, create new records
  2. processForms: Duplicate forms with new UUIDs
  3. replaceChatPlugin: Update chat plugin API keys
  4. checkForAllStepCompleted: Check if all steps done, emit socket event

๐Ÿ—„๏ธ Collections Usedโ€‹

funnelsโ€‹

  • Operations: Find, Update, Aggregate
  • Model: shared/models/funnels.js
  • Usage Context: Track funnel cloning status

Key Fields:

  • pending: Boolean - funnel awaiting cloning
  • in_progress: Boolean - funnel currently being cloned
  • original_funnel_id: Reference to source funnel
  • original_funnel_deleted: Boolean - source no longer exists
  • steps: Array of step ObjectIds
  • live_chat_widget: Boolean - enable chat plugin
  • error: Array of error messages

funnel.stepโ€‹

  • Operations: Find, Insert, Update
  • Model: shared/models/funnel.step.js
  • Usage Context: Clone funnel pages/steps

Key Fields:

  • funnel_id: Parent funnel reference
  • original_step_id: Source step reference
  • raw_html: Page HTML content
  • pending: Boolean - step awaiting processing
  • in_progress: Boolean - step currently processing
  • processed: Object - track sub-processing status
    • images_processed: Boolean
    • forms_processed: Boolean
    • chat_plugin_processed: Boolean

funnel.step.componentsโ€‹

  • Operations: Find, Insert, Update
  • Model: shared/models/funnel.step.components.js
  • Usage Context: Store step component definitions

Key Fields:

  • step_id: Parent step reference
  • funnel_id: Parent funnel reference
  • account_id: Account owner
  • components: JSON string - component definitions
  • errors: Array of processing errors

sites_imagesโ€‹

  • Operations: Find, Insert
  • Model: shared/models/sites-images.js
  • Usage Context: Track cloned images

Key Fields:

  • ref_id: Funnel ID reference
  • account_id: Account owner
  • parent_account: Parent account (for sub-accounts)
  • source_key: Original S3 key
  • key: New S3 key (UUID-based)
  • origin: 'funnels'
  • additionalInfo: Bucket, ContentType, S3 URL

formsโ€‹

  • Operations: Find, Insert
  • Model: shared/models/forms.js
  • Usage Context: Duplicate embedded forms

Key Fields:

  • form_secret_id: UUID identifier
  • account_id: Account owner
  • form_name: Form name

support_tokensโ€‹

  • Operations: Find
  • Model: shared/models/support.tokens.js
  • Usage Context: Chat plugin API keys

Key Fields:

  • domain: Preview domain
  • token: API key for chat plugin

_accountsโ€‹

  • Operations: Lookup (aggregation)
  • Model: shared/models/account.js
  • Usage Context: Account data for socket emissions

usersโ€‹

  • Operations: Find
  • Model: shared/models/user.js
  • Usage Context: Socket emission targets

๐Ÿ”ง Job Configurationโ€‹

Cron Scheduleโ€‹

'*/5 * * * *'; // Every 5 minutes

Stale Job Thresholdsโ€‹

Funnel Level:

const WAIT_TIME = 10; // minutes
const TEN_MINUTES = WAIT_TIME * 60 * 1000;
const FunnelInProgressReaddQueueTime = new Date(new Date().getTime() - TEN_MINUTES);

Step Level:

const WAIT_TIME = 5; // minutes
const stepInProgressReaddQueueTime = new Date(Date.now() - WAIT_TIME * 60 * 1000);

Queue Settingsโ€‹

Funnel Queue:

QueueWrapper('funnels_clone', 'global', {
processCb,
failedCb,
completedCb,
settings: { maxStalledCount: 10 },
});

Step Queue:

QueueWrapper('funnels_clone_step', 'global', {
processCb,
failedCb,
completedCb,
settings: {
maxStalledCount: 10,
stalledInterval: 60 * 60 * 1000, // 1 hour (default is 30 sec)
},
concurrency: 5, // Process 5 steps in parallel
});

Job Retry Configurationโ€‹

Funnel Jobs: 3 attempts Step Jobs: 3 attempts

Batch Limitsโ€‹

Funnels Per Run: 3 (hardcoded limit) Steps Per Funnel: Unlimited (all steps queued) Concurrent Step Processing: 5

๐Ÿ“‹ Processing Logic - Detailed Flowโ€‹

1. Service Aggregation Queryโ€‹

Complex MongoDB Aggregation Pipeline:

Stage 1 - Sort: { $sort: { _id: 1 } }

  • Process oldest funnels first

Stage 2 - Match: Find pending or stale funnels

{
$match: {
$or: [
{ $and: [{ pending: true }, { in_progress: { $ne: true } }] }, // Pending
{
$and: [
{ in_progress: true },
{ updatedAt: { $lte: FunnelInProgressReaddQueueTime } }, // Stale (>10 min)
],
},
];
}
}

Stage 3-4 - Lookup Account: Join with _accounts collection

Stage 5 - Lookup Steps: Join with funnel.step collection

Stage 6 - Calculate Step Status: Map steps to check if pending=false AND in_progress=false

Stage 7 - Check All Steps Processed:

{
allStepsProcessed: {
$cond: {
if: { /* empty array check */ },
then: false,
else: {
$cond: {
if: { $allElementsTrue: '$hasNoPendingAndInProgress' },
then: true,
else: false
}
}
}
}
}

Stage 8 - Limit: { $limit: 3 } - Process max 3 funnels per run

2. Stale Job Recoveryโ€‹

Funnel Level (10-minute threshold):

  • Funnels with in_progress=true for >10 minutes
  • Re-queued for processing
  • Prevents stuck funnels from blocking clone queue

Step Level (5-minute threshold):

  • Steps with in_progress=true for >5 minutes
  • Re-queued for processing
  • Faster recovery for granular step operations

3. Image Processingโ€‹

Steps:

  1. Find existing cloned images (avoid duplicates):
const existingImages = await SitesImages.find({
$or: [{ ref_id: newFunnelId }, { account_id: account_id }],
});
  1. Find original funnel images not yet cloned:
const images = await SitesImages.find({
ref_id: originalFunnelId,
fileName: { $nin: existingKeys },
});
  1. Copy images in Wasabi S3 (batch operation):
const destinationKey = `${uuidv4()}/${image.fileName}`;
await wasabiObj.copyObject(sourceKey, destinationKey, bucketName);
  1. Create new image records:
const newImage = new SitesImages({
uploaded: true,
fileName: image.fileName,
fileSize: image.fileSize,
account_id: account_id,
parent_account: parentAccountId,
source_key: sourceKey,
origin: 'funnels',
key: destinationKey,
additionalInfo: { ... },
ref_id: newFunnelId
});
  1. Replace image keys in HTML/components:
keyValuePairObject[sourceKey] = destinationKey;
replacedHtml = await replaceKeysInString(keyValuePairObject, originalHtml);

4. Form Processingโ€‹

Steps:

  1. Extract form IDs from components using regex:
const regex = /\/userform\/([^?/]+)/g;
const matches = originalComponents?.components?.match(regex);
const extractedParts = [...new Set(matches.map(match => match.split('/userform/')[1]))];
  1. For each form ID, find original form:
const matchingForm = await Forms.findOne({ form_secret_id: extractedForm });
  1. Clone form with new UUID:
matchingForm.form_secret_id = uuidv4();
delete matchingForm._id;
let newForm = new Forms(matchingForm);
await newForm.save();
  1. Replace form IDs in HTML/components:
keyValuePairObject[extractedForm] = newForm.form_secret_id;

5. Chat Plugin Processingโ€‹

Steps:

  1. Check if live chat enabled:
const isChatPluginEnabled = await Funnels.findOne({
_id: originalFunnelId,
live_chat_widget: true,
});
  1. Fetch support token for preview domain:
const supportToken = await SupportToken.findOne({ domain: previewDomain });
  1. Replace API key in HTML:
const apiKeyRegex = /(<script[^>]*?custom-chat-widget="true"[^>]*?api-key=")[^"]*?(")/g;
replacedHtml = originalHtml.replace(apiKeyRegex, `$1${supportToken?.token}$2`);

6. Completion Checkโ€‹

Aggregation Query (runs after each step completion):

const checkforAllStepsCompleted = [
{ $match: { _id: newFunnelId } },
{ $lookup: { from: 'funnel.step', ... } },
{
$project: {
allStepsProcessed: {
$cond: {
if: { $allElementsTrue: '$hasNoPendingAndInProgress' },
then: true,
else: false
}
}
}
}
];

If All Steps Completed:

  1. Clear funnel flags: pending=false, in_progress=false
  2. Find active users for account
  3. Emit Socket.IO event: 'funnel_cloning_completed'

7. Socket.IO Notificationsโ€‹

Event: funnel_cloning_completed

Payload:

{
funnel_id: newFunnelId,
status: 'completed'
}

Recipients: All active users in the account

Triggered:

  • When all steps completed (from step queue)
  • When funnel already completed (from service)

๐Ÿšจ Error Handlingโ€‹

Common Error Scenariosโ€‹

Original Funnel Deletedโ€‹

Detection: Original funnel not found in database

Handling:

await Funnels.updateOne(
{ _id: newFunnel._id },
{
$addToSet: { error: 'Original Funnel not found' },
pending: false,
in_progress: false,
original_funnel_deleted: true,
},
);

Image Copy Failureโ€‹

Scenario: Wasabi S3 copy operation fails

Handling: Error logged, accumulated in errors array, processing continues

Impact: Some images may not be cloned, but funnel remains functional

Form Duplication Failureโ€‹

Scenario: Form not found or save fails

Handling: Error logged, processing continues with other forms

Impact: Form links may be broken in cloned funnel

Component Insertion Failureโ€‹

Scenario: MongoDB duplicate key error

Handling: Error thrown, job retries (up to 3 attempts)

Impact: Step creation fails, entire funnel retry needed

Failed Job Callbacksโ€‹

Funnel Level:

const failedCb = async (job, err) => {
const id = job?.data?._id;
if (job.attemptsMade >= job.opts.attempts) {
await Funnels.updateOne({ _id: id }, { pending: true, in_progress: false });
}
};

Step Level:

const failedCb = async (job, err) => {
const id = job.data._id;
if (job.attemptsMade >= job.opts.attempts) {
await FunnelSteps.updateOne({ _id: id }, { pending: true, in_progress: false });
}
};

Note: Failed jobs reset to pending=true for retry on next service run.

๐Ÿ“Š Monitoring & Loggingโ€‹

Success Loggingโ€‹

Service Level:

  • No funnels found
  • Funnel added to queue
  • Stalled funnel re-queued
  • All steps already completed

Funnel Queue:

  • Original funnel not found
  • Steps created and queued
  • Step re-queued (stale recovery)

Step Queue:

  • Image processing completed
  • Form processing completed
  • Chat plugin replaced
  • Step cloning completed
  • Socket emitted

Error Loggingโ€‹

Service Level:

  • Aggregation query failure
  • Socket emit failure

Funnel Queue:

  • Missing newFunnel in job data
  • Original funnel not found
  • Component insertion failure

Step Queue:

  • Missing originalComponents
  • Image processing errors
  • Form processing errors
  • Chat plugin errors

Performance Metricsโ€‹

  • Service Query Time: 1-3 seconds (complex aggregation)
  • Funnel Job Time: 5-30 seconds (depends on step count)
  • Step Job Time: 10-60 seconds (depends on image/form count)
  • Typical Funnel: 5-10 steps, 20-50 images, 1-3 forms
  • Concurrent Step Processing: 5 steps in parallel

๐Ÿ”— Integration Pointsโ€‹

Triggers This Jobโ€‹

  • Internal API: Sets pending=true on funnel clone request
  • Manual Trigger: Via API endpoint (if QM_HOOKS=true)

External Dependenciesโ€‹

  • Wasabi S3: Image storage and copying
  • Socket.IO Service: Real-time completion notifications
  • Preview Domain: Chat plugin configuration

Jobs That Depend On Thisโ€‹

  • Funnel Publishing: May trigger after successful clone
  • Analytics Tracking: Tracks clone operations

Utilities Usedโ€‹

  • replaceKeysInString: Replace IDs in HTML/components
  • wasabi: S3 operations
  • socketEmit: Real-time notifications

โš ๏ธ Important Notesโ€‹

Side Effectsโ€‹

  • โš ๏ธ Image Duplication: Creates new S3 objects (storage costs)
  • โš ๏ธ Form Duplication: Creates new form records
  • โš ๏ธ Database Load: Complex aggregations every 5 minutes
  • โš ๏ธ Socket Emissions: Real-time notifications to all account users

Performance Considerationsโ€‹

  • Batch Limit: 3 funnels per run prevents overload
  • Concurrent Steps: 5 parallel step processing
  • Stale Recovery: Prevents stuck jobs from blocking queue
  • Image Deduplication: Checks existing images before copying
  • Promise.all: Parallel S3 operations for performance

Business Logicโ€‹

Why Two-Tier Queue System?

  • Funnel queue creates structure quickly
  • Step queue handles detailed processing in parallel
  • Enables granular progress tracking
  • Supports resumable cloning

Why Stale Job Recovery?

  • Long-running operations may timeout
  • Server restarts mid-clone
  • Network failures during S3 operations
  • Prevents permanent stuck state

Why Placeholder Components?

  • Ensures step records exist immediately
  • Prevents foreign key errors
  • Enables incremental processing
  • Supports retry logic

Why Socket.IO Notifications?

  • Real-time progress updates
  • Better UX (no polling needed)
  • Immediate notification on completion
  • Multiple users notified simultaneously

Maintenance Notesโ€‹

  • Stale Thresholds: 10 min (funnel), 5 min (step) hardcoded
  • Batch Limit: 3 funnels per run hardcoded
  • Step Concurrency: 5 hardcoded
  • Image Deduplication: Critical for multi-step funnels
  • Form UUID: Ensures uniqueness across clones

Code Quality Issuesโ€‹

Issue 1: Missing Return After Error Log

if (!domain.cf_id) {
logger.error({ message: 'Incorrect domain provided' });
// Missing: done(); return;
}

Issue 2: Complex Aggregation Could Be Optimized

  • Consider caching account lookups
  • Index on pending, in_progress, updatedAt

Issue 3: Error Accumulation

  • Errors pushed to array but not always checked
  • Some errors silently ignored

๐Ÿงช Testingโ€‹

Manual Triggerโ€‹

# Via API (if QM_HOOKS=true)
POST http://localhost:6002/api/trigger/funnels/funnelClone

Simulate Funnel Cloneโ€‹

// Flag funnel for cloning
const originalFunnel = await Funnels.findOne({});
const newFunnel = await Funnels.create({
name: originalFunnel.name + ' (Clone)',
account_id: originalFunnel.account_id,
original_funnel_id: originalFunnel._id,
pending: true,
});

// Wait for processing (5 minutes) or trigger manually

// Verify completion
const clonedFunnel = await Funnels.findById(newFunnel._id);
console.log('Cloning complete:', !clonedFunnel.pending && !clonedFunnel.in_progress);
console.log('Steps created:', clonedFunnel.steps.length);

Monitor Clone Progressโ€‹

// Check funnel status
const funnel = await Funnels.findById(newFunnelId).populate('steps');
console.log('Funnel pending:', funnel.pending);
console.log('Funnel in progress:', funnel.in_progress);
console.log('Total steps:', funnel.steps.length);

// Check step status
const steps = await FunnelSteps.find({ funnel_id: newFunnelId });
const completedSteps = steps.filter(s => !s.pending && !s.in_progress);
console.log(`Completed steps: ${completedSteps.length}/${steps.length}`);

// Check processing flags
steps.forEach(step => {
console.log(`Step ${step.name}:`, {
pending: step.pending,
in_progress: step.in_progress,
processed: step.processed,
});
});

Test Stale Recoveryโ€‹

// Create stale funnel (manually set old timestamp)
await Funnels.updateOne(
{ _id: newFunnelId },
{
pending: false,
in_progress: true,
updatedAt: new Date(Date.now() - 15 * 60 * 1000), // 15 minutes ago
},
);

// Trigger service
await funnelClone();

// Verify re-queued
setTimeout(async () => {
const funnel = await Funnels.findById(newFunnelId);
console.log('Re-queued:', funnel.in_progress); // Should be true (fresh timestamp)
}, 5000);

Verify Image Cloningโ€‹

// Find original images
const originalImages = await SitesImages.find({ ref_id: originalFunnelId });
console.log('Original images:', originalImages.length);

// Find cloned images
const clonedImages = await SitesImages.find({ ref_id: newFunnelId });
console.log('Cloned images:', clonedImages.length);

// Verify source_key mapping
clonedImages.forEach(img => {
console.log(`${img.source_key} โ†’ ${img.key}`);
});

Test Socket Emissionโ€‹

// Mock socket service
const mockSocket = {
emit: (event, userIds, data) => {
console.log('Socket event:', event);
console.log('User IDs:', userIds);
console.log('Data:', data);
},
};

// Monitor socket emissions
// Expected event: 'funnel_cloning_completed'
// Expected data: { funnel_id, status: 'completed' }

Job Type: Scheduled + Two-Tier Queued System
Execution Frequency: Every 5 minutes
Average Duration: 30-300 seconds per funnel (depends on complexity)
Status: Active

๐Ÿ’ฌ

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM