Import

📖 Overview

internal/api/v1/crm/services/import.service.js brokers bulk CRM uploads initiated from dashboards. It validates request context, enriches auth metadata, guards mappings, and then either publishes a message to Google Pub/Sub for SaaS imports (HubSpot, Keap, Salesforce, etc.) or immediately queues CSV payloads for contacts/deals. Because downstream workers consume those messages and queue documents to write the data, this service is limited to validation, routing, and paginated reads of past import results.

🗄️ Collections Used

📚 Full Schema: See Database Collections Documentation

crm.contact-import-results

  • Operations: Paginated read of prior import runs, filtering by owner vs user ID
  • Model: shared/models/contact-import-results.js
  • Usage Context: Stores status, errors, and summary metrics for completed jobs

queues

  • Operations: Inserts new Queue documents for CSV imports (contacts/deals)
  • Model: shared/models/queues.js
  • Usage Context: Defer heavy import processing to background workers

users

  • Operations: Fetches sender email/name before kicking off jobs
  • Model: shared/models/user.js
  • Usage Context: Pub/Sub payloads include human-readable metadata for notifications

🔄 Data Flow

flowchart TD
Client -->|POST /import| Controller
Controller --> ImportService
ImportService -->|fetch user| Users[(users)]
ImportService -->|validate mappings| Guard{Provider?}
Guard -->|SaaS (HubSpot, etc.)| PubSub[(Google Pub/Sub Topics)]
Guard -->|CSV contacts| QueueContacts[(queues)]
Guard -->|CSV deals| QueueDeals[(queues)]
PubSub --> WorkerSaaS[Integration Workers]
QueueContacts --> WorkerContacts[Contacts Import Worker]
QueueDeals --> WorkerDeals[Deals Import Worker]
WorkerSaaS --> Results[(crm.contact-import-results)]
WorkerContacts --> Results
WorkerDeals --> Results
Results -->|GET /import| ImportService
ImportService --> Response

🧭 Service Entry Points

Creation

  • postImport({ body, auth, res })
    • Validates provider, type, and presence of mappings/data
    • Loads requester profile to embed email/name into the payload
    • For SaaS providers, publishes JSON to dedicated Pub/Sub topics (v2.csv-import-*)
    • For CSV uploads, calls importContact or importDeals to stage queue documents (including pipeline/tags metadata)
    • Supports a body.error flag to simulate failure responses for UI testing
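
The routing decision described above can be sketched as a pure function that performs no I/O. This is only an illustration under stated assumptions: the provider strings, the 'csv' identifier, the rule that data is required only for CSV uploads, and the routeImport name itself are hypothetical and not taken from the service.

```javascript
// Assumed provider identifiers: the docs name HubSpot, Keap, and Salesforce,
// but the exact strings the service accepts are not documented here.
const SAAS_PROVIDERS = new Set(['hubspot', 'keap', 'salesforce']);
const IMPORT_TYPES = new Set(['contacts', 'deals']);

const reject = (reason) => ({ ok: false, reason });

// Hypothetical helper: decides where a request should go, without side effects.
function routeImport(body = {}) {
  const { provider, type, mappings, data } = body;

  if (!IMPORT_TYPES.has(type)) return reject('type must be contacts or deals');
  if (!mappings) return reject('mappings are required');

  // SaaS providers are handed off to Pub/Sub; topic names are left to the caller.
  if (SAAS_PROVIDERS.has(provider)) {
    return { ok: true, target: 'pubsub', provider, type };
  }

  // Assumption: CSV uploads are identified by provider === 'csv'.
  if (provider !== 'csv') return reject(`unsupported provider: ${provider}`);
  if (!data) return reject('data is required for CSV imports');

  // CSV uploads are staged as queue documents by the matching helper.
  const handler = type === 'deals' ? 'importDeals' : 'importContact';
  return { ok: true, target: 'queue', handler };
}

console.log(routeImport({ provider: 'hubspot', type: 'contacts', mappings: {} }));
console.log(routeImport({ provider: 'csv', type: 'deals', mappings: {}, data: 'Name\nAcme' }));
```

Keeping the decision separate from the publish and insert calls makes the branch logic testable without Pub/Sub or a database connection.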

Listing & Detail

  • getImport({ userId, type, contactType, page, is_owner, account_id })
    • Enforces type selection (contacts or deals) and derives filters accordingly
    • Uses account scope when requester is an owner; otherwise restricts to the user who launched the job
    • Returns paginated results (limit 10) with next/previous pointers and total counts
  • getImportById(id)
    • Simple passthrough to ContactResults.findById
    • Used by UI detail views to fetch CSV error manifests and statistics
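
The owner/user scoping and the page arithmetic for getImport might look roughly like the sketch below. The filter field names (type, account_id, user_id) and both helper names are assumptions made for illustration; only the page size of 10 comes from the description above, and contactType handling is omitted.

```javascript
const PAGE_SIZE = 10; // the documented limit per page

// Hypothetical helper: owners query by account, everyone else by their own user ID.
function buildImportFilter({ userId, type, is_owner, account_id }) {
  if (type !== 'contacts' && type !== 'deals') {
    throw new Error('type must be contacts or deals');
  }
  const filter = { type };
  if (is_owner) {
    filter.account_id = account_id; // assumed field name
  } else {
    filter.user_id = userId; // assumed field name
  }
  return filter;
}

// Hypothetical helper: turns a 1-based page number and a total count into
// skip/limit values plus next/previous pointers.
function paginate(page, total) {
  const current = Math.max(1, Number.parseInt(page, 10) || 1);
  const totalPages = Math.ceil(total / PAGE_SIZE);
  return {
    skip: (current - 1) * PAGE_SIZE,
    limit: PAGE_SIZE,
    total,
    next: current < totalPages ? current + 1 : null,
    previous: current > 1 ? current - 1 : null,
  };
}

console.log(buildImportFilter({ userId: 'u1', type: 'deals', is_owner: false, account_id: 'a1' }));
console.log(paginate(2, 25));
```

The skip and limit values would then be passed to the query against crm.contact-import-results, with total obtained from a separate count over the same filter.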

Internal Helpers

  • importContact(message_object)
    • Validates CSV column names (rejects dots to guard Mongo path injection)
    • Writes Queue documents with mappings, raw CSV payload, and optional tags
  • importDeals(message_object)
    • Similar to contact import but attaches pipeline_id and deal tags to additional_data
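
As a rough illustration of the column guard and the staged document, the sketch below rejects dotted column names before building a queue payload. MongoDB update operators treat a dot in a field name as a nested path, which is why such headers are unsafe to write through. The helper names, the assumption that mapping keys are the CSV column names, and the document layout (source, mappings, csv, additional_data) are hypothetical; the real Queue schema lives in shared/models/queues.js.

```javascript
// Returns the column names that contain a dot and would be read as nested paths.
function findUnsafeColumns(mappings) {
  return Object.keys(mappings).filter((column) => column.includes('.'));
}

// Hypothetical builder for the document staged in the queues collection.
function buildQueueDocument({ kind, mappings, csv, tags, pipelineId }) {
  const unsafe = findUnsafeColumns(mappings);
  if (unsafe.length > 0) {
    throw new Error(`Invalid column name(s): ${unsafe.join(', ')}`);
  }

  const additional_data = {};
  if (Array.isArray(tags) && tags.length > 0) {
    additional_data.tags = tags;
  }
  if (kind === 'deals') {
    // Deal imports need a pipeline to attach the created deals to.
    if (!pipelineId) throw new Error('pipelineId is required for deal imports');
    additional_data.pipeline_id = pipelineId;
  }

  return { source: kind, mappings, csv, additional_data };
}

console.log(findUnsafeColumns({ Email: 'email', 'profile.role': 'role' }));
console.log(buildQueueDocument({
  kind: 'deals',
  mappings: { Name: 'name' },
  csv: 'Name\nAcme',
  tags: ['q3'],
  pipelineId: 'p1',
}));
```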

🔗 Integration Points

Internal

  • catch-errors.badRequest/notAllowed unify error semantics for invalid filters or CSV structures
  • wasabiUtil is still imported for parity with legacy flows, although the current service paths rely on queues/workers for file I/O

External

  • Per-provider Google Pub/Sub topics fan out third-party sync jobs
  • Queue Workers consume saved CSV payloads to apply inserts into CRM collections

⚠️ Edge Cases & Guardrails

  • User metadata: Missing email or name on the initiating user aborts the import (Invalid User)
  • Mapping validation: Column headers containing . are rejected to prevent nested write exploits
  • Owner visibility: Owners see account-wide results; others only see their own history
  • CSV deals: Require pipelineId; contacts accept tags for all/person/business runs
  • Simulation flag: body.error short-circuits with a deterministic error payload for QA
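
The user metadata guard could be expressed along these lines. It is a minimal sketch: resolveSender is a hypothetical name, the trimming of whitespace is an added assumption, and only the "Invalid User" message comes from the list above.

```javascript
// Hypothetical helper: extracts the sender fields embedded in import payloads
// and aborts when either one is missing or blank.
function resolveSender(user) {
  const email = user && typeof user.email === 'string' ? user.email.trim() : '';
  const name = user && typeof user.name === 'string' ? user.name.trim() : '';
  if (!email || !name) {
    throw new Error('Invalid User');
  }
  return { email, name };
}

console.log(resolveSender({ email: 'ada@example.com', name: 'Ada' }));
```

Running this check before any publish or queue insert means a job is never started without the metadata that notifications rely on.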