Skip to main content

Store Subscription Activation

Overview

The Store Subscription Activation module provides automated provisioning of third-party services when subscriptions become active. It detects missing activations through MongoDB aggregation pipelines and intelligently routes activation requests to four specialized processors: Yext Listings, Twilio Phone Numbers, Duda InstaSites, and Duda Agency Sites. The system uses JWT-based authentication with 10-day expiration for service-to-service API calls and employs exponential backoff retry strategies to handle transient failures.

Key Features:

  • Missing Items Detection: MongoDB aggregation finds active subscriptions lacking external entity IDs
  • Intelligent Routing: Routes to 4 specialized processors based on subscription type
  • JWT Authentication: 10-day tokens with multi-scope access for API authorization
  • Retry Strategy: 10 attempts with exponential backoff (10-second base delay)
  • Stripe Metadata Sync: Updates subscription metadata with external product IDs
  • Socket Notifications: Real-time user notifications via WebSocket
  • Audit Logging: StoreThirdPartyLogs for success/failure tracking

Critical Business Impact:

  • Revenue Activation: Ensures purchased services are immediately provisioned
  • Customer Satisfaction: Prevents delays in service delivery
  • Third-Party Integration: Seamless activation across Yext, Twilio, and Duda platforms
  • Data Integrity: Links internal subscriptions with external entity IDs
  • Support Reduction: Auto-detects and resolves missing activations

Architecture

Execution Flow

sequenceDiagram
participant Cron as Cron Scheduler
participant Service as Activate Service
participant DB as MongoDB
participant Queue as Bull Queue
participant Listing as Listing Processor
participant Number as Number Processor
participant Instasite as Instasite Processor
participant Site as Site Processor
participant Yext as Yext API
participant Twilio as Twilio API
participant Duda as Duda API
participant Stripe as Stripe API
participant Socket as Socket Service

Note over Cron,Socket: Every 5 Seconds

Cron->>Service: Trigger activate check
Service->>DB: Find in_progress=false jobs
DB-->>Service: Queue items
Service->>Service: Mark in_progress=true

Service->>DB: Aggregate active subscriptions
Note over DB: Find listings subscriptions<br/>without yext.entity
DB-->>Service: Missing items array

loop For each missing item
Service->>Service: Generate JWT token
Note over Service: 10-day expiration<br/>Multi-scope access

Service->>DB: Create queue entry
Note over DB: Set retry config<br/>10 attempts, 10s base

alt Listing Activation
Service->>Queue: Add to listing queue
Queue->>Listing: Process job
Listing->>Yext: Create/Activate entity
Yext-->>Listing: Entity ID
Listing->>DB: Update Account.yext.entity
Listing->>Stripe: Update subscription metadata
Listing->>Socket: Emit listings_activated
Listing->>DB: Delete queue item
Listing->>DB: Log success to StoreThirdPartyLogs
end

alt Phone Number Activation
Service->>Queue: Add to number queue
Queue->>Number: Process job
Number->>Twilio: Get addresses
Twilio-->>Number: Address SID
Number->>Twilio: Update number with address
Number->>DB: Delete queue item
Number->>DB: Log success to StoreThirdPartyLogs
end

alt Instasite Activation
Service->>Queue: Add to instasite queue
Queue->>Instasite: Process job
Instasite->>Duda: Publish site
Duda-->>Instasite: Site status & domain
Instasite->>DB: Update InstaSite status
Instasite->>Stripe: Update subscription metadata
Instasite->>Socket: Emit instasite_status_changed
Instasite->>DB: Delete queue item
Instasite->>DB: Log success to StoreThirdPartyLogs
end

alt Site Activation
Service->>Queue: Add to site queue
Queue->>Site: Process job
Site->>DB: Fetch Contact for business info
alt New Site from Template
Site->>Duda: Create site from template
Site->>Duda: Upload logo to site
Site->>Duda: Update site content
Site->>Duda: Create account
Site->>Duda: Assign account to site
Site->>DB: Save AgencyWebsite
end
Site->>Duda: Publish site
Site->>Stripe: Update subscription metadata
Site->>DB: Delete queue item
Site->>DB: Log success to StoreThirdPartyLogs
end
end

Service->>DB: Mark in_progress=false

Component Structure

