Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Down migration: revert the PCC sync schema additions, in reverse order of creation.

-- Explicit for clarity; the DROP TABLE below would also remove this index.
DROP INDEX IF EXISTS pcc_sync_errors_dedup_idx;

DROP TABLE IF EXISTS pcc_projects_sync_errors;

-- Remove the maturity column that the up migration added to segments.
ALTER TABLE segments DROP COLUMN IF EXISTS maturity;
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Add maturity field to segments for PCC project_maturity_level sync
ALTER TABLE segments ADD COLUMN IF NOT EXISTS maturity TEXT NULL;

-- Catch-all table for PCC sync issues that require manual review
CREATE TABLE IF NOT EXISTS pcc_projects_sync_errors (
    id BIGSERIAL PRIMARY KEY,
    run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),  -- when the sync run recorded this error
    external_project_id TEXT,                   -- PCC project id; NULL for errors not tied to a project
    external_project_slug TEXT,
    error_type TEXT NOT NULL,                   -- machine-readable category (e.g. SCHEMA_MISMATCH); part of the dedup key
    details JSONB,                              -- free-form context payload for manual review
    resolved BOOLEAN NOT NULL DEFAULT FALSE     -- setting TRUE frees the dedup slot below
);

-- Deduplication index: one unresolved error per (project, error_type).
-- On repeated daily exports the same error upserts in place instead of accumulating rows.
-- Excludes rows where external_project_id IS NULL (e.g. SCHEMA_MISMATCH with no project id).
CREATE UNIQUE INDEX IF NOT EXISTS pcc_sync_errors_dedup_idx
    ON pcc_projects_sync_errors (external_project_id, error_type)
    WHERE NOT resolved AND external_project_id IS NOT NULL;
25 changes: 25 additions & 0 deletions scripts/services/docker/Dockerfile.pcc_sync_worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Builder stage: install all workspace dependencies.
# python3/make/g++ are needed to compile native Node addons during install.
FROM node:20-bullseye-slim AS builder

RUN apt-get update && apt-get install -y python3 make g++ && rm -rf /var/lib/apt/lists/*

WORKDIR /usr/crowd/app
# Pin pnpm via corepack so builds are reproducible across hosts.
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate

# Copy only the lockfile/workspace manifest first so the fetch layer stays
# cached until the lockfile itself changes.
COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./
RUN pnpm fetch

COPY ./services ./services
RUN pnpm i --frozen-lockfile

# Runner stage: slim runtime image without the native build toolchain.
FROM node:20-bullseye-slim AS runner

# ca-certificates is required for outbound TLS connections at runtime.
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*

WORKDIR /usr/crowd/app
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate

# Copy only what the worker needs at runtime: installed deps, shared libs,
# archetypes and the app sources (the service runs from TS sources via pnpm scripts).
COPY --from=builder /usr/crowd/app/node_modules ./node_modules
COPY --from=builder /usr/crowd/app/services/base.tsconfig.json ./services/base.tsconfig.json
COPY --from=builder /usr/crowd/app/services/libs ./services/libs
COPY --from=builder /usr/crowd/app/services/archetypes/ ./services/archetypes
COPY --from=builder /usr/crowd/app/services/apps/pcc_sync_worker/ ./services/apps/pcc_sync_worker
18 changes: 18 additions & 0 deletions scripts/services/docker/Dockerfile.pcc_sync_worker.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Excluded from the Docker build context for the PCC sync worker image:
# VCS metadata, installed deps/build output, local env files, editor state,
# and app trees unrelated to this service.
**/.git
**/node_modules
**/venv*
**/.webpack
**/.serverless
**/.env
**/.env.*
**/.idea
**/.vscode
**/dist
.vscode/
.github/
frontend/
scripts/
.flake8
*.md
Makefile
backend/
53 changes: 53 additions & 0 deletions scripts/services/pcc-sync-worker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
version: '3.1'

# Environment shared by both worker variants below.
x-env-args: &env-args
  DOCKER_BUILDKIT: 1
  NODE_ENV: docker
  SERVICE: pcc-sync-worker
  CROWD_TEMPORAL_TASKQUEUE: pccSync
  SHELL: /bin/sh

