Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a43be14
chore: pcc sync worker
themarolt Apr 6, 2026
d71ee96
Merge branch 'main' into feat/pcc-sync-CM-1086-CM-1087-CM-1088-CM-1089
themarolt Apr 7, 2026
d616b51
Merge branch 'main' into feat/pcc-sync-CM-1086-CM-1087-CM-1088-CM-1089
themarolt Apr 9, 2026
ad1fb1b
chore: pcc sync worker wip1
themarolt Apr 14, 2026
c462cc9
Merge branch 'main' into feat/pcc-sync-CM-1086-CM-1087-CM-1088-CM-1089
themarolt Apr 14, 2026
b7bc7a5
fix: comments
themarolt Apr 14, 2026
3d012f5
Merge branch 'main' into feat/pcc-sync-CM-1086-CM-1087-CM-1088-CM-1089
themarolt Apr 14, 2026
d9c3a75
fix: lint
themarolt Apr 14, 2026
e1c45e7
fix: lint
themarolt Apr 14, 2026
34463ab
fix: mouads comments
themarolt Apr 15, 2026
31867f4
fix: joanas comments
themarolt Apr 15, 2026
f5c90fb
fix: comment
themarolt Apr 15, 2026
8b6800a
fix: bugfixes
themarolt Apr 15, 2026
c0a80b0
fix: bugfixes
themarolt Apr 16, 2026
9af4f28
Merge branch 'main' into feat/pcc-sync-CM-1086-CM-1087-CM-1088-CM-1089
themarolt Apr 16, 2026
2c9ab8d
fix: bugfixes
themarolt Apr 16, 2026
08be688
fix: guard against empty levelRows after HIERARCHY_LEVEL filter
themarolt Apr 16, 2026
b54fa85
Merge branch 'main' into feat/pcc-sync-CM-1086-CM-1087-CM-1088-CM-1089
themarolt Apr 17, 2026
33ad92e
Merge branch 'main' into feat/pcc-sync-CM-1086-CM-1087-CM-1088-CM-1089
themarolt Apr 20, 2026
a26acf6
fix: comments
themarolt Apr 20, 2026
68d71c7
fix: address copilot and cursor review feedback
themarolt Apr 20, 2026
d9037e5
fix: address copilot shutdown and diagnostics feedback
themarolt Apr 20, 2026
dce54ad
Merge branch 'main' into feat/pcc-sync-CM-1086-CM-1087-CM-1088-CM-1089
themarolt Apr 20, 2026
286765c
fix: comments
themarolt Apr 20, 2026
f3fd2b2
fix: trim everything
themarolt Apr 20, 2026
c6b6350
fix: query
themarolt Apr 20, 2026
15bb77c
fix: comments
themarolt Apr 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Down migration: reverses the PCC sync schema additions.
-- All statements are idempotent via IF EXISTS.

-- Explicit drop kept for clarity; the DROP TABLE below would remove the
-- table's indexes anyway (including pcc_sync_errors_dedup_unknown_idx).
DROP INDEX IF EXISTS pcc_sync_errors_dedup_idx;

DROP TABLE IF EXISTS pcc_projects_sync_errors;

ALTER TABLE segments DROP COLUMN IF EXISTS maturity;
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Add maturity field to segments for PCC project_maturity_level sync
ALTER TABLE segments ADD COLUMN IF NOT EXISTS maturity TEXT NULL;

-- Catch-all table for PCC sync issues that require manual review
CREATE TABLE IF NOT EXISTS pcc_projects_sync_errors (
id BIGSERIAL PRIMARY KEY,
run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- when the sync run recorded this error
external_project_id TEXT, -- PCC project id; NULL when the source row had no usable id
external_project_slug TEXT,
error_type TEXT NOT NULL, -- error classification, e.g. SCHEMA_MISMATCH (see dedup indexes below)
details JSONB, -- free-form payload; the 'reason' key feeds the unknown-row dedup index
resolved BOOLEAN NOT NULL DEFAULT FALSE -- flipped manually once the error is handled
);

-- Deduplication index: one unresolved error per (project, error_type).
-- On repeated daily exports the same error upserts in place instead of accumulating rows.
-- Excludes rows where external_project_id IS NULL (e.g. SCHEMA_MISMATCH with no project id).
CREATE UNIQUE INDEX IF NOT EXISTS pcc_sync_errors_dedup_idx
ON pcc_projects_sync_errors (external_project_id, error_type)
WHERE NOT resolved AND external_project_id IS NOT NULL;

