Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Down migration: revert the PCC project-sync schema changes.

-- Drop both partial unique indexes created by the up migration. Dropping the
-- table below removes them implicitly, but the explicit IF EXISTS drops keep
-- this migration safe to run even if the table was created without them.
DROP INDEX IF EXISTS pcc_sync_errors_dedup_idx;
DROP INDEX IF EXISTS pcc_sync_errors_dedup_unknown_idx;

DROP TABLE IF EXISTS pcc_projects_sync_errors;

ALTER TABLE segments DROP COLUMN IF EXISTS maturity;
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Up migration: schema for the PCC project sync worker.

-- Add maturity field to segments for PCC project_maturity_level sync.
-- Nullable on purpose: only segments synced from PCC carry a value.
ALTER TABLE segments ADD COLUMN IF NOT EXISTS maturity TEXT NULL;

-- Catch-all table for PCC sync issues that require manual review.
-- Rows are written by the sync worker and flipped to resolved by an operator.
CREATE TABLE IF NOT EXISTS pcc_projects_sync_errors (
  id BIGSERIAL PRIMARY KEY,
  -- When the sync run that produced this error executed.
  run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- Nullable: some errors (e.g. schema mismatches) have no identifiable project.
  external_project_id TEXT,
  external_project_slug TEXT,
  error_type TEXT NOT NULL,
  -- Free-form error payload; 'reason' key is used by the second index below.
  details JSONB,
  resolved BOOLEAN NOT NULL DEFAULT FALSE
);

-- Deduplication index: one unresolved error per (project, error_type).
-- On repeated daily exports the same error upserts in place instead of accumulating rows.
-- Excludes rows where external_project_id IS NULL (e.g. SCHEMA_MISMATCH with no project id).
CREATE UNIQUE INDEX IF NOT EXISTS pcc_sync_errors_dedup_idx
  ON pcc_projects_sync_errors (external_project_id, error_type)
  WHERE NOT resolved AND external_project_id IS NOT NULL;

-- Deduplication index for unidentifiable rows (no external_project_id).
-- Keyed on (error_type, reason) so repeated daily exports don't accumulate duplicate rows
-- for the same class of malformed input (e.g. rows missing PROJECT_ID/NAME/DEPTH).
CREATE UNIQUE INDEX IF NOT EXISTS pcc_sync_errors_dedup_unknown_idx
  ON pcc_projects_sync_errors (error_type, (details->>'reason'))
  WHERE NOT resolved AND external_project_id IS NULL;
58 changes: 52 additions & 6 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions scripts/services/docker/Dockerfile.pcc_sync_worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Build stage: includes python3/make/g++ so node-gyp can compile native modules.
FROM node:20-bullseye-slim AS builder

RUN apt-get update && apt-get install -y python3 make g++ && rm -rf /var/lib/apt/lists/*

WORKDIR /usr/crowd/app
# Pin pnpm via corepack so builds are reproducible across hosts.
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate

# Copy only the workspace manifest + lockfile first so the `pnpm fetch`
# layer stays cached until pnpm-lock.yaml changes.
COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./
RUN pnpm fetch

COPY ./services ./services
RUN pnpm i --frozen-lockfile

# Runtime stage: slim image without the native build toolchain.
FROM node:20-bullseye-slim AS runner

RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*

WORKDIR /usr/crowd/app
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate

# Copy only what the worker needs at runtime: installed dependencies,
# shared libs/archetypes, and the pcc_sync_worker app itself.
COPY --from=builder /usr/crowd/app/node_modules ./node_modules
COPY --from=builder /usr/crowd/app/services/base.tsconfig.json ./services/base.tsconfig.json
COPY --from=builder /usr/crowd/app/services/libs ./services/libs
COPY --from=builder /usr/crowd/app/services/archetypes/ ./services/archetypes
COPY --from=builder /usr/crowd/app/services/apps/pcc_sync_worker/ ./services/apps/pcc_sync_worker
18 changes: 18 additions & 0 deletions scripts/services/docker/Dockerfile.pcc_sync_worker.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Keep the Docker build context for the PCC sync worker minimal:
# exclude VCS data, dependency trees, editor/config files, and unrelated
# top-level directories (frontend, scripts, backend) from the context.
**/.git
**/node_modules
**/venv*
**/.webpack
**/.serverless
**/.env
**/.env.*
**/.idea
**/.vscode
**/dist
.vscode/
.github/
frontend/
scripts/
.flake8
*.md
Makefile
backend/
53 changes: 53 additions & 0 deletions scripts/services/pcc-sync-worker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# NOTE(review): the top-level `version` key is obsolete and ignored by
# Compose v2 — confirm whether it can be dropped to silence the warning.
version: '3.1'