queue-manager/
├── services/
│ └── store/
│ └── subscriptions/
│ └── activate.js # Missing items detection service
├── queues/
│ └── store/
│ └── subscriptions/
│ └── activate/
│ ├── listing.js # Yext listing activation
│ ├── numbers.js # Twilio number activation
│ ├── instasite.js # Duda instasite activation
│ └── site.js # Duda agency site activation
└── utilities/
└── listings.js # Listing activation utility

Cron Schedule

File: queue-manager/services/store/subscriptions/activate.js

'*/5 * * * * *'; // Every 5 seconds

Pattern: Ultra-high-frequency scheduler for rapid activation

  • In-Progress Locking: Prevents concurrent executions
  • Database Lock: Sets in_progress: true on queue items
  • Purpose: Ensures near-instant service provisioning

Configuration

Environment Variables

VariableTypeDefaultDescription
SUBSCRIPTION_ACTIVATION_QUEUES_ATTEMPTSNumber10Max retry attempts for activation
YEXT_API_KEYSStringRequiredYext API key for listings
YEXT_API_VPARAMString20200525Yext API version parameter
DUDA_TOKENStringRequiredDuda API token for site operations
DUDA_SITE_PERMISSIONSJSONRequiredDefault permissions for created accounts
API_BASE_URLStringRequiredInternal API base URL
SOCKET_APIStringRequiredSocket service URL for notifications
STRIPE_SECRET_KEYStringRequiredStripe API key for metadata updates
WASABI_PUBLIC_IMAGE_DOWNLOADStringRequiredWasabi CDN URL for logo uploads
WHITE_LABEL_DOMAINStringRequiredDashboard domain for site content

JWT Token Configuration

Generation: queue-manager/services/store/subscriptions/activate.js

const jwt = require('jsonwebtoken');
const secret = process.env.JWT_SECRET;

const token = jwt.sign(
{
user_id: queue.user_id,
user_type: queue.user_type,
id: queue.user_id,
isAdmin: false,
scope: 'users.me sites communications system analytics',
account_id: queue.account_id,
parent_account: queue.parent_account,
},
secret,
{ expiresIn: '10d' },
);

Token Scope Permissions:

  • users.me - User profile access
  • sites - Site management
  • communications - Messaging operations
  • system - System-level features
  • analytics - Analytics data access

Queue Retry Configuration

Pattern: queue-manager/services/store/subscriptions/activate.js

{
attempts: parseInt(process.env.SUBSCRIPTION_ACTIVATION_QUEUES_ATTEMPTS || '10'),
backoff: {
type: 'exponential',
delay: 10000 // 10 seconds base delay
}
}

Retry Schedule:

AttemptDelayTotal Wait
10s0s
210s10s
320s30s
440s70s
580s150s
6160s310s
7320s630s
8640s1270s
91280s2550s
102560s5110s

Service Implementation

Missing Items Detection

File: queue-manager/services/store/subscriptions/activate.js

Purpose: Finds active subscriptions without corresponding external entities

MongoDB Aggregation Pipeline

const missingItems = await StoreSubscription.aggregate([
{
$match: {
status: { $in: ['active', 'past_due'] },
'plan.metadata.product_type': 'listings',
},
},
{
$lookup: {
from: 'accounts',
localField: 'metadata.account_id',
foreignField: '_id',
as: 'account',
},
},
{ $unwind: { path: '$account', preserveNullAndEmptyArrays: false } },
{
$match: {
'account.yext.entity': { $exists: false },
},
},
{
$project: {
stripe_id: 1,
account_id: '$metadata.account_id',
user_id: '$metadata.user_id',
parent_account: '$metadata.parent_account',
user_type: '$metadata.user_type',
},
},
]);

Pipeline Stages:

  1. $match: Filter active/past_due listings subscriptions
  2. $lookup: Join with Accounts collection
  3. $unwind: Flatten account array
  4. $match: Filter accounts missing yext.entity
  5. $project: Extract required fields

Output Structure:

{
_id: ObjectId,
stripe_id: String,
account_id: ObjectId,
user_id: ObjectId,
parent_account: ObjectId,
user_type: String
}

Queue Creation Loop

