Import
📖 Overview
internal/api/v1/crm/services/import.service.js brokers bulk CRM uploads initiated from dashboards. It validates request context, enriches auth metadata, guards mappings, and then either publishes a message onto Google Pub/Sub for SaaS imports (HubSpot, Keap, Salesforce, etc.) or immediately queues CSV payloads for contacts/deals. Downstream workers consume the messages to write data, so this service focuses on validation, routing, and pagination of historical results.
🗄️ Collections Used
📚 Full Schema: See Database Collections Documentation
crm.contact-import-results
- Operations: Paginated read of prior import runs, filtering by owner vs user ID
- Model:
shared/models/contact-import-results.js - Usage Context: Stores status, errors, and summary metrics for completed jobs
queues
- Operations: Inserts new
Queuedocuments for CSV imports (contacts/deals) - Model:
shared/models/queues.js - Usage Context: Defer heavy import processing to background workers
users
- Operations: Fetches sender email/name before kicking off jobs
- Model:
shared/models/user.js - Usage Context: Pub/Sub payloads include human-readable metadata for notifications
🔄 Data Flow
flowchart TD
Client -->|POST /import| Controller
Controller --> ImportService
ImportService -->|fetch user| Users[(users)]
ImportService -->|validate mappings| Guard{Provider?}
Guard -->|SaaS (HubSpot, etc.)| PubSub[(Google Pub/Sub Topics)]
Guard -->|CSV contacts| QueueContacts[(queues)]
Guard -->|CSV deals| QueueDeals[(queues)]
PubSub --> WorkerSaaS[Integration Workers]
QueueContacts --> WorkerContacts[Contacts Import Worker]
QueueDeals --> WorkerDeals[Deals Import Worker]
WorkerSaaS --> Results[(crm.contact-import-results)]
WorkerContacts --> Results
WorkerDeals --> Results
Results -->|GET /import| ImportService
ImportService --> Response
🧭 Service Entry Points
Creation
postImport({ body, auth, res })- Validates provider, type, and presence of mappings/data
- Loads requester profile to embed
email/nameinto the payload - For SaaS providers, publishes JSON to dedicated Pub/Sub topics (
v2.csv-import-*) - For CSV uploads, calls
importContactorimportDealsto stage queue documents (including pipeline/tags metadata) - Supports a
body.errorflag to simulate failure responses for UI testing
Listing & Detail
-
getImport({ userId, type, contactType, page, is_owner, account_id })- Enforces
typeselection (contactsordeals) and derives filters accordingly - Uses account scope when requester is an owner; otherwise restricts to the user who launched the job
- Returns paginated results (limit 10) with next/previous pointers and total counts
- Enforces
-
getImportById(id)- Simple passthrough to
ContactResults.findById - Used by UI detail views to fetch CSV error manifests and statistics
- Simple passthrough to
Internal Helpers
-
importContact(message_object)- Validates CSV column names (rejects dots to guard Mongo path injection)
- Writes Queue documents with mappings, raw CSV payload, and optional tags
-
importDeals(message_object)- Similar to contact import but attaches
pipeline_idand deal tags toadditional_data
- Similar to contact import but attaches
🔗 Integration Points
Internal
catch-errors.badRequest/notAllowedunify error semantics for invalid filters or CSV structureswasabiUtilis loaded for parity with legacy flows though current service paths rely on queues/workers for file IO
External
- Google Pub/Sub topics per provider fan-out third-party sync jobs
- Queue Workers consume saved CSV payloads to apply inserts into CRM collections
⚠️ Edge Cases & Guardrails
- User metadata: Missing
emailornameon the initiating user aborts the import (Invalid User) - Mapping validation: Column headers containing
.are rejected to prevent nested write exploits - Owner visibility: Owners see account-wide results; others only see their own history
- CSV deals: Require
pipelineId; contacts accepttagsfor all/person/business runs - Simulation flag:
body.errorshort-circuits with a deterministic error payload for QA