# Shared environment for both service variants, merged in via YAML anchor.
x-env-args: &env-args
  DOCKER_BUILDKIT: 1
  NODE_ENV: docker
  SERVICE: pcc-sync-worker
  CROWD_TEMPORAL_TASKQUEUE: pccSync
  SHELL: /bin/sh

services:
  # Production-like variant: compiled start command, auto-restart, no mounts.
  pcc-sync-worker:
    build:
      context: ../../
      dockerfile: ./scripts/services/docker/Dockerfile.pcc_sync_worker
    command: 'pnpm run start'
    working_dir: /usr/crowd/app/services/apps/pcc_sync_worker
    # Later files override earlier ones; .override.* wins over .dist.*.
    env_file:
      - ../../backend/.env.dist.local
      - ../../backend/.env.dist.composed
      - ../../backend/.env.override.local
      - ../../backend/.env.override.composed
    environment:
      <<: *env-args
    restart: always
    networks:
      - crowd-bridge

  # Dev variant: nodemon watch mode with source directories bind-mounted
  # so local edits to the worker and shared libs hot-reload in the container.
  pcc-sync-worker-dev:
    build:
      context: ../../
      dockerfile: ./scripts/services/docker/Dockerfile.pcc_sync_worker
    command: 'pnpm run dev'
    working_dir: /usr/crowd/app/services/apps/pcc_sync_worker
    env_file:
      - ../../backend/.env.dist.local
      - ../../backend/.env.dist.composed
      - ../../backend/.env.override.local
      - ../../backend/.env.override.composed
    environment:
      <<: *env-args
    hostname: pcc-sync-worker
    networks:
      - crowd-bridge
    volumes:
      - ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src
      - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src
      - ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src
      - ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src
      - ../../services/apps/pcc_sync_worker/src:/usr/crowd/app/services/apps/pcc_sync_worker/src

# The bridge network is created outside this file and shared by other services.
networks:
  crowd-bridge:
    external: true
35 changes: 35 additions & 0 deletions services/apps/pcc_sync_worker/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"name": "@crowd/pcc-sync-worker",
"scripts": {
"start": "CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker tsx src/index.ts",
"start:debug": "CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker LOG_LEVEL=debug tsx src/index.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker LOG_LEVEL=debug tsx src/index.ts",
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
"lint": "npx eslint --ext .ts src --max-warnings=0",
"format": "npx prettier --write \"src/**/*.ts\"",
"format-check": "npx prettier --check .",
"tsc-check": "tsc --noEmit",
"trigger-export": "SERVICE=pcc-sync-worker tsx src/scripts/triggerExport.ts",
"trigger-export:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=pcc-sync-worker tsx src/scripts/triggerExport.ts",
"trigger-cleanup": "SERVICE=pcc-sync-worker tsx src/scripts/triggerCleanup.ts",
"trigger-cleanup:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=pcc-sync-worker tsx src/scripts/triggerCleanup.ts"
},
"dependencies": {
"@crowd/archetype-standard": "workspace:*",
"@crowd/archetype-worker": "workspace:*",
"@crowd/common": "workspace:*",
"@crowd/database": "workspace:*",
"@crowd/logging": "workspace:*",
"@crowd/slack": "workspace:*",
"@crowd/snowflake": "workspace:*",
"@crowd/temporal": "workspace:*",
"@temporalio/client": "~1.11.8",
"@temporalio/workflow": "~1.11.8",
"tsx": "^4.7.1",
"typescript": "^5.6.3"
},
"devDependencies": {
"nodemon": "^3.0.1"
}
}
33 changes: 33 additions & 0 deletions services/apps/pcc_sync_worker/src/activities/cleanupActivity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
import { getServiceChildLogger } from '@crowd/logging'
import { SlackChannel, SlackPersona, sendSlackNotification } from '@crowd/slack'
import { MetadataStore, S3Service } from '@crowd/snowflake'