services:
  # Production-style worker: auto-restarts, no source mounts.
  pcc-sync-worker:
    build:
      context: ../../
      dockerfile: ./scripts/services/docker/Dockerfile.pcc_sync_worker
    command: 'pnpm run start'
    working_dir: /usr/crowd/app/services/apps/pcc_sync_worker
    # Later files override earlier ones; .override.* win over .dist.*.
    env_file:
      - ../../backend/.env.dist.local
      - ../../backend/.env.dist.composed
      - ../../backend/.env.override.local
      - ../../backend/.env.override.composed
    environment:
      <<: *env-args
    restart: always
    networks:
      - crowd-bridge

  # Development variant: runs the watch task and mounts library/app sources
  # so local edits are picked up without rebuilding the image.
  pcc-sync-worker-dev:
    build:
      context: ../../
      dockerfile: ./scripts/services/docker/Dockerfile.pcc_sync_worker
    command: 'pnpm run dev'
    working_dir: /usr/crowd/app/services/apps/pcc_sync_worker
    env_file:
      - ../../backend/.env.dist.local
      - ../../backend/.env.dist.composed
      - ../../backend/.env.override.local
      - ../../backend/.env.override.composed
    environment:
      <<: *env-args
    hostname: pcc-sync-worker
    networks:
      - crowd-bridge
    volumes:
      - ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src
      - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src
      - ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src
      - ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src
      - ../../services/apps/pcc_sync_worker/src:/usr/crowd/app/services/apps/pcc_sync_worker/src

# Pre-existing bridge network shared with the rest of the local stack.
networks:
  crowd-bridge:
    external: true
33 changes: 33 additions & 0 deletions services/apps/pcc_sync_worker/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"name": "@crowd/pcc-sync-worker",
"scripts": {
"start": "CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker tsx src/index.ts",
"start:debug": "CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker LOG_LEVEL=debug tsx src/index.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker LOG_LEVEL=debug tsx src/index.ts",
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
"lint": "npx eslint --ext .ts src --max-warnings=0",
"format": "npx prettier --write \"src/**/*.ts\"",
"format-check": "npx prettier --check .",
"tsc-check": "tsc --noEmit",
"trigger-export": "SERVICE=pcc-sync-worker tsx src/scripts/triggerExport.ts",
"trigger-cleanup": "SERVICE=pcc-sync-worker tsx src/scripts/triggerCleanup.ts"
},
"dependencies": {
"@crowd/archetype-standard": "workspace:*",
"@crowd/archetype-worker": "workspace:*",
"@crowd/common": "workspace:*",
"@crowd/database": "workspace:*",
"@crowd/logging": "workspace:*",
"@crowd/slack": "workspace:*",
"@crowd/snowflake": "workspace:*",
"@crowd/temporal": "workspace:*",
"@temporalio/client": "~1.11.8",
"@temporalio/workflow": "~1.11.8",
"tsx": "^4.7.1",
"typescript": "^5.6.3"
},
"devDependencies": {
"nodemon": "^3.0.1"
}
}
33 changes: 33 additions & 0 deletions services/apps/pcc_sync_worker/src/activities/cleanupActivity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
import { getServiceChildLogger } from '@crowd/logging'
import { SlackChannel, SlackPersona, sendSlackNotification } from '@crowd/slack'
import { MetadataStore, S3Service } from '@crowd/snowflake'

const log = getServiceChildLogger('cleanupActivity')

const PLATFORM = 'pcc'

export async function executeCleanup(intervalHours = 24): Promise<void> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eric suggested this https://docs.google.com/document/d/1t6HyZdHGM9TA47fyJ5jRQ96z4O5esmnCDu03yNS4lwI/edit?disco=AAAB2hXY2p8, to basically explore the S3 bucket policies for automatically purging old snapshots / query-results. Perhaps leveraging that we could avoid having our own internal job to clean up S3 buckets.
Let me know if it's something worth checking or if it would still be preferable to have this cleanup job.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest doing this after this is merged and deployed as it's the same for the snowflake connector right now. We can change this for both of these when we need it but I would say the cleanup with the automatic purge on the AWS side is also a bit problematic if we don't process the data in time if the service is down the data gets deleted probably on a time since created basis. But then again if this happens we can just import the data again and process that batch. Still I would do this separate of this PR and for both services at once pcc-sync-worker and snowflake-connector.

const db = await getDbConnection(WRITE_DB_CONFIG())
const metadataStore = new MetadataStore(db)
const s3Service = new S3Service()

const jobs = await metadataStore.getCleanableJobS3Paths(intervalHours, PLATFORM, false)
log.info({ jobCount: jobs.length, intervalHours }, 'Found cleanable PCC jobs')

