Pipelines
π Overviewβ
internal/api/v1/crm/services/pipeline.service.js orchestrates pipeline creation, cloning, mass edits, deal associations, and analytics. It powers the CRM pipeline list, leaderboard widgets, stage conversion reports, and automation log explorer while enforcing account ownership and visibility rules. Controllers forward validated requests; this service executes all aggregations, conversions, and data hygiene checks.
ποΈ Collections Usedβ
π Full Schema: See Database Collections Documentation
crm.pipelineβ
- Operations: Read/write, aggregations, stage lookups, cloning
- Model:
shared/models/pipeline.js - Usage Context: Core pipeline definitions, helper methods for stages and deal mutations
crm.pipeline.stagesβ
- Operations: Aggregated within pipeline list and reports
- Model:
shared/models/pipeline-stage.js - Usage Context: Stage ordering, automation metadata, conversion stats
crm.dealsβ
- Operations: Aggregations for totals, leaderboards, and stage metrics
- Model:
shared/models/deal.js - Usage Context: Supplies counts, values, and filtering for reports and association helpers
crm.automationsβ
- Operations: Read during clone/update flows and automation log queries
- Model:
shared/models/automation.js - Usage Context: Ensures automation metadata is replicated and filtered correctly
crm.automation.logsβ
- Operations: Read for log endpoints, filtered by pipeline/deal/stage
- Model:
shared/models/automation-log.js - Usage Context: UI history for automation execution, retries, and errors
crm.tags, crm.contacts, usersβ
- Operations: Joins for leaderboard filters, association helpers, and owner metadata
- Models:
shared/models/crm-tag.js,shared/models/contact.js,shared/models/user.js - Usage Context: Provide owner identities, available tags, and contact context when associating deals
π Data Flowβ
flowchart TD
A[Request] --> B{Parse pagination + filters}
B --> C[generateFilterObj + deal query]
C --> D[Pipeline.aggregate -> list pipelines]
D -->|stages required| E[Lookup stages + deal stats]
E --> F[CurrencyUtil.convert totals]
F --> G[cleanRecord outputs]
G --> H[Response]
subgraph Mutations
I[Payload (create/update/copy)] --> J[Validate account ownership]
J --> K[pipeline helpers (add/update/remove stages)]
K --> L[cleanRecord + automation hydration]
L --> H
end
subgraph Reporting
M[Report request] --> N[Build aggregation pipeline]
N --> O[Deal.aggregate with grouping]
O --> P[Currency + timezone adjustments]
P --> H
end
subgraph Automation Logs
Q[Log request] --> R[Find pipeline + scope filters]
R --> S[AutomationLogs.aggregate/find]
S --> T[Sanitise payload]
T --> H
end
π§ Business Logic & Functionsβ
Listing & Detailsβ
getPipeline({ page, limit, q, dealFilter, dealQuery, uid, account_id, is_owner, currencyCode, pipeline_id, stages })β
- Aggregates pipelines with optional search, filters embedded stage docs when requested, and computes per-stage deal stats while respecting owner visibility rules.
- Returns pipelines plus account CSV metadata and pagination info.
getOnePipeline({ id, account_id, uid, is_owner, dealFilter, dealQuery, currencyCode })β
- Fetches a single pipeline, loads stages with automations, and expands deals using the same filter DSL as list endpoints.
- Provides totals, conversion-friendly currency data, and visibility-safe attachments.
Creation & Maintenanceβ
postPipeline({ account_id, uid, body })β
- Validates stage payloads then uses
Pipeline.createPipelinehelpers to persist pipelines with initial stages and automations.
copyPipeline({ id, account_id, uid })β
- Clones a pipelineβincluding stages, automations, and orderingβscoped to the same account.
putPipeline({ account_id, uid, body })β
- Bulk mutation entrypoint that can update, add, or archive pipelines in one request; delegates to pipeline model helpers per operation.
updateOnePipeline({ id, account_id, uid, body })β
- Applies targeted updates (rename, reorder, adjust automations) while rehydrating pipeline data with
cleanRecord.
deleteOnePipeline({ id, account_id, uid })β
- Validates ownership, cleans associated deals via
deleteDealAdditionalData, deletes pipeline and stage records, and returnsSUCCESS.
Associations & Tagsβ
putAssociatedPipeline({ account_id, uid, id, body, dealFilter, dealQuery, is_owner })β
- Bulk moves deals between stages/pipelines based on filter payloads, validating visibility and preventing owner conflicts.
putAssociateTagPipeline({ account_id, uid, body, dealFilter, dealQuery, is_owner })β
- Adds or removes tags from filtered deal sets, ensuring duplicates are avoided and locked tags remain untouched.
Reporting & Analyticsβ
reportingLeaderboardPipeline({ account_id, uid, start, end, order, limit, filters, tz, currencyCode })β
- Builds leaderboards by owner, summing deal values within a date window and converting totals using
CurrencyUtil.
reportingStagePipeline({ account_id, uid, start, end, filters, tz, currencyCode })β
- Aggregates deal counts and conversion timing per stage, supporting stage/user filtering and timezone-aware grouping.
reportingListPipeline({ account_id, uid, start, end, filters, tz, currencyCode })β
- Produces pipeline-level summaries (win rate, velocity, totals) for dashboards.
totalNewPipeline({ account_id, uid, start, end, tz })β
- Returns creation counts segmented by period with timezone adjustments.
Automation Logsβ
automationLogsPipeline({ account_id, uid, filters, page, limit, tz })β
- Lists automation execution logs with pagination, filterable by pipeline, deal, stage, and status.
automationLogsByDealPipeline({ account_id, uid, dealID, limit, page, tz })β
- Narrows logs to a single deal, preserving chronological order and timezone formatting.
automationLogByIDPipeline({ account_id, uid, dealID, logID })β
- Fetches a single log record, including error stacks and retry metadata, after validating pipeline membership.
π Integration Pointsβ
Internal Dependenciesβ
generateFilterObjanddealUtils.checkCsvfor shared filter DSL and CSV schema contextpipelineUtils.getUserTimezone,cleanRecordfor timezone adjustments and sanitized payloadsdeleteDealAdditionalDatafor cascading clean-up during pipeline deletionCurrencyUtilfor consistent totals across reporting and list endpointsmoment-timezonefor timezone-aware leaderboards
External Servicesβ
- None directly; downstream queueing is handled by deal service operations invoked separately.
π§ͺ Edge Cases & Special Handlingβ
- Owner visibility: Non-owners get pipelines filtered to deals they own/follow when visibility is restricted.
- Stage hydration: Stages and automations are only loaded when requested to keep list responses fast.
- Locked pipelines: System pipelines can be protected; attempts to modify return explicit
ACCESS_DENIEDerrors. - Automation logs: Filters require valid ObjectIds; service returns
RESOURCE_NOT_FOUNDwhen pipeline/deal mismatch occurs.
β οΈ Important Notesβ
- π Keep indexes in sync: Heavy aggregations depend on
idx_account_order_createdand stage indexes; missing indexes can degrade performance drastically. - π° Set
BASE_CURRENCYto avoid conversion failures across analytics endpoints. - π§± Bulk mutations mutate multiple pipelinesβwrap calls in transactions if triggered outside the standard controller flow.