const log = getServiceChildLogger('cleanupActivity')

// Platform key that scopes metadata-store rows to the PCC sync pipeline.
const PLATFORM = 'pcc'

export async function executeCleanup(intervalHours = 24): Promise<void> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eric suggested this https://docs.google.com/document/d/1t6HyZdHGM9TA47fyJ5jRQ96z4O5esmnCDu03yNS4lwI/edit?disco=AAAB2hXY2p8, to basically explore the S3 bucket policies for automatically purging old snapshots / query-results. Perhaps leveraging that we could avoid having our own internal job to clean up S3 buckets.
Let me know if it's something worth checking or if it would still be preferable to have this cleanup job.

const db = await getDbConnection(WRITE_DB_CONFIG())
const metadataStore = new MetadataStore(db)
const s3Service = new S3Service()

const jobs = await metadataStore.getCleanableJobS3Paths(intervalHours, PLATFORM, false)
log.info({ jobCount: jobs.length, intervalHours }, 'Found cleanable PCC jobs')

for (const job of jobs) {
try {
await s3Service.deleteFile(job.s3Path)
await metadataStore.markCleaned(job.id)
log.info({ jobId: job.id, s3Path: job.s3Path }, 'Cleaned PCC job')
} catch (err) {
log.error({ jobId: job.id, s3Path: job.s3Path, err }, 'Failed to clean PCC job, skipping')
sendSlackNotification(
SlackChannel.CDP_INTEGRATIONS_ALERTS,
SlackPersona.ERROR_REPORTER,
'PCC S3 Cleanup Failed',
`Failed to clean job \`${job.id}\` at \`${job.s3Path}\`.\n\n*Error:* ${err instanceof Error ? err.message : err}`,
)
}
}
}
100 changes: 100 additions & 0 deletions services/apps/pcc_sync_worker/src/activities/exportActivity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Export activity: Execute PCC recursive CTE COPY INTO + write metadata.
*
* Full daily export of ANALYTICS.SILVER_DIM.PROJECTS via recursive CTE.
* No incremental logic — at ~1,538 leaf rows, a full daily export is simpler
* and more reliable than incremental (a parent name change would require
* re-exporting all descendants).
*/
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
import { getServiceChildLogger } from '@crowd/logging'
import { MetadataStore, SnowflakeExporter } from '@crowd/snowflake'

const log = getServiceChildLogger('exportActivity')

// Metadata identifiers: export-job rows and S3 paths are keyed by platform + source.
const PLATFORM = 'pcc'
const SOURCE_NAME = 'project-hierarchy'

/**
 * Builds the Snowflake query for the full daily export of the PCC project
 * hierarchy.
 *
 * The recursive CTE starts at root projects (parent_id IS NULL) and walks
 * down ANALYTICS.SILVER_DIM.PROJECTS, carrying each ancestor's name in the
 * depth_1..depth_5 columns as the depth counter increments. The outer SELECT
 * keeps only leaf projects (ids that never appear as a parent_id) and
 * left-joins the CDP segment id from ACTIVE_SEGMENTS — NULL for projects not
 * yet onboarded as a 'subproject' segment.
 *
 * NOTE(review): depth_1..depth_5 hard-caps the hierarchy at five levels; a
 * sixth level would still recurse but its name would not land in any depth
 * column. Confirm the source data cannot exceed this depth, or derive the
 * hierarchy from an upstream materialized model instead of recomputing it here.
 */