for (const job of jobs) {
try {
await s3Service.deleteFile(job.s3Path)
await metadataStore.markCleaned(job.id)
log.info({ jobId: job.id, s3Path: job.s3Path }, 'Cleaned PCC job')
} catch (err) {
log.error({ jobId: job.id, s3Path: job.s3Path, err }, 'Failed to clean PCC job, skipping')
sendSlackNotification(
SlackChannel.CDP_INTEGRATIONS_ALERTS,
SlackPersona.ERROR_REPORTER,
'PCC S3 Cleanup Failed',
`Failed to clean job \`${job.id}\` at \`${job.s3Path}\`.\n\n*Error:* ${err instanceof Error ? err.message : err}`,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unawaited sendSlackNotification risks unhandled promise rejection

Low Severity

sendSlackNotification is called without await, creating a floating promise. If the notification fails, the resulting unhandled promise rejection can crash the Node.js process (default behavior since Node 15+). No unhandledRejection handler was found in the services directory. Other parts of the codebase use await sendSlackNotificationAsync for this purpose.

Additional Locations (2)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 8b6800a. Configure here.

}
}
}
Comment thread
cursor[bot] marked this conversation as resolved.
100 changes: 100 additions & 0 deletions services/apps/pcc_sync_worker/src/activities/exportActivity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Export activity: Execute PCC recursive CTE COPY INTO + write metadata.
*
* Full daily export of ANALYTICS.SILVER_DIM.PROJECTS via recursive CTE.
* No incremental logic — at ~1,538 leaf rows, a full daily export is simpler
* and more reliable than incremental (a parent name change would require
* re-exporting all descendants).
*/
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
import { getServiceChildLogger } from '@crowd/logging'
import { MetadataStore, SnowflakeExporter } from '@crowd/snowflake'

const log = getServiceChildLogger('exportActivity')

const PLATFORM = 'pcc'
const SOURCE_NAME = 'project-hierarchy'

function buildSourceQuery(): string {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I know this is the query I used before on the first data analysis, but this is not following the latest mapping rules of the new PROJECTS_SPINE table which already has the hierarchy defined -> We don't need to do it ourselves.

Proposed new query:

SELECT
    p.project_id,
    p.name,
    p.description,
    p.project_logo,
    p.project_status,
    p.project_maturity_level,
    ps.mapped_project_id,
    ps.mapped_project_name,
    ps.mapped_project_slug,
    ps.hierarchy_level,
    s.segment_id
FROM ANALYTICS.SILVER_DIM.PROJECTS p
LEFT JOIN ANALYTICS.SILVER_DIM.PROJECT_SPINE ps ON ps.base_project_id = p.project_id
LEFT JOIN ANALYTICS.SILVER_DIM.ACTIVE_SEGMENTS s
    ON s.source_id = p.project_id
    AND s.project_type = 'subproject'
WHERE p.project_id NOT IN (
    SELECT DISTINCT parent_id
    FROM ANALYTICS.SILVER_DIM.PROJECTS
    WHERE parent_id IS NOT NULL
)
ORDER BY p.name, ps.hierarchy_level ASC;

Important
Level/depth is now longer in a column but rather in new rows. E.g.


project_id | name | mapped_project_id | mapped_project_name | mapped_project_slug | hierarchy_level | segment_id
----------- | -------- | ------------------- | ----------------------- | ---------------------- | --------------- | 
kubectl_id  | kubectl  | kubectl_id | kubectl | kubectl | 1 | seg_123
kubectl_id  | kubectl  | kubernetes_id | kubernetes | kubernetes | 2 | seg_123
kubectl_id  | kubectl  | tlf_id | tlf | tlf | 3 | seg_123

Changes:
1. No hardcoded depth limit
The recursive CTE hard-codes depth_1 through depth_5. If the hierarchy ever grows to 6+ levels, we need a schema change. The new query handles any depth automatically via additional rows.

2. Reuses existing model infrastructure
PROJECT_SPINE already computes and materializes the hierarchy. The recursive CTE duplicates that logic inline — any bug fix or change to hierarchy traversal would need to be made in two places.

3. Simpler, more readable SQL
The recursive CTE is ~20 lines of stateful logic that requires understanding how the recursion builds depth_N columns. The new query is a straightforward set of joins that's immediately understandable.

5. Normalized shape
Wide columns (depth_1..5) are harder to work with downstream, filtering, aggregating, or displaying "what level is this?" requires knowing which column to look at. One row per level is easier to GROUP BY, FILTER, or JOIN against.

6. Removed repository_url on the leaf project
Since we're no longer using repository_url to automatically onboard, as that's a free text url in the UI and we shouldn't rely on it, then we don't need to return it as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done just testing it now before resolving the comment.

return `
WITH RECURSIVE project_hierarchy AS (
SELECT project_id, name, description, project_logo, project_status,
project_maturity_level, repository_url, slug, parent_id,
1 AS depth,
name AS depth_1, NULL::VARCHAR AS depth_2, NULL::VARCHAR AS depth_3,
NULL::VARCHAR AS depth_4, NULL::VARCHAR AS depth_5
FROM ANALYTICS.SILVER_DIM.PROJECTS
WHERE parent_id IS NULL
UNION ALL
SELECT p.project_id, p.name, p.description, p.project_logo, p.project_status,
p.project_maturity_level, p.repository_url, p.slug, p.parent_id,
h.depth + 1,
h.depth_1,
CASE WHEN h.depth + 1 = 2 THEN p.name ELSE h.depth_2 END,
CASE WHEN h.depth + 1 = 3 THEN p.name ELSE h.depth_3 END,
CASE WHEN h.depth + 1 = 4 THEN p.name ELSE h.depth_4 END,
CASE WHEN h.depth + 1 = 5 THEN p.name ELSE h.depth_5 END
FROM ANALYTICS.SILVER_DIM.PROJECTS p
INNER JOIN project_hierarchy h ON p.parent_id = h.project_id
)
SELECT ph.project_id, ph.name, ph.slug, ph.description, ph.project_logo, ph.repository_url,
ph.project_status, ph.project_maturity_level, ph.depth,
ph.depth_1, ph.depth_2, ph.depth_3, ph.depth_4, ph.depth_5,
s.segment_id
FROM project_hierarchy ph
LEFT JOIN ANALYTICS.SILVER_DIM.ACTIVE_SEGMENTS s
ON s.source_id = ph.project_id AND s.project_type = 'subproject'
WHERE ph.project_id NOT IN (
SELECT DISTINCT parent_id FROM ANALYTICS.SILVER_DIM.PROJECTS
WHERE parent_id IS NOT NULL
)
`
}