-- Deduplication index for unidentifiable rows (no external_project_id).
-- Keyed on (error_type, reason) so repeated daily exports don't accumulate duplicate rows
-- for the same class of malformed input (e.g. rows missing PROJECT_ID/NAME/DEPTH).
CREATE UNIQUE INDEX IF NOT EXISTS pcc_sync_errors_dedup_unknown_idx
ON pcc_projects_sync_errors (error_type, (details->>'reason'))
WHERE NOT resolved AND external_project_id IS NULL;
58 changes: 52 additions & 6 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions scripts/services/docker/Dockerfile.pcc_sync_worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Multi-stage image for the PCC sync worker.

# --- Stage 1: builder — installs all workspace dependencies ---
FROM node:20-bullseye-slim AS builder

# Toolchain needed by node-gyp to compile native modules.
RUN apt-get update && apt-get install -y python3 make g++ && rm -rf /var/lib/apt/lists/*

WORKDIR /usr/crowd/app
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare [email protected] --activate

# Fetch packages from the lockfile first, so this layer stays cached
# independently of source-code changes.
COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./
RUN pnpm fetch

COPY ./services ./services
RUN pnpm i --frozen-lockfile

# --- Stage 2: runner — slim runtime image without the build toolchain ---
FROM node:20-bullseye-slim AS runner

RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*

WORKDIR /usr/crowd/app
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare [email protected] --activate

# Copy only what the worker needs at runtime from the builder stage.
COPY --from=builder /usr/crowd/app/node_modules ./node_modules
COPY --from=builder /usr/crowd/app/services/base.tsconfig.json ./services/base.tsconfig.json
COPY --from=builder /usr/crowd/app/services/libs ./services/libs
COPY --from=builder /usr/crowd/app/services/archetypes/ ./services/archetypes
COPY --from=builder /usr/crowd/app/services/apps/pcc_sync_worker/ ./services/apps/pcc_sync_worker
18 changes: 18 additions & 0 deletions scripts/services/docker/Dockerfile.pcc_sync_worker.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Build-context exclusions for Dockerfile.pcc_sync_worker: keeps dev
# artifacts, env files, and unrelated project trees out of the image build.
**/.git
**/node_modules
**/venv*
**/.webpack
**/.serverless
# Never ship environment files into the build context.
**/.env
**/.env.*
**/.idea
**/.vscode
**/dist
.vscode/
.github/
frontend/
scripts/
.flake8
*.md
Makefile
backend/
53 changes: 53 additions & 0 deletions scripts/services/pcc-sync-worker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Docker Compose definitions for the PCC sync worker:
# - pcc-sync-worker:     prod-like container (restart: always)
# - pcc-sync-worker-dev: dev container with live-reload source mounts
# NOTE(review): the top-level `version` key is obsolete in the Compose v2
# spec and is ignored with a warning — confirm whether it can be removed.
version: '3.1'

# Environment shared by both service variants.
x-env-args: &env-args
  DOCKER_BUILDKIT: 1
  NODE_ENV: docker
  SERVICE: pcc-sync-worker
  CROWD_TEMPORAL_TASKQUEUE: pccSync
  SHELL: /bin/sh

services:
  pcc-sync-worker:
    build:
      context: ../../
      dockerfile: ./scripts/services/docker/Dockerfile.pcc_sync_worker
    command: 'pnpm run start'
    working_dir: /usr/crowd/app/services/apps/pcc_sync_worker
    env_file:
      # Later files override earlier ones.
      - ../../backend/.env.dist.local
      - ../../backend/.env.dist.composed
      - ../../backend/.env.override.local
      - ../../backend/.env.override.composed
    environment:
      <<: *env-args
    restart: always
    networks:
      - crowd-bridge

  pcc-sync-worker-dev:
    build:
      context: ../../
      dockerfile: ./scripts/services/docker/Dockerfile.pcc_sync_worker
    command: 'pnpm run dev'
    working_dir: /usr/crowd/app/services/apps/pcc_sync_worker
    env_file:
      - ../../backend/.env.dist.local
      - ../../backend/.env.dist.composed
      - ../../backend/.env.override.local
      - ../../backend/.env.override.composed
    environment:
      <<: *env-args
    hostname: pcc-sync-worker
    networks:
      - crowd-bridge
    # Source mounts so nodemon inside the container picks up host edits.
    volumes:
      - ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src
      - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src
      - ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src
      - ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src
      - ../../services/apps/pcc_sync_worker/src:/usr/crowd/app/services/apps/pcc_sync_worker/src