function buildSourceQuery(): string {
  return `
WITH RECURSIVE project_hierarchy AS (
SELECT project_id, name, description, project_logo, project_status,
project_maturity_level, repository_url, slug, parent_id,
1 AS depth,
name AS depth_1, NULL::VARCHAR AS depth_2, NULL::VARCHAR AS depth_3,
NULL::VARCHAR AS depth_4, NULL::VARCHAR AS depth_5
FROM ANALYTICS.SILVER_DIM.PROJECTS
WHERE parent_id IS NULL
UNION ALL
SELECT p.project_id, p.name, p.description, p.project_logo, p.project_status,
p.project_maturity_level, p.repository_url, p.slug, p.parent_id,
h.depth + 1,
h.depth_1,
CASE WHEN h.depth + 1 = 2 THEN p.name ELSE h.depth_2 END,
CASE WHEN h.depth + 1 = 3 THEN p.name ELSE h.depth_3 END,
CASE WHEN h.depth + 1 = 4 THEN p.name ELSE h.depth_4 END,
CASE WHEN h.depth + 1 = 5 THEN p.name ELSE h.depth_5 END
FROM ANALYTICS.SILVER_DIM.PROJECTS p
INNER JOIN project_hierarchy h ON p.parent_id = h.project_id
)
SELECT ph.project_id, ph.name, ph.slug, ph.description, ph.project_logo, ph.repository_url,
ph.project_status, ph.project_maturity_level, ph.depth,
ph.depth_1, ph.depth_2, ph.depth_3, ph.depth_4, ph.depth_5,
s.segment_id
FROM project_hierarchy ph
LEFT JOIN ANALYTICS.SILVER_DIM.ACTIVE_SEGMENTS s
ON s.source_id = ph.project_id AND s.project_type = 'subproject'
WHERE ph.project_id NOT IN (
SELECT DISTINCT parent_id FROM ANALYTICS.SILVER_DIM.PROJECTS
WHERE parent_id IS NOT NULL
)
`
}

/**
 * Builds the S3 key prefix for today's export:
 * `<bucket>/pcc/project-hierarchy/YYYY/MM/DD`.
 *
 * NOTE(review): uses the worker's local date, not UTC — confirm that is
 * intended for daily partitioning.
 *
 * @throws Error when CROWD_SNOWFLAKE_S3_BUCKET_PATH is unset.
 */
function buildS3FilenamePrefix(): string {
  const bucket = process.env.CROWD_SNOWFLAKE_S3_BUCKET_PATH
  if (!bucket) {
    throw new Error('Missing required env var CROWD_SNOWFLAKE_S3_BUCKET_PATH')
  }

  const today = new Date()
  const pad = (value: number): string => String(value).padStart(2, '0')

  return [
    bucket,
    PLATFORM,
    SOURCE_NAME,
    today.getFullYear(),
    pad(today.getMonth() + 1),
    pad(today.getDate()),
  ].join('/')
}

/**
 * Runs the full PCC project-hierarchy export: executes the batched COPY INTO
 * query against Snowflake, landing files under today's S3 prefix, and records
 * one metadata row per completed batch (consumed later by the cleanup job).
 *
 * The Snowflake connection is always destroyed in `finally`; a failure to
 * close it is only logged so it cannot mask the real export error.
 *
 * @throws rethrows any export error after logging, so the caller (Temporal) can retry.
 */
export async function executeExport(): Promise<void> {
  log.info({ platform: PLATFORM, sourceName: SOURCE_NAME }, 'Starting PCC export')

  const exporter = new SnowflakeExporter()
  const db = await getDbConnection(WRITE_DB_CONFIG())

  try {
    const metadataStore = new MetadataStore(db)
    const sourceQuery = buildSourceQuery()
    const s3FilenamePrefix = buildS3FilenamePrefix()
    // Captured once so every batch's metadata row shares the same run start time.
    const exportStartedAt = new Date()

    // Invoked by the exporter after each batch lands in S3.
    const onBatchComplete = async (s3Path: string, totalRows: number, totalBytes: number) => {
      await metadataStore.insertExportJob(
        PLATFORM,
        SOURCE_NAME,
        s3Path,
        totalRows,
        totalBytes,
        exportStartedAt,
      )
    }

    await exporter.executeBatchedCopyInto(sourceQuery, s3FilenamePrefix, onBatchComplete)

    log.info({ platform: PLATFORM, sourceName: SOURCE_NAME }, 'PCC export completed')
  } catch (err) {
    log.error({ platform: PLATFORM, sourceName: SOURCE_NAME, err }, 'PCC export failed')
    throw err
  } finally {
    await exporter
      .destroy()
      .catch((err) => log.warn({ err }, 'Failed to close Snowflake connection'))
  }
}
Comment on lines +54 to +100
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems duplicated across snowflake_connectors/PCC_sync, can we move these to @crowd/snowflake as well?

2 changes: 2 additions & 0 deletions services/apps/pcc_sync_worker/src/activities/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Activity entry points registered with the Temporal worker.
export { executeExport } from './exportActivity'
export { executeCleanup } from './cleanupActivity'
Loading
Loading