Store Subscription Activation
Overview
The Store Subscription Activation module provides automated provisioning of third-party services when subscriptions become active. It detects missing activations through MongoDB aggregation pipelines and intelligently routes activation requests to four specialized processors: Yext Listings, Twilio Phone Numbers, Duda InstaSites, and Duda Agency Sites. The system uses JWT-based authentication with 10-day expiration for service-to-service API calls and employs exponential backoff retry strategies to handle transient failures.
Key Features:
- Missing Items Detection: MongoDB aggregation finds active subscriptions lacking external entity IDs
- Intelligent Routing: Routes to 4 specialized processors based on subscription type
- JWT Authentication: 10-day tokens with multi-scope access for API authorization
- Retry Strategy: 10 attempts with exponential backoff (10-second base delay)
- Stripe Metadata Sync: Updates subscription metadata with external product IDs
- Socket Notifications: Real-time user notifications via WebSocket
- Audit Logging: StoreThirdPartyLogs for success/failure tracking
Critical Business Impact:
- Revenue Activation: Ensures purchased services are immediately provisioned
- Customer Satisfaction: Prevents delays in service delivery
- Third-Party Integration: Seamless activation across Yext, Twilio, and Duda platforms
- Data Integrity: Links internal subscriptions with external entity IDs
- Support Reduction: Auto-detects and resolves missing activations
Architecture
Execution Flow
sequenceDiagram
participant Cron as Cron Scheduler
participant Service as Activate Service
participant DB as MongoDB
participant Queue as Bull Queue
participant Listing as Listing Processor
participant Number as Number Processor
participant Instasite as Instasite Processor
participant Site as Site Processor
participant Yext as Yext API
participant Twilio as Twilio API
participant Duda as Duda API
participant Stripe as Stripe API
participant Socket as Socket Service
Note over Cron,Socket: Every 5 Seconds
Cron->>Service: Trigger activate check
Service->>DB: Find in_progress=false jobs
DB-->>Service: Queue items
Service->>Service: Mark in_progress=true
Service->>DB: Aggregate active subscriptions
Note over DB: Find listings subscriptions<br/>without yext.entity
DB-->>Service: Missing items array
loop For each missing item
Service->>Service: Generate JWT token
Note over Service: 10-day expiration<br/>Multi-scope access
Service->>DB: Create queue entry
Note over DB: Set retry config<br/>10 attempts, 10s base
alt Listing Activation
Service->>Queue: Add to listing queue
Queue->>Listing: Process job
Listing->>Yext: Create/Activate entity
Yext-->>Listing: Entity ID
Listing->>DB: Update Account.yext.entity
Listing->>Stripe: Update subscription metadata
Listing->>Socket: Emit listings_activated
Listing->>DB: Delete queue item
Listing->>DB: Log success to StoreThirdPartyLogs
end
alt Phone Number Activation
Service->>Queue: Add to number queue
Queue->>Number: Process job
Number->>Twilio: Get addresses
Twilio-->>Number: Address SID
Number->>Twilio: Update number with address
Number->>DB: Delete queue item
Number->>DB: Log success to StoreThirdPartyLogs
end
alt Instasite Activation
Service->>Queue: Add to instasite queue
Queue->>Instasite: Process job
Instasite->>Duda: Publish site
Duda-->>Instasite: Site status & domain
Instasite->>DB: Update InstaSite status
Instasite->>Stripe: Update subscription metadata
Instasite->>Socket: Emit instasite_status_changed
Instasite->>DB: Delete queue item
Instasite->>DB: Log success to StoreThirdPartyLogs
end
alt Site Activation
Service->>Queue: Add to site queue
Queue->>Site: Process job
Site->>DB: Fetch Contact for business info
alt New Site from Template
Site->>Duda: Create site from template
Site->>Duda: Upload logo to site
Site->>Duda: Update site content
Site->>Duda: Create account
Site->>Duda: Assign account to site
Site->>DB: Save AgencyWebsite
end
Site->>Duda: Publish site
Site->>Stripe: Update subscription metadata
Site->>DB: Delete queue item
Site->>DB: Log success to StoreThirdPartyLogs
end
end
Service->>DB: Mark in_progress=false
Component Structure
queue-manager/
├── services/
│ └── store/
│ └── subscriptions/
│ └── activate.js # Missing items detection service
├── queues/
│ └── store/
│ └── subscriptions/
│ └── activate/
│ ├── listing.js # Yext listing activation
│ ├── numbers.js # Twilio number activation
│ ├── instasite.js # Duda instasite activation
│ └── site.js # Duda agency site activation
└── utilities/
└── listings.js # Listing activation utility
Cron Schedule
File: queue-manager/services/store/subscriptions/activate.js
'*/5 * * * * *'; // Every 5 seconds
Pattern: Ultra-high-frequency scheduler for rapid activation
- In-Progress Locking: Prevents concurrent executions
- Database Lock: Sets
in_progress: trueon queue items - Purpose: Ensures near-instant service provisioning
Configuration
Environment Variables
| Variable | Type | Default | Description |
|---|---|---|---|
SUBSCRIPTION_ACTIVATION_QUEUES_ATTEMPTS | Number | 10 | Max retry attempts for activation |
YEXT_API_KEYS | String | Required | Yext API key for listings |
YEXT_API_VPARAM | String | 20200525 | Yext API version parameter |
DUDA_TOKEN | String | Required | Duda API token for site operations |
DUDA_SITE_PERMISSIONS | JSON | Required | Default permissions for created accounts |
API_BASE_URL | String | Required | Internal API base URL |
SOCKET_API | String | Required | Socket service URL for notifications |
STRIPE_SECRET_KEY | String | Required | Stripe API key for metadata updates |
WASABI_PUBLIC_IMAGE_DOWNLOAD | String | Required | Wasabi CDN URL for logo uploads |
WHITE_LABEL_DOMAIN | String | Required | Dashboard domain for site content |
JWT Token Configuration
Generation: queue-manager/services/store/subscriptions/activate.js
const jwt = require('jsonwebtoken');
const secret = process.env.JWT_SECRET;
const token = jwt.sign(
{
user_id: queue.user_id,
user_type: queue.user_type,
id: queue.user_id,
isAdmin: false,
scope: 'users.me sites communications system analytics',
account_id: queue.account_id,
parent_account: queue.parent_account,
},
secret,
{ expiresIn: '10d' },
);
Token Scope Permissions:
users.me- User profile accesssites- Site managementcommunications- Messaging operationssystem- System-level featuresanalytics- Analytics data access
Queue Retry Configuration
Pattern: queue-manager/services/store/subscriptions/activate.js
{
attempts: parseInt(process.env.SUBSCRIPTION_ACTIVATION_QUEUES_ATTEMPTS || '10'),
backoff: {
type: 'exponential',
delay: 10000 // 10 seconds base delay
}
}
Retry Schedule:
| Attempt | Delay | Total Wait |
|---|---|---|
| 1 | 0s | 0s |
| 2 | 10s | 10s |
| 3 | 20s | 30s |
| 4 | 40s | 70s |
| 5 | 80s | 150s |
| 6 | 160s | 310s |
| 7 | 320s | 630s |
| 8 | 640s | 1270s |
| 9 | 1280s | 2550s |
| 10 | 2560s | 5110s |
Service Implementation
Missing Items Detection
File: queue-manager/services/store/subscriptions/activate.js
Purpose: Finds active subscriptions without corresponding external entities
MongoDB Aggregation Pipeline
const missingItems = await StoreSubscription.aggregate([
{
$match: {
status: { $in: ['active', 'past_due'] },
'plan.metadata.product_type': 'listings',
},
},
{
$lookup: {
from: 'accounts',
localField: 'metadata.account_id',
foreignField: '_id',
as: 'account',
},
},
{ $unwind: { path: '$account', preserveNullAndEmptyArrays: false } },
{
$match: {
'account.yext.entity': { $exists: false },
},
},
{
$project: {
stripe_id: 1,
account_id: '$metadata.account_id',
user_id: '$metadata.user_id',
parent_account: '$metadata.parent_account',
user_type: '$metadata.user_type',
},
},
]);
Pipeline Stages:
- $match: Filter active/past_due listings subscriptions
- $lookup: Join with Accounts collection
- $unwind: Flatten account array
- $match: Filter accounts missing
yext.entity - $project: Extract required fields
Output Structure:
{
_id: ObjectId,
stripe_id: String,
account_id: ObjectId,
user_id: ObjectId,
parent_account: ObjectId,
user_type: String
}
Queue Creation Loop
for (const item of missingItems) {
const token = jwt.sign(
{
/* JWT payload */
},
secret,
{ expiresIn: '10d' },
);
const queueData = {
queue_name: 'store_subscription_activate_listings_queue',
account_id: item.account_id,
user_id: item.user_id,
parent_account: item.parent_account,
client_id: item.parent_account,
type: 'subscription',
sub_type: 'activate',
status: 'pending',
data: {
target_account_id: item.account_id,
subscription_id: item.stripe_id,
token: token,
},
additional_data: {
subscription_id: item.stripe_id,
},
};
await Queue.create(queueData);
}
Routing Logic
Subscription Type Mapping:
| Product Type | Queue Name | Processor |
|---|---|---|
listings | store_subscription_activate_listings_queue | listing.js |
phone_number | store_subscription_activate_numbers_queue | numbers.js |
instasite | store_subscription_activate_instasite_queue | instasite.js |
site | store_subscription_activate_site_queue | site.js |
Routing Implementation:
const queueMap = {
listings: 'store_subscription_activate_listings_queue',
phone_number: 'store_subscription_activate_numbers_queue',
instasite: 'store_subscription_activate_instasite_queue',
site: 'store_subscription_activate_site_queue',
};
const queueName = queueMap[productType] || 'store_subscription_activate_listings_queue';
Processor Implementations
1. Listing Processor
File: queue-manager/queues/store/subscriptions/activate/listing.js
Utility: queue-manager/utilities/listings.js
Core Logic
const activateListing = async (target_account_id, subscription_id) => {
const acc = await Account.findOne({ _id: target_account_id });
const subscription = await StoreSubscription.findOne({
$or: [
{ stripe_id: subscription_id },
{
'plan.metadata.product_type': 'listings',
'metadata.account_id': target_account_id,
status: { $in: ['active', 'past_due'] },
},
],
});
let yext_entity_id;
if (acc.yext?.entity) {
// Activate existing entity
await activateEntity(acc.yext.entity);
yext_entity_id = acc.yext.entity;
await Account.updateOne({ _id: target_account_id }, [
{ $set: { yext_status: true } },
{ $unset: ['yext.pending', 'yext.error'] },
]);
} else {
// Create new entity
const countryCode = findCountryCode(acc.address?.country);
const payload = {
address: {
countryCode,
city: acc.address?.city,
postalCode: acc.address?.postal_code,
region: acc.address?.state_province,
line1: acc.address?.street,
line2: acc.address?.unit,
},
meta: { countryCode },
name: acc.name,
mainPhone: acc.phone,
};
const response = await createEntity({ body: payload });
yext_entity_id = response.data.response.meta.id;
await Account.updateOne({ _id: target_account_id }, [
{ $set: { yext_status: true, 'yext.entity': yext_entity_id } },
{ $unset: ['yext.pending', 'yext.error'] },
]);
await activateEntity(yext_entity_id);
}
// Update Stripe metadata
await stripe.subscriptions.update(subscription_id, {
metadata: { external_product_id: yext_entity_id },
});
// Emit socket notification
await socketEmit('listings_activated', userIds, { yext: { entity: yext_entity_id } });
return yext_entity_id;
};
Yext API Integration
Create Entity:
POST https://api.yext.com/v2/accounts/me/entities
Headers:
api-key: YEXT_API_KEYS
Params:
v: 20200525
entityType: location
Body:
{
"address": {...},
"meta": {...},
"name": "Business Name",
"mainPhone": "+1234567890"
}
Activate Entity:
POST https://api.yext.com/v2/accounts/me/existinglocationaddrequests
Headers:
api-key: YEXT_API_KEYS
Params:
v: 20200525
Body:
{
"existingLocationId": "entity_id",
"skus": ["LC-00000019"]
}
SKU: LC-00000019 (Listings product)
Error Handling
try {
yext_entity_id = await activateListing(target_account_id, subscription_id);
} catch (err) {
// Store error in Account document
await Account.updateOne({ _id: target_account_id }, [
{ $set: { 'yext.error': getErrorMessages(err) || err.message } },
]);
throw err;
}
Success Actions
- Delete queue item from
Queuecollection - Create success log in
StoreThirdPartyLogs - Update
Account.yext_status = true - Set
Account.yext.entityto Yext entity ID - Update Stripe subscription metadata
- Emit
listings_activatedsocket event
2. Phone Number Processor
File: queue-manager/queues/store/subscriptions/activate/numbers.js
Core Logic
const processCb = async (job, done) => {
const { number, token, account_id } = job.data;
try {
// Retrieve Twilio addresses
let addresses = await axios.get(`${API_URL}/v1/e/twilio/address`, {
headers: { Authorization: `Bearer ${token}` },
});
const addressSID = addresses?.data?.data?.[0]?.sid;
// Update number with address
await axios.post(
`${API_URL}/v1/e/twilio/numbers/${number}`,
{ addressSid: addressSID },
{ headers: { Authorization: `Bearer ${token}` } },
);
return done();
} catch (err) {
done(err);
}
};
Twilio API Flow
-
Fetch Addresses:
GET /v1/e/twilio/address- Retrieves account's Twilio addresses
- Extracts first address SID
-
Update Number:
POST /v1/e/twilio/numbers/{number}- Associates number with address
- Required for regulatory compliance
Success Actions
- Delete queue item from
Queuecollection - Create success log in
StoreThirdPartyLogs(type:numbers-subscription-activation)
3. Instasite Processor
File: queue-manager/queues/store/subscriptions/activate/instasite.js
Core Logic
const activateInstasite = async (id, token) => {
const client = await DudaClient(process.env.DUDA_TOKEN);
// Find site
let site = (await InstasiteModel.findById(id)) || (await SiteModel.findById(id));
if (!site) throw new Error('Site not found');
// Publish site
await client.sites.publish({ site_name: site.builder_id });
// Get updated site details
const s = await client.sites.get({ site_name: site.builder_id });
return {
id: site._id.toString(),
builder_id: site.builder_id,
status: s.publish_status,
domain: {
custom: s.site_domain,
default: s.site_default_domain,
},
};
};
Tier Upgrade Support
if (job.data.tier) {
await client.plans.update({
site_name: site.builder_id,
plan_id: tier,
});
}
Success Actions
- Update
InstaSite.statusto published status - Update
InstaSite.purchasedtimestamp - Update
InstaSite.domainfields - Update
InstasitesAdditionalInformationwith same data - Update Stripe subscription metadata with
internal_product_idandexternal_product_id - Emit
instasite_status_changedsocket event - Delete queue item from
Queuecollection - Create success log in
StoreThirdPartyLogs(type:instasites-subscription-activation)
4. Site Processor
File: queue-manager/queues/store/subscriptions/activate/site.js
Core Logic - Site Creation Flow
const processCb = async (job, done) => {
const { uid, template_id, subscription_id, token, account_id } = job.data;
// 1. Fetch business information
let business = await Contact.findOne({ account: account_id });
// 2. Validate business data
if (!business.phone || business.phone.length < 4) throw new Error('Invalid phone');
if (!business.email) throw new Error('Invalid email');
// 3. Prepare site data
const site_data = {
site_business_info: {
business_name: business.name,
phone_number: business.phone,
email: business.email,
address: {
/* full address */
},
},
};
// 4. Check if template or existing site
let template = await Template.findById(template_id);
let existingSite = await AgencyWebsite.findById(template_id);
if (template) {
// New Site Creation Flow
// Step 1: Create site from template
let createdSite = await axios.post(`${API_URL}/v1/e/duda/sites`, {
template_id: template.builder_id,
site_data,
});
// Step 2: Upload business logo
if (business.image?.key) {
await axios.post(`${API_URL}/v1/e/duda/content/${site_name}/upload`, [
{ src: `${WASABI_PUBLIC_IMAGE_DOWNLOAD}/${business.image.key}` },
]);
}
// Step 3: Update site content
await axios.put(`${API_URL}/v1/e/duda/content/${site_name}`, {
location_data: { phones, emails, address },
business_data: { name, logo_url },
site_texts: {
custom: [
/* business data */
],
},
});
// Step 4: Create Duda account
const account_name = new mongoose.Types.ObjectId();
await axios.post(`${API_URL}/v1/e/duda/accounts`, { account_name });
// Step 5: Assign account to site
await axios.post(`${API_URL}/v1/e/duda/accounts/${account_name}/sites/${site_name}`, {
permissions: JSON.parse(process.env.DUDA_SITE_PERMISSIONS),
});
// Step 6: Save to database
await new AgencyWebsite(newSiteData).save();
}
// 5. Update Stripe metadata
await stripe.subscriptions.update(subscription_id, {
metadata: {
action_value: site_id,
internal_product_id: site_id,
external_product_id: builder_id,
},
});
// 6. Publish site
await axios.put(`${API_URL}/v1/agencywebsites/${site_id}/publish`, {});
// 7. Trigger thumbnail generation
await pubSubClient
.topic('v2.sites-thumbnail-generator')
.publish(Buffer.from(JSON.stringify({ id: builder_id })));
return done();
};
Site Content Customization
Location Data:
location_data: {
phones: [{ phoneNumber: phone, label: 'Phone Number' }],
emails: [{ emailAddress: email, label: 'Email Address' }],
address: {
streetAddress: street,
postalCode: postal_code,
region: state_province,
city: city,
country: country
}
}
Custom Text Fields:
site_texts: {
custom: [
{ label: 'Business Name', text: business.name },
{ label: 'Dashboard URL', text: `${WHITE_LABEL_DOMAIN}/${account_id}` },
{ label: 'Street Address', text: business.address.street },
// ... more fields
];
}
Success Actions
- Create
AgencyWebsitedocument with full site details - Update Stripe subscription metadata with site IDs
- Publish site via Duda API
- Trigger thumbnail generation via Google Cloud Pub/Sub
- Delete queue item from
Queuecollection - Create success log in
StoreThirdPartyLogs(type:sites-subscription-activation)
Data Models
Queue Entry Structure
Collection: Queue
{
_id: ObjectId,
queue_name: String, // e.g., 'store_subscription_activate_listings_queue'
account_id: ObjectId,
user_id: ObjectId,
parent_account: ObjectId,
client_id: ObjectId,
type: 'subscription',
sub_type: 'activate',
status: 'pending' | 'in_progress' | 'failed',
data: {
target_account_id: ObjectId,
subscription_id: String, // Stripe subscription ID
token: String, // JWT token
// Processor-specific fields
number: String, // For numbers processor
instasite: ObjectId, // For instasite processor
tier: String, // For instasite tier upgrade
template_id: ObjectId, // For site processor
uid: ObjectId // For site processor (creator)
},
additional_data: {
subscription_id: String
},
failed: Boolean,
in_progress: Boolean,
created_at: Date,
updated_at: Date
}
Account Update Structure
Collection: Account
Listing Activation Updates:
{
yext_status: true,
yext: {
entity: String, // Yext entity ID
// Remove these fields on success:
// pending: Boolean,
// error: String
}
}
Subscription Metadata Updates
Stripe Subscription Metadata:
// Listing
metadata: {
external_product_id: String // Yext entity ID
}
// Instasite
metadata: {
internal_product_id: ObjectId, // InstaSite MongoDB ID
external_product_id: String // Duda site name
}
// Site
metadata: {
action_value: ObjectId, // AgencyWebsite MongoDB ID
internal_product_id: ObjectId, // AgencyWebsite MongoDB ID
external_product_id: String // Duda site name
}
Audit Log Structure
Collection: StoreThirdPartyLogs
{
account_id: ObjectId,
user_id: ObjectId,
client_id: ObjectId,
parent_account: ObjectId,
status: 'success' | 'failed',
type: String, // See types below
additional_data: {
subscription_id: String
},
created_at: Date
}
Log Types:
listings-subscription-activationnumbers-subscription-activationinstasites-subscription-activationsites-subscription-activation
Error Handling
Listing Processor Errors
Error Sources:
- Account not found
- Subscription not found
- Yext API errors (entity creation/activation)
- Invalid address data
Error Storage:
await Account.updateOne({ _id: target_account_id }, [{ $set: { 'yext.error': errorMessage } }]);
Common Yext Errors:
- Invalid country code
- Missing required address fields
- Entity already exists
- SKU not available
Number Processor Errors
Error Sources:
- No Twilio addresses found
- Twilio API authentication failure
- Number not found
- Address SID invalid
Error Response:
if (err.isAxiosError) {
message = err?.response?.data?.message;
}
Instasite Processor Errors
Error Sources:
- Site not found (neither InstaSite nor AgencyWebsite)
- Duda API authentication failure
- Site publish failure
- Plan upgrade failure
Validation:
if (!site) {
throw new Error('Site not found');
}
Site Processor Errors
Error Sources:
- Business/Contact not found
- Invalid phone number (< 4 digits)
- Missing email
- Template not found
- Duda API failures (create, upload, content, account, assign)
- Site publish failure
Critical Validations:
if (!business) throw new Error('Associated business not found');
if (phoneNumber.length < 4) throw new Error('Invalid phone number');
if (!business.email) throw new Error('Invalid email');
if (!template && !existingSite) throw new Error('Template/Site not found');
Retry Handling
All Processors:
const failedCb = async (job, err) => {
if (job.attemptsMade >= parseInt(process.env.SUBSCRIPTION_ACTIVATION_QUEUES_ATTEMPTS || '10')) {
// Mark as permanently failed
await QueueModel.findByIdAndUpdate(job.id, { failed: true });
// Log failure
await new StoreThirdPartyLogs({
account_id: item.account_id,
user_id: item.user_id,
status: 'failed',
type: '{processor}-subscription-activation',
additional_data: item.additional_data,
}).save();
}
};
Exponential Backoff:
- Base delay: 10 seconds
- Max attempts: 10
- Final wait: ~85 minutes total
Socket Notifications
Listing Activation Event
Event: listings_activated
await socketEmit('listings_activated', userIds, {
yext: { entity: yext_entity_id },
});
Recipients: All users in the account Payload: Yext entity ID
Instasite Activation Event
Event: instasite_status_changed
await socketEmit('instasite_status_changed', [user_id.toString()], {
id: updatedSite.id,
});
Recipients: Subscription owner Payload: InstaSite MongoDB ID
Testing Scenarios
1. Missing Listing Activation Detection
Setup:
const account = await Account.create({
name: 'Test Business',
address: { city: 'New York', country: 'US' },
// No yext.entity field
});
const subscription = await StoreSubscription.create({
stripe_id: 'sub_123',
status: 'active',
'plan.metadata.product_type': 'listings',
'metadata.account_id': account._id,
});
Expected:
- Aggregation pipeline detects missing item
- JWT token generated
- Queue entry created
- Listing processor activates Yext entity
- Account updated with entity ID
2. Existing Entity Reactivation
Setup:
const account = await Account.create({
yext: { entity: 'existing_entity_id' },
yext_status: false,
});
Expected:
- Skip entity creation
- Call activateEntity API with existing ID
- Update
yext_status = true - Clear pending/error flags
3. Instasite Publish
Setup:
const instasite = await InstasiteModel.create({
builder_id: 'site_123',
status: 'NOT_PUBLISHED',
});
const queue = await Queue.create({
queue_name: 'store_subscription_activate_instasite_queue',
data: {
instasite: instasite._id,
subscription_id: 'sub_456',
},
});
Expected:
- Duda publish API called
- InstaSite status updated
- Domain fields populated
- Socket event emitted
4. Site Creation from Template
Setup:
const business = await Contact.create({
account: account_id,
name: 'Test Corp',
email: 'test@example.com',
phone: '+12345678901',
image: { key: 'logo.png' },
});
const template = await Template.create({
builder_id: 'template_123',
});
Expected:
- Site created from template
- Logo uploaded
- Content customized with business data
- Duda account created and assigned
- Site saved to AgencyWebsite collection
- Site published
- Thumbnail generation triggered
5. Number Activation with Address
Setup:
const queue = await Queue.create({
queue_name: 'store_subscription_activate_numbers_queue',
data: {
number: '+19876543210',
token: jwtToken,
account_id: account_id,
},
});
Expected:
- Twilio addresses fetched
- Number updated with address SID
- Queue entry deleted
- Success log created
6. Retry Exhaustion
Setup:
// Mock Yext API to always fail
nock('https://api.yext.com')
.post('/v2/accounts/me/entities')
.times(10)
.reply(500, { error: 'Server error' });
Expected:
- 10 retry attempts with exponential backoff
- After 10th attempt: queue item marked
failed: true - Failure log created in StoreThirdPartyLogs
- Account.yext.error field set
7. JWT Token Expiration
Validation:
const decoded = jwt.verify(token, process.env.JWT_SECRET);
expect(decoded.exp - decoded.iat).toBe(10 * 24 * 60 * 60); // 10 days
expect(decoded.scope).toContain('users.me sites communications');
8. Concurrent Execution Prevention
Test:
// Start two concurrent cron executions
await Promise.all([subscriptionActivate(), subscriptionActivate()]);
// Verify only one processed items
const processedCount = await Queue.countDocuments({ in_progress: true });
expect(processedCount).toBeLessThanOrEqual(totalItems);
Performance Considerations
Aggregation Optimization
Index Requirements:
// StoreSubscription collection
{ status: 1, 'plan.metadata.product_type': 1 }
{ 'metadata.account_id': 1 }
// Account collection
{ '_id': 1, 'yext.entity': 1 }
Pipeline Performance:
- Single aggregation replaces multiple queries
- Indexes critical for $match and $lookup stages
- Result set typically small (< 100 items)
Queue Concurrency
Settings:
QueueWrapper(name, 'global', callbacks, false);
Concurrency: Default (1 job at a time per processor)
- Prevents race conditions on Account updates
- Safe for Yext API rate limits
- Ensures sequential Stripe metadata updates
Socket Emission
Pattern: Fire-and-forget with error logging
try {
await socketEmit(event, userIds, data);
} catch (err) {
logger.warn({ message: 'Socket emit failed', error: err });
// Continue execution - not critical
}
Stripe API Calls
Pattern: Update metadata after successful activation
try {
await stripe.subscriptions.update(subscription_id, { metadata });
} catch (err) {
logger.warn({ message: 'Stripe update failed', error: err });
return done(err); // Fail job to retry
}
Monitoring & Logging
Success Logging
Pattern: All processors log to StoreThirdPartyLogs
await new StoreThirdPartyLogs({
account_id: item.account_id,
user_id: item.user_id,
status: 'success',
type: '{processor}-subscription-activation',
additional_data: item.additional_data,
}).save();
Error Logging
Initiator Patterns:
QM/queues/store/subscriptions/activate/listingQM/queues/store/subscriptions/activate/instasiteQM/queues/store/subscriptions/activate/numbersQM/queues/store/subscriptions/activate/site
Error Details:
logger.error({
initiator: 'QM/queues/store/subscriptions/activate/{processor}',
message: errorMessage,
error: err,
job: job.id,
job_data: job.data,
});
Metrics to Monitor
- Queue Depth: Items in
Queuecollection by queue_name - Failure Rate: Count of
failed: trueitems - Processing Time: Job duration by processor type
- API Error Rates: Yext, Twilio, Duda API failures
- Retry Distribution: Count of attempts before success/failure
- Missing Items Count: Result size from aggregation pipeline
Alerting Scenarios
- High Failure Rate: > 10% of activations failing
- Yext Entity Creation Failures: API errors or invalid data
- Twilio Address Missing: Accounts without addresses
- Duda Site Publish Failures: Template or permission issues
- Queue Backlog: > 100 pending items
- Long Processing Times: Jobs taking > 5 minutes
Related Documentation
- Store Subscriptions Cancel - Cancellation with Yext/Duda unpublish
- Store Subscriptions Downgrade - Tier downgrades with resource cleanup
- Queue Wrapper - Bull queue configuration
- Common Billing Utilities - Shared billing functions
- External Yext Integration - Yext API integration
- External Duda Integration - Duda API integration
- External Twilio Integration - Twilio API integration
Summary
The Store Subscription Activation module provides critical automated provisioning for purchased services across four third-party platforms. Its intelligent missing items detection via MongoDB aggregation ensures no subscriptions are left unactivated, while specialized processors handle the unique requirements of Yext listings, Twilio phone numbers, and Duda sites. The system's JWT-based authentication, exponential backoff retry strategy, and comprehensive audit logging ensure reliable service delivery with full traceability.
Key Strengths:
- Proactive Detection: Aggregation pipeline finds missing activations automatically
- Multi-Platform Support: Handles 4 different third-party services seamlessly
- Robust Retry Logic: 10 attempts with exponential backoff handles transient failures
- Data Integrity: Updates internal DB, Stripe metadata, and emits real-time notifications
- Audit Trail: Complete success/failure tracking in StoreThirdPartyLogs
- Site Customization: Full business data integration for Duda sites
Critical for:
- Revenue activation (purchased services immediately available)
- Customer satisfaction (no manual intervention required)
- Third-party synchronization (internal subscriptions linked to external entities)
- Support efficiency (auto-detection reduces tickets)