# Pre-existing external network shared with the rest of the stack.
networks:
  crowd-bridge:
    external: true
35 changes: 35 additions & 0 deletions services/apps/pcc_sync_worker/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"name": "@crowd/pcc-sync-worker",
"scripts": {
"start": "CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker tsx src/index.ts",
"start:debug": "CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker LOG_LEVEL=debug tsx src/index.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker LOG_LEVEL=debug tsx src/index.ts",
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
"lint": "npx eslint --ext .ts src --max-warnings=0",
"format": "npx prettier --write \"src/**/*.ts\"",
"format-check": "npx prettier --check .",
"tsc-check": "tsc --noEmit",
"trigger-export": "SERVICE=pcc-sync-worker tsx src/scripts/triggerExport.ts",
"trigger-export:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=pcc-sync-worker tsx src/scripts/triggerExport.ts",
"trigger-cleanup": "SERVICE=pcc-sync-worker tsx src/scripts/triggerCleanup.ts",
"trigger-cleanup:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=pcc-sync-worker tsx src/scripts/triggerCleanup.ts"
},
"dependencies": {
"@crowd/archetype-standard": "workspace:*",
"@crowd/archetype-worker": "workspace:*",
"@crowd/common": "workspace:*",
"@crowd/database": "workspace:*",
"@crowd/logging": "workspace:*",
"@crowd/slack": "workspace:*",
"@crowd/snowflake": "workspace:*",
"@crowd/temporal": "workspace:*",
"@temporalio/client": "~1.11.8",
"@temporalio/workflow": "~1.11.8",
"tsx": "^4.7.1",
"typescript": "^5.6.3"
},
"devDependencies": {
"nodemon": "^3.0.1"
}
}
33 changes: 33 additions & 0 deletions services/apps/pcc_sync_worker/src/activities/cleanupActivity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
import { getServiceChildLogger } from '@crowd/logging'
import { SlackChannel, SlackPersona, sendSlackNotification } from '@crowd/slack'
import { MetadataStore, S3Service } from '@crowd/snowflake'

const log = getServiceChildLogger('cleanupActivity')

const PLATFORM = 'pcc'

export async function executeCleanup(intervalHours = 24): Promise<void> {
Comment thread
joanagmaia marked this conversation as resolved.
const db = await getDbConnection(WRITE_DB_CONFIG())
const metadataStore = new MetadataStore(db)
const s3Service = new S3Service()

const jobs = await metadataStore.getCleanableJobS3Paths(intervalHours, PLATFORM, false)
log.info({ jobCount: jobs.length, intervalHours }, 'Found cleanable PCC jobs')

for (const job of jobs) {
try {
await s3Service.deleteFile(job.s3Path)
await metadataStore.markCleaned(job.id)
log.info({ jobId: job.id, s3Path: job.s3Path }, 'Cleaned PCC job')
} catch (err) {
log.error({ jobId: job.id, s3Path: job.s3Path, err }, 'Failed to clean PCC job, skipping')
sendSlackNotification(
SlackChannel.CDP_INTEGRATIONS_ALERTS,
SlackPersona.ERROR_REPORTER,
'PCC S3 Cleanup Failed',
`Failed to clean job \`${job.id}\` at \`${job.s3Path}\`.\n\n*Error:* ${err instanceof Error ? err.message : err}`,
)
Comment thread
themarolt marked this conversation as resolved.
Comment thread
themarolt marked this conversation as resolved.
}
}
}
Comment thread
cursor[bot] marked this conversation as resolved.
100 changes: 100 additions & 0 deletions services/apps/pcc_sync_worker/src/activities/exportActivity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Export activity: Execute PCC recursive CTE COPY INTO + write metadata.
*
* Full daily export of ANALYTICS.SILVER_DIM.PROJECTS via recursive CTE.
* No incremental logic — at ~1,538 leaf rows, a full daily export is simpler
* and more reliable than incremental (a parent name change would require
* re-exporting all descendants).
*/
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
import { getServiceChildLogger } from '@crowd/logging'
import { MetadataStore, SnowflakeExporter } from '@crowd/snowflake'