for (const item of missingItems) {
const token = jwt.sign(
{
/* JWT payload */
},
secret,
{ expiresIn: '10d' },
);

const queueData = {
queue_name: 'store_subscription_activate_listings_queue',
account_id: item.account_id,
user_id: item.user_id,
parent_account: item.parent_account,
client_id: item.parent_account,
type: 'subscription',
sub_type: 'activate',
status: 'pending',
data: {
target_account_id: item.account_id,
subscription_id: item.stripe_id,
token: token,
},
additional_data: {
subscription_id: item.stripe_id,
},
};

await Queue.create(queueData);
}

Routing Logic

Subscription Type Mapping:

Product TypeQueue NameProcessor
listingsstore_subscription_activate_listings_queuelisting.js
phone_numberstore_subscription_activate_numbers_queuenumbers.js
instasitestore_subscription_activate_instasite_queueinstasite.js
sitestore_subscription_activate_site_queuesite.js

Routing Implementation:

const queueMap = {
listings: 'store_subscription_activate_listings_queue',
phone_number: 'store_subscription_activate_numbers_queue',
instasite: 'store_subscription_activate_instasite_queue',
site: 'store_subscription_activate_site_queue',
};

const queueName = queueMap[productType] || 'store_subscription_activate_listings_queue';

Processor Implementations

1. Listing Processor

File: queue-manager/queues/store/subscriptions/activate/listing.js Utility: queue-manager/utilities/listings.js

Core Logic

const activateListing = async (target_account_id, subscription_id) => {
const acc = await Account.findOne({ _id: target_account_id });
const subscription = await StoreSubscription.findOne({
$or: [
{ stripe_id: subscription_id },
{
'plan.metadata.product_type': 'listings',
'metadata.account_id': target_account_id,
status: { $in: ['active', 'past_due'] },
},
],
});

let yext_entity_id;

if (acc.yext?.entity) {
// Activate existing entity
await activateEntity(acc.yext.entity);
yext_entity_id = acc.yext.entity;
await Account.updateOne({ _id: target_account_id }, [
{ $set: { yext_status: true } },
{ $unset: ['yext.pending', 'yext.error'] },
]);
} else {
// Create new entity
const countryCode = findCountryCode(acc.address?.country);

const payload = {
address: {
countryCode,
city: acc.address?.city,
postalCode: acc.address?.postal_code,
region: acc.address?.state_province,
line1: acc.address?.street,
line2: acc.address?.unit,
},
meta: { countryCode },
name: acc.name,
mainPhone: acc.phone,
};

const response = await createEntity({ body: payload });
yext_entity_id = response.data.response.meta.id;

await Account.updateOne({ _id: target_account_id }, [
{ $set: { yext_status: true, 'yext.entity': yext_entity_id } },
{ $unset: ['yext.pending', 'yext.error'] },
]);

await activateEntity(yext_entity_id);
}

// Update Stripe metadata
await stripe.subscriptions.update(subscription_id, {
metadata: { external_product_id: yext_entity_id },
});

// Emit socket notification
await socketEmit('listings_activated', userIds, { yext: { entity: yext_entity_id } });

return yext_entity_id;
};

Yext API Integration

Create Entity:

POST https://api.yext.com/v2/accounts/me/entities
Headers:
api-key: YEXT_API_KEYS
Params:
v: 20200525
entityType: location
Body:
{
"address": {...},
"meta": {...},
"name": "Business Name",
"mainPhone": "+1234567890"
}

Activate Entity:

POST https://api.yext.com/v2/accounts/me/existinglocationaddrequests
Headers:
api-key: YEXT_API_KEYS
Params:
v: 20200525
Body:
{
"existingLocationId": "entity_id",
"skus": ["LC-00000019"]
}

SKU: LC-00000019 (Listings product)

Error Handling

try {
yext_entity_id = await activateListing(target_account_id, subscription_id);
} catch (err) {
// Store error in Account document
await Account.updateOne({ _id: target_account_id }, [
{ $set: { 'yext.error': getErrorMessages(err) || err.message } },
]);
throw err;
}

Success Actions

  1. Delete queue item from Queue collection
  2. Create success log in StoreThirdPartyLogs
  3. Update Account.yext_status = true
  4. Set Account.yext.entity to Yext entity ID
  5. Update Stripe subscription metadata
  6. Emit listings_activated socket event

2. Phone Number Processor

File: queue-manager/queues/store/subscriptions/activate/numbers.js

Core Logic

