Conversation
Signed-off-by: Uroš Marolt <uros@marolt.me>
There was a problem hiding this comment.
Pull request overview
Introduces a new Temporal worker to export PCC project hierarchy data from Snowflake to S3 (Parquet) and sync it into CDP (segments + insightsProjects), while refactoring shared Snowflake/S3/metadata components into @crowd/snowflake and updating the existing snowflake_connectors app to consume them.
Changes:
- Add
pcc_sync_workerapp with Temporal schedules/workflows, export/cleanup activities, Parquet parsing, and a DB-sync consumer. - Move/centralize Snowflake export job metadata + S3/Parquet consumption logic into
services/libs/snowflakeand update snowflake_connectors to use it. - Add DB migration for PCC sync support (segments.maturity +
pcc_projects_sync_errorstable + dedup index), plus worker Docker/compose setup.
Reviewed changes
Copilot reviewed 33 out of 34 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| services/libs/snowflake/src/snowflakeExporter.ts | Fix internal import to avoid self-package import/cycles. |
| services/libs/snowflake/src/s3Service.ts | New S3 download/delete + Parquet row iteration utility. |
| services/libs/snowflake/src/metadataStore.ts | Add platform filtering + named params for export job bookkeeping. |
| services/libs/snowflake/src/index.ts | Export new Snowflake lib surface (metadata store, S3 service, exporter). |
| services/libs/snowflake/package.json | Add S3 + Parquet deps and DB dependency for the library. |
| services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts | Use @crowd/snowflake MetadataStore/S3Service; add enabled-platform filtering. |
| services/apps/snowflake_connectors/src/activities/exportActivity.ts | Switch imports to @crowd/snowflake. |
| services/apps/snowflake_connectors/src/activities/cleanupActivity.ts | Use shared MetadataStore/S3Service and pass enabled platforms to cleanup. |
| services/apps/snowflake_connectors/package.json | Remove direct S3/Parquet deps (now come from @crowd/snowflake). |
| services/apps/pcc_sync_worker/tsconfig.json | New TS config for PCC worker app. |
| services/apps/pcc_sync_worker/src/workflows/index.ts | Workflow exports. |
| services/apps/pcc_sync_worker/src/workflows/exportWorkflow.ts | Temporal workflow to run PCC export activity. |
| services/apps/pcc_sync_worker/src/workflows/cleanupWorkflow.ts | Temporal workflow to run PCC cleanup activity. |
| services/apps/pcc_sync_worker/src/scripts/triggerExport.ts | Manual script to start export workflow. |
| services/apps/pcc_sync_worker/src/scripts/triggerCleanup.ts | Manual script to start cleanup workflow. |
| services/apps/pcc_sync_worker/src/schedules/pccS3Export.ts | Temporal schedule registration for daily PCC export. |
| services/apps/pcc_sync_worker/src/schedules/pccS3Cleanup.ts | Temporal schedule registration for daily PCC cleanup. |
| services/apps/pcc_sync_worker/src/schedules/index.ts | Schedule exports. |
| services/apps/pcc_sync_worker/src/parser/types.ts | Parquet-row + parsed-project types. |
| services/apps/pcc_sync_worker/src/parser/rowParser.ts | Pure PCC row parsing + hierarchy mapping rules. |
| services/apps/pcc_sync_worker/src/parser/index.ts | Parser exports. |
| services/apps/pcc_sync_worker/src/main.ts | ServiceWorker archetype configuration. |
| services/apps/pcc_sync_worker/src/index.ts | Worker entrypoint: init + schedule + start consumer + start Temporal worker. |
| services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts | PCC job polling + Parquet processing + DB sync + error recording. |
| services/apps/pcc_sync_worker/src/config/settings.ts | Re-export Temporal config helpers. |
| services/apps/pcc_sync_worker/src/activities/index.ts | Activity exports. |
| services/apps/pcc_sync_worker/src/activities/exportActivity.ts | Snowflake recursive CTE export into S3 + metadata insert. |
| services/apps/pcc_sync_worker/src/activities/cleanupActivity.ts | Cleanup exported S3 files + mark jobs cleaned + Slack alerting on failures. |
| services/apps/pcc_sync_worker/package.json | PCC worker package manifest + scripts. |
| scripts/services/pcc-sync-worker.yaml | Compose service definitions for PCC worker (prod/dev). |
| scripts/services/docker/Dockerfile.pcc_sync_worker.dockerignore | Docker ignore file for PCC worker build context. |
| scripts/services/docker/Dockerfile.pcc_sync_worker | Multi-stage build for PCC worker. |
| backend/src/database/migrations/V1775312770__pcc-sync-worker-setup.sql | Add segments.maturity + PCC sync errors table + dedup index. |
| backend/src/database/migrations/U1775312770__pcc-sync-worker-setup.sql | Rollback for PCC sync DB changes. |
Comments suppressed due to low confidence (2)
services/libs/snowflake/src/metadataStore.ts:77
- When
platformsis provided as an empty array, this method falls back to no filter and will claim jobs for all platforms. That’s risky ifCROWD_SNOWFLAKE_ENABLED_PLATFORMSis accidentally empty/misconfigured. Consider treating an explicit emptyplatformslist as “match nothing” (return null early, or inject anAND FALSEfilter).
services/libs/snowflake/src/metadataStore.ts:125 platformsbeing an empty array currently results in no platform filter, so cleanup can target jobs for all platforms if the enabled-platforms list is empty/misconfigured. Consider returning[]early whenplatformsis provided but empty (or otherwise ensuring the filter matches nothing).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts
Outdated
Show resolved
Hide resolved
services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts
Outdated
Show resolved
Hide resolved
Signed-off-by: Uroš Marolt <uros@marolt.me>
services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts
Outdated
Show resolved
Hide resolved
Signed-off-by: Uroš Marolt <uros@marolt.me>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit e1c45e7. Configure here.

Note
Medium Risk
Adds a new worker that exports from Snowflake, consumes Parquet from S3, and upserts core
segments/insightsProjectsdata plus new error-tracking tables; failures or mapping mistakes could impact production data and job processing.Overview
Introduces a new
pcc_sync_workerservice that schedules daily PCC Snowflake exports and S3 cleanup via Temporal, and runs a polling consumer that streams exported Parquet rows and syncs matching CDP records.The consumer adds a PCC-specific parsing/mapping layer (depth-based hierarchy rules and status/maturity mapping), performs segment matching with mismatch/slug-drift detection, and upserts
segments(including newmaturity) and relatedinsightsProjects, recording non-auto-resolvable issues into a newpcc_projects_sync_errorstable with deduping indexes.Refactors Snowflake connectors to consume shared Snowflake library primitives (
MetadataStore,S3Service,SnowflakeExporter), extends the metadata store APIs to filter jobs/cleanup by platform(s), and moves S3/parquet + AWS SDK dependencies into@crowd/snowflake.Reviewed by Cursor Bugbot for commit e1c45e7. Bugbot is set up for automated code reviews on this repo. Configure here.