const log = getServiceChildLogger('exportActivity')

const PLATFORM = 'pcc'
const SOURCE_NAME = 'project-hierarchy'

function buildSourceQuery(): string {
Comment thread
themarolt marked this conversation as resolved.
return `
WITH RECURSIVE project_hierarchy AS (
SELECT project_id, name, description, project_logo, project_status,
project_maturity_level, repository_url, slug, parent_id,
1 AS depth,
name AS depth_1, NULL::VARCHAR AS depth_2, NULL::VARCHAR AS depth_3,
NULL::VARCHAR AS depth_4, NULL::VARCHAR AS depth_5
FROM ANALYTICS.SILVER_DIM.PROJECTS
WHERE parent_id IS NULL
UNION ALL
SELECT p.project_id, p.name, p.description, p.project_logo, p.project_status,
p.project_maturity_level, p.repository_url, p.slug, p.parent_id,
h.depth + 1,
h.depth_1,
CASE WHEN h.depth + 1 = 2 THEN p.name ELSE h.depth_2 END,
CASE WHEN h.depth + 1 = 3 THEN p.name ELSE h.depth_3 END,
CASE WHEN h.depth + 1 = 4 THEN p.name ELSE h.depth_4 END,
CASE WHEN h.depth + 1 = 5 THEN p.name ELSE h.depth_5 END
FROM ANALYTICS.SILVER_DIM.PROJECTS p
INNER JOIN project_hierarchy h ON p.parent_id = h.project_id
)
SELECT ph.project_id, ph.name, ph.slug, ph.description, ph.project_logo, ph.repository_url,
ph.project_status, ph.project_maturity_level, ph.depth,
ph.depth_1, ph.depth_2, ph.depth_3, ph.depth_4, ph.depth_5,
s.segment_id
FROM project_hierarchy ph
LEFT JOIN ANALYTICS.SILVER_DIM.ACTIVE_SEGMENTS s
ON s.source_id = ph.project_id AND s.project_type = 'subproject'
WHERE ph.project_id NOT IN (
SELECT DISTINCT parent_id FROM ANALYTICS.SILVER_DIM.PROJECTS
WHERE parent_id IS NOT NULL
)
`
}

function buildS3FilenamePrefix(): string {
const now = new Date()
const year = now.getFullYear()
const month = String(now.getMonth() + 1).padStart(2, '0')
const day = String(now.getDate()).padStart(2, '0')
const s3BucketPath = process.env.CROWD_SNOWFLAKE_S3_BUCKET_PATH
if (!s3BucketPath) {
throw new Error('Missing required env var CROWD_SNOWFLAKE_S3_BUCKET_PATH')
}
return `${s3BucketPath}/${PLATFORM}/${SOURCE_NAME}/${year}/${month}/${day}`
}

export async function executeExport(): Promise<void> {
log.info({ platform: PLATFORM, sourceName: SOURCE_NAME }, 'Starting PCC export')

const exporter = new SnowflakeExporter()
const db = await getDbConnection(WRITE_DB_CONFIG())

try {
const metadataStore = new MetadataStore(db)
const sourceQuery = buildSourceQuery()
const s3FilenamePrefix = buildS3FilenamePrefix()
const exportStartedAt = new Date()

const onBatchComplete = async (s3Path: string, totalRows: number, totalBytes: number) => {
await metadataStore.insertExportJob(
PLATFORM,
SOURCE_NAME,
s3Path,
totalRows,
totalBytes,
exportStartedAt,
)
}

await exporter.executeBatchedCopyInto(sourceQuery, s3FilenamePrefix, onBatchComplete)

log.info({ platform: PLATFORM, sourceName: SOURCE_NAME }, 'PCC export completed')
} catch (err) {
log.error({ platform: PLATFORM, sourceName: SOURCE_NAME, err }, 'PCC export failed')
throw err
} finally {
await exporter
.destroy()
.catch((err) => log.warn({ err }, 'Failed to close Snowflake connection'))
}
}
Comment thread
themarolt marked this conversation as resolved.
Outdated
2 changes: 2 additions & 0 deletions services/apps/pcc_sync_worker/src/activities/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Barrel file exposing the PCC sync worker's Temporal activities.
export { executeExport } from './exportActivity'
export { executeCleanup } from './cleanupActivity'
Loading
Loading