const processCb = async (job, done) => {
const { number, token, account_id } = job.data;

try {
// Retrieve Twilio addresses
let addresses = await axios.get(`${API_URL}/v1/e/twilio/address`, {
headers: { Authorization: `Bearer ${token}` },
});
const addressSID = addresses?.data?.data?.[0]?.sid;

// Update number with address
await axios.post(
`${API_URL}/v1/e/twilio/numbers/${number}`,
{ addressSid: addressSID },
{ headers: { Authorization: `Bearer ${token}` } },
);

return done();
} catch (err) {
done(err);
}
};

Twilio API Flow

  1. Fetch Addresses: GET /v1/e/twilio/address

    • Retrieves account's Twilio addresses
    • Extracts first address SID
  2. Update Number: POST /v1/e/twilio/numbers/{number}

    • Associates number with address
    • Required for regulatory compliance

Success Actions

  1. Delete queue item from Queue collection
  2. Create success log in StoreThirdPartyLogs (type: numbers-subscription-activation)

3. Instasite Processor

File: queue-manager/queues/store/subscriptions/activate/instasite.js

Core Logic

const activateInstasite = async (id, token) => {
const client = await DudaClient(process.env.DUDA_TOKEN);

// Find site
let site = (await InstasiteModel.findById(id)) || (await SiteModel.findById(id));
if (!site) throw new Error('Site not found');

// Publish site
await client.sites.publish({ site_name: site.builder_id });

// Get updated site details
const s = await client.sites.get({ site_name: site.builder_id });

return {
id: site._id.toString(),
builder_id: site.builder_id,
status: s.publish_status,
domain: {
custom: s.site_domain,
default: s.site_default_domain,
},
};
};

Tier Upgrade Support

if (job.data.tier) {
await client.plans.update({
site_name: site.builder_id,
plan_id: tier,
});
}

Success Actions

  1. Update InstaSite.status to published status
  2. Update InstaSite.purchased timestamp
  3. Update InstaSite.domain fields
  4. Update InstasitesAdditionalInformation with same data
  5. Update Stripe subscription metadata with internal_product_id and external_product_id
  6. Emit instasite_status_changed socket event
  7. Delete queue item from Queue collection
  8. Create success log in StoreThirdPartyLogs (type: instasites-subscription-activation)

4. Site Processor

File: queue-manager/queues/store/subscriptions/activate/site.js

Core Logic - Site Creation Flow

const processCb = async (job, done) => {
const { uid, template_id, subscription_id, token, account_id } = job.data;

// 1. Fetch business information
let business = await Contact.findOne({ account: account_id });

// 2. Validate business data
if (!business.phone || business.phone.length < 4) throw new Error('Invalid phone');
if (!business.email) throw new Error('Invalid email');

// 3. Prepare site data
const site_data = {
site_business_info: {
business_name: business.name,
phone_number: business.phone,
email: business.email,
address: {
/* full address */
},
},
};

// 4. Check if template or existing site
let template = await Template.findById(template_id);
let existingSite = await AgencyWebsite.findById(template_id);

if (template) {
// New Site Creation Flow

// Step 1: Create site from template
let createdSite = await axios.post(`${API_URL}/v1/e/duda/sites`, {
template_id: template.builder_id,
site_data,
});

// Step 2: Upload business logo
if (business.image?.key) {
await axios.post(`${API_URL}/v1/e/duda/content/${site_name}/upload`, [
{ src: `${WASABI_PUBLIC_IMAGE_DOWNLOAD}/${business.image.key}` },
]);
}

// Step 3: Update site content
await axios.put(`${API_URL}/v1/e/duda/content/${site_name}`, {
location_data: { phones, emails, address },
business_data: { name, logo_url },
site_texts: {
custom: [
/* business data */
],
},
});

// Step 4: Create Duda account
const account_name = new mongoose.Types.ObjectId();
await axios.post(`${API_URL}/v1/e/duda/accounts`, { account_name });

// Step 5: Assign account to site
await axios.post(`${API_URL}/v1/e/duda/accounts/${account_name}/sites/${site_name}`, {
permissions: JSON.parse(process.env.DUDA_SITE_PERMISSIONS),
});

// Step 6: Save to database
await new AgencyWebsite(newSiteData).save();
}

// 5. Update Stripe metadata
await stripe.subscriptions.update(subscription_id, {
metadata: {
action_value: site_id,
internal_product_id: site_id,
external_product_id: builder_id,
},
});

// 6. Publish site
await axios.put(`${API_URL}/v1/agencywebsites/${site_id}/publish`, {});

// 7. Trigger thumbnail generation
await pubSubClient
.topic('v2.sites-thumbnail-generator')
.publish(Buffer.from(JSON.stringify({ id: builder_id })));

return done();
};