/**
 * Returns the S3 key prefix for today's export:
 * `<bucket-path>/pcc/project-hierarchy/YYYY/MM/DD` (local date).
 *
 * @throws Error when CROWD_SNOWFLAKE_S3_BUCKET_PATH is unset.
 */
function buildS3FilenamePrefix(): string {
  const today = new Date()
  const twoDigits = (value: number): string => String(value).padStart(2, '0')

  const s3BucketPath = process.env.CROWD_SNOWFLAKE_S3_BUCKET_PATH
  if (!s3BucketPath) {
    throw new Error('Missing required env var CROWD_SNOWFLAKE_S3_BUCKET_PATH')
  }

  const datePath = [
    today.getFullYear(),
    twoDigits(today.getMonth() + 1),
    twoDigits(today.getDate()),
  ].join('/')
  return `${s3BucketPath}/${PLATFORM}/${SOURCE_NAME}/${datePath}`
}

/**
 * Runs the full daily PCC export.
 *
 * Opens a Snowflake connection, streams the hierarchy query result to S3
 * via batched COPY INTO, and records one metadata row per completed batch
 * so downstream processing (and later cleanup) can locate the files.
 * Rethrows on failure after logging; the Snowflake connection is always
 * closed, best-effort, in the finally block.
 */
export async function executeExport(): Promise<void> {
  log.info({ platform: PLATFORM, sourceName: SOURCE_NAME }, 'Starting PCC export')

  const exporter = new SnowflakeExporter()
  const db = await getDbConnection(WRITE_DB_CONFIG())

  try {
    const metadataStore = new MetadataStore(db)
    const sourceQuery = buildSourceQuery()
    const s3FilenamePrefix = buildS3FilenamePrefix()
    // Single timestamp for the whole run so every batch row shares it.
    const exportStartedAt = new Date()

    // Invoked by the exporter after each batch lands in S3; persists the
    // batch's location and size for downstream consumers.
    const onBatchComplete = async (s3Path: string, totalRows: number, totalBytes: number) => {
      await metadataStore.insertExportJob(
        PLATFORM,
        SOURCE_NAME,
        s3Path,
        totalRows,
        totalBytes,
        exportStartedAt,
      )
    }

    await exporter.executeBatchedCopyInto(sourceQuery, s3FilenamePrefix, onBatchComplete)

    log.info({ platform: PLATFORM, sourceName: SOURCE_NAME }, 'PCC export completed')
  } catch (err) {
    log.error({ platform: PLATFORM, sourceName: SOURCE_NAME, err }, 'PCC export failed')
    throw err
  } finally {
    // Best-effort close; a failed close must not mask the export outcome.
    await exporter
      .destroy()
      .catch((err) => log.warn({ err }, 'Failed to close Snowflake connection'))
  }
}
Comment thread
themarolt marked this conversation as resolved.
Outdated
2 changes: 2 additions & 0 deletions services/apps/pcc_sync_worker/src/activities/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Activity barrel: re-exports all activities from a single entry point.
export { executeExport } from './exportActivity'
export { executeCleanup } from './cleanupActivity'
6 changes: 6 additions & 0 deletions services/apps/pcc_sync_worker/src/config/settings.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/**
 * Centralized configuration: Temporal.
 *
 * Re-exported from @crowd/temporal so this worker has a single local import
 * point for its Temporal configuration and client helpers.
 */

export { TEMPORAL_CONFIG, getTemporalClient } from '@crowd/temporal'
export type { ITemporalConfig } from '@crowd/temporal'
Loading
Loading