Site Content Customization

Location Data:

location_data: {
phones: [{ phoneNumber: phone, label: 'Phone Number' }],
emails: [{ emailAddress: email, label: 'Email Address' }],
address: {
streetAddress: street,
postalCode: postal_code,
region: state_province,
city: city,
country: country
}
}

Custom Text Fields:

site_texts: {
custom: [
{ label: 'Business Name', text: business.name },
{ label: 'Dashboard URL', text: `${WHITE_LABEL_DOMAIN}/${account_id}` },
{ label: 'Street Address', text: business.address.street },
// ... more fields
];
}

Success Actions

  1. Create AgencyWebsite document with full site details
  2. Update Stripe subscription metadata with site IDs
  3. Publish site via Duda API
  4. Trigger thumbnail generation via Google Cloud Pub/Sub
  5. Delete queue item from Queue collection
  6. Create success log in StoreThirdPartyLogs (type: sites-subscription-activation)

Data Models

Queue Entry Structure

Collection: Queue

{
_id: ObjectId,
queue_name: String, // e.g., 'store_subscription_activate_listings_queue'
account_id: ObjectId,
user_id: ObjectId,
parent_account: ObjectId,
client_id: ObjectId,
type: 'subscription',
sub_type: 'activate',
status: 'pending' | 'in_progress' | 'failed',
data: {
target_account_id: ObjectId,
subscription_id: String, // Stripe subscription ID
token: String, // JWT token

// Processor-specific fields
number: String, // For numbers processor
instasite: ObjectId, // For instasite processor
tier: String, // For instasite tier upgrade
template_id: ObjectId, // For site processor
uid: ObjectId // For site processor (creator)
},
additional_data: {
subscription_id: String
},
failed: Boolean,
in_progress: Boolean,
created_at: Date,
updated_at: Date
}

Account Update Structure

Collection: Account

Listing Activation Updates:

{
yext_status: true,
yext: {
entity: String, // Yext entity ID
// Remove these fields on success:
// pending: Boolean,
// error: String
}
}

Subscription Metadata Updates

Stripe Subscription Metadata:

// Listing
metadata: {
external_product_id: String // Yext entity ID
}

// Instasite
metadata: {
internal_product_id: ObjectId, // InstaSite MongoDB ID
external_product_id: String // Duda site name
}

// Site
metadata: {
action_value: ObjectId, // AgencyWebsite MongoDB ID
internal_product_id: ObjectId, // AgencyWebsite MongoDB ID
external_product_id: String // Duda site name
}

Audit Log Structure

Collection: StoreThirdPartyLogs

{
account_id: ObjectId,
user_id: ObjectId,
client_id: ObjectId,
parent_account: ObjectId,
status: 'success' | 'failed',
type: String, // See types below
additional_data: {
subscription_id: String
},
created_at: Date
}

Log Types:

  • listings-subscription-activation
  • numbers-subscription-activation
  • instasites-subscription-activation
  • sites-subscription-activation

Error Handling

Listing Processor Errors

Error Sources:

  1. Account not found
  2. Subscription not found
  3. Yext API errors (entity creation/activation)
  4. Invalid address data

Error Storage:

await Account.updateOne({ _id: target_account_id }, [{ $set: { 'yext.error': errorMessage } }]);

Common Yext Errors:

  • Invalid country code
  • Missing required address fields
  • Entity already exists
  • SKU not available

Number Processor Errors

Error Sources:

  1. No Twilio addresses found
  2. Twilio API authentication failure
  3. Number not found
  4. Address SID invalid

Error Response:

if (err.isAxiosError) {
message = err?.response?.data?.message;
}

Instasite Processor Errors

Error Sources:

  1. Site not found (neither InstaSite nor AgencyWebsite)
  2. Duda API authentication failure
  3. Site publish failure
  4. Plan upgrade failure

Validation:

if (!site) {
throw new Error('Site not found');
}

Site Processor Errors

Error Sources:

  1. Business/Contact not found
  2. Invalid phone number (< 4 digits)
  3. Missing email
  4. Template not found
  5. Duda API failures (create, upload, content, account, assign)
  6. Site publish failure

Critical Validations:

if (!business) throw new Error('Associated business not found');
if (phoneNumber.length < 4) throw new Error('Invalid phone number');
if (!business.email) throw new Error('Invalid email');
if (!template && !existingSite) throw new Error('Template/Site not found');

Retry Handling

All Processors:

const failedCb = async (job, err) => {
if (job.attemptsMade >= parseInt(process.env.SUBSCRIPTION_ACTIVATION_QUEUES_ATTEMPTS || '10')) {
// Mark as permanently failed
await QueueModel.findByIdAndUpdate(job.id, { failed: true });

// Log failure
await new StoreThirdPartyLogs({
account_id: item.account_id,
user_id: item.user_id,
status: 'failed',
type: '{processor}-subscription-activation',
additional_data: item.additional_data,
}).save();
}
};

Exponential Backoff:

  • Base delay: 10 seconds
  • Max attempts: 10
  • Final wait: ~85 minutes total

Socket Notifications

Listing Activation Event

Event: listings_activated

await socketEmit('listings_activated', userIds, {
yext: { entity: yext_entity_id },
});

Recipients: All users in the account Payload: Yext entity ID

Instasite Activation Event

Event: instasite_status_changed

await socketEmit('instasite_status_changed', [user_id.toString()], {
id: updatedSite.id,
});

Recipients: Subscription owner Payload: InstaSite MongoDB ID


Testing Scenarios

1. Missing Listing Activation Detection

Setup:

const account = await Account.create({
name: 'Test Business',
address: { city: 'New York', country: 'US' },
// No yext.entity field
});

const subscription = await StoreSubscription.create({
stripe_id: 'sub_123',
status: 'active',
'plan.metadata.product_type': 'listings',
'metadata.account_id': account._id,
});

Expected:

  • Aggregation pipeline detects missing item
  • JWT token generated
  • Queue entry created
  • Listing processor activates Yext entity
  • Account updated with entity ID

2. Existing Entity Reactivation

Setup:

const account = await Account.create({
yext: { entity: 'existing_entity_id' },
yext_status: false,
});

Expected:

  • Skip entity creation
  • Call activateEntity API with existing ID
  • Update yext_status = true
  • Clear pending/error flags

3. Instasite Publish

Setup:

const instasite = await InstasiteModel.create({
builder_id: 'site_123',
status: 'NOT_PUBLISHED',
});

const queue = await Queue.create({
queue_name: 'store_subscription_activate_instasite_queue',
data: {
instasite: instasite._id,
subscription_id: 'sub_456',
},
});

Expected:

  • Duda publish API called
  • InstaSite status updated
  • Domain fields populated
  • Socket event emitted

4. Site Creation from Template

Setup:

const business = await Contact.create({
account: account_id,
name: 'Test Corp',
email: 'test@example.com',
phone: '+12345678901',
image: { key: 'logo.png' },
});

const template = await Template.create({
builder_id: 'template_123',
});

Expected:

  • Site created from template
  • Logo uploaded
  • Content customized with business data
  • Duda account created and assigned
  • Site saved to AgencyWebsite collection
  • Site published
  • Thumbnail generation triggered

5. Number Activation with Address

Setup:

const queue = await Queue.create({
queue_name: 'store_subscription_activate_numbers_queue',
data: {
number: '+19876543210',
token: jwtToken,
account_id: account_id,
},
});

Expected:

  • Twilio addresses fetched
  • Number updated with address SID
  • Queue entry deleted
  • Success log created

6. Retry Exhaustion

Setup:

// Mock Yext API to always fail
nock('https://api.yext.com')
.post('/v2/accounts/me/entities')
.times(10)
.reply(500, { error: 'Server error' });

Expected:

  • 10 retry attempts with exponential backoff
  • After 10th attempt: queue item marked failed: true
  • Failure log created in StoreThirdPartyLogs
  • Account.yext.error field set

7. JWT Token Expiration

Validation:

const decoded = jwt.verify(token, process.env.JWT_SECRET);
expect(decoded.exp - decoded.iat).toBe(10 * 24 * 60 * 60); // 10 days
expect(decoded.scope).toContain('users.me sites communications');

8. Concurrent Execution Prevention

Test:

// Start two concurrent cron executions
await Promise.all([subscriptionActivate(), subscriptionActivate()]);

// Verify only one processed items
const processedCount = await Queue.countDocuments({ in_progress: true });
expect(processedCount).toBeLessThanOrEqual(totalItems);

Performance Considerations

Aggregation Optimization

Index Requirements:

// StoreSubscription collection
{ status: 1, 'plan.metadata.product_type': 1 }
{ 'metadata.account_id': 1 }

// Account collection
{ '_id': 1, 'yext.entity': 1 }

Pipeline Performance:

  • Single aggregation replaces multiple queries
  • Indexes critical for $match and $lookup stages
  • Result set typically small (< 100 items)

Queue Concurrency

Settings:

QueueWrapper(name, 'global', callbacks, false);

Concurrency: Default (1 job at a time per processor)

  • Prevents race conditions on Account updates
  • Safe for Yext API rate limits
  • Ensures sequential Stripe metadata updates

Socket Emission

Pattern: Fire-and-forget with error logging

try {
await socketEmit(event, userIds, data);
} catch (err) {
logger.warn({ message: 'Socket emit failed', error: err });
// Continue execution - not critical
}

Stripe API Calls

Pattern: Update metadata after successful activation

try {
await stripe.subscriptions.update(subscription_id, { metadata });
} catch (err) {
logger.warn({ message: 'Stripe update failed', error: err });
return done(err); // Fail job to retry
}

Monitoring & Logging

Success Logging

Pattern: All processors log to StoreThirdPartyLogs

await new StoreThirdPartyLogs({
account_id: item.account_id,
user_id: item.user_id,
status: 'success',
type: '{processor}-subscription-activation',
additional_data: item.additional_data,
}).save();

Error Logging

Initiator Patterns:

  • QM/queues/store/subscriptions/activate/listing
  • QM/queues/store/subscriptions/activate/instasite
  • QM/queues/store/subscriptions/activate/numbers
  • QM/queues/store/subscriptions/activate/site

Error Details:

logger.error({
initiator: 'QM/queues/store/subscriptions/activate/{processor}',
message: errorMessage,
error: err,
job: job.id,
job_data: job.data,
});

Metrics to Monitor

  1. Queue Depth: Items in Queue collection by queue_name
  2. Failure Rate: Count of failed: true items
  3. Processing Time: Job duration by processor type
  4. API Error Rates: Yext, Twilio, Duda API failures
  5. Retry Distribution: Count of attempts before success/failure
  6. Missing Items Count: Result size from aggregation pipeline

Alerting Scenarios

  • High Failure Rate: > 10% of activations failing
  • Yext Entity Creation Failures: API errors or invalid data
  • Twilio Address Missing: Accounts without addresses
  • Duda Site Publish Failures: Template or permission issues
  • Queue Backlog: > 100 pending items
  • Long Processing Times: Jobs taking > 5 minutes


Summary

The Store Subscription Activation module provides critical automated provisioning for purchased services across four third-party platforms. Its intelligent missing items detection via MongoDB aggregation ensures no subscriptions are left unactivated, while specialized processors handle the unique requirements of Yext listings, Twilio phone numbers, and Duda sites. The system's JWT-based authentication, exponential backoff retry strategy, and comprehensive audit logging ensure reliable service delivery with full traceability.

Key Strengths:

  • Proactive Detection: Aggregation pipeline finds missing activations automatically
  • Multi-Platform Support: Handles 4 different third-party services seamlessly
  • Robust Retry Logic: 10 attempts with exponential backoff handles transient failures
  • Data Integrity: Updates internal DB, Stripe metadata, and emits real-time notifications
  • Audit Trail: Complete success/failure tracking in StoreThirdPartyLogs
  • Site Customization: Full business data integration for Duda sites

Critical for:

  • Revenue activation (purchased services immediately available)
  • Customer satisfaction (no manual intervention required)
  • Third-party synchronization (internal subscriptions linked to external entities)
  • Support efficiency (auto-detection reduces tickets)
💬

Documentation Assistant

Ask me anything about the docs

Hi! I'm your documentation assistant. Ask me anything about the docs!

I can help you with:
- Code examples
- Configuration details
- Troubleshooting
- Best practices

Try asking: How do I configure the API?
09:31 AM