diff --git a/backend/src/database/migrations/U1775312770__pcc-sync-worker-setup.sql b/backend/src/database/migrations/U1775312770__pcc-sync-worker-setup.sql new file mode 100644 index 0000000000..1b84cbc1c9 --- /dev/null +++ b/backend/src/database/migrations/U1775312770__pcc-sync-worker-setup.sql @@ -0,0 +1,5 @@ +DROP INDEX IF EXISTS pcc_sync_errors_dedup_idx; + +DROP TABLE IF EXISTS pcc_projects_sync_errors; + +ALTER TABLE segments DROP COLUMN IF EXISTS maturity; diff --git a/backend/src/database/migrations/V1775312770__pcc-sync-worker-setup.sql b/backend/src/database/migrations/V1775312770__pcc-sync-worker-setup.sql new file mode 100644 index 0000000000..16cecdac82 --- /dev/null +++ b/backend/src/database/migrations/V1775312770__pcc-sync-worker-setup.sql @@ -0,0 +1,27 @@ +-- Add maturity field to segments for PCC project_maturity_level sync +ALTER TABLE segments ADD COLUMN IF NOT EXISTS maturity TEXT NULL; + +-- Catch-all table for PCC sync issues that require manual review +CREATE TABLE IF NOT EXISTS pcc_projects_sync_errors ( + id BIGSERIAL PRIMARY KEY, + run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + external_project_id TEXT, + external_project_slug TEXT, + error_type TEXT NOT NULL, + details JSONB, + resolved BOOLEAN NOT NULL DEFAULT FALSE +); + +-- Deduplication index: one unresolved error per (project, error_type). +-- On repeated daily exports the same error upserts in place instead of accumulating rows. +-- Excludes rows where external_project_id IS NULL (e.g. SCHEMA_MISMATCH with no project id). +CREATE UNIQUE INDEX IF NOT EXISTS pcc_sync_errors_dedup_idx + ON pcc_projects_sync_errors (external_project_id, error_type) + WHERE NOT resolved AND external_project_id IS NOT NULL; + +-- Deduplication index for unidentifiable rows (no external_project_id). +-- Keyed on (error_type, reason) so repeated daily exports don't accumulate duplicate rows +-- for the same class of malformed input (e.g. rows missing PROJECT_ID/NAME/DEPTH). 
+CREATE UNIQUE INDEX IF NOT EXISTS pcc_sync_errors_dedup_unknown_idx + ON pcc_projects_sync_errors (error_type, (details->>'reason')) + WHERE NOT resolved AND external_project_id IS NULL; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c0108821f6..4d6df75e59 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1310,6 +1310,52 @@ importers: specifier: ^3.0.1 version: 3.1.0 + services/apps/pcc_sync_worker: + dependencies: + '@crowd/archetype-standard': + specifier: workspace:* + version: link:../../archetypes/standard + '@crowd/archetype-worker': + specifier: workspace:* + version: link:../../archetypes/worker + '@crowd/common': + specifier: workspace:* + version: link:../../libs/common + '@crowd/database': + specifier: workspace:* + version: link:../../libs/database + '@crowd/logging': + specifier: workspace:* + version: link:../../libs/logging + '@crowd/slack': + specifier: workspace:* + version: link:../../libs/slack + '@crowd/snowflake': + specifier: workspace:* + version: link:../../libs/snowflake + '@crowd/temporal': + specifier: workspace:* + version: link:../../libs/temporal + '@crowd/types': + specifier: workspace:* + version: link:../../libs/types + '@temporalio/client': + specifier: ~1.11.8 + version: 1.11.8 + '@temporalio/workflow': + specifier: ~1.11.8 + version: 1.11.8 + tsx: + specifier: ^4.7.1 + version: 4.7.3 + typescript: + specifier: ^5.6.3 + version: 5.6.3 + devDependencies: + nodemon: + specifier: ^3.0.1 + version: 3.1.0 + services/apps/profiles_worker: dependencies: '@crowd/archetype-standard': @@ -1572,9 +1618,6 @@ importers: services/apps/snowflake_connectors: dependencies: - '@aws-sdk/client-s3': - specifier: ^3.700.0 - version: 3.985.0 '@crowd/archetype-standard': specifier: workspace:* version: link:../../archetypes/standard @@ -1614,9 +1657,6 @@ importers: '@crowd/types': specifier: workspace:* version: link:../../libs/types - '@dsnp/parquetjs': - specifier: ^1.7.0 - version: 1.8.7(bufferutil@4.0.8)(utf-8-validate@5.0.10) 
'@temporalio/client': specifier: ~1.11.8 version: 1.11.8 @@ -2363,9 +2403,18 @@ importers: services/libs/snowflake: dependencies: + '@aws-sdk/client-s3': + specifier: ^3.700.0 + version: 3.985.0 + '@crowd/database': + specifier: workspace:* + version: link:../database '@crowd/logging': specifier: workspace:* version: link:../logging + '@dsnp/parquetjs': + specifier: ^1.7.0 + version: 1.8.7(bufferutil@4.0.8)(utf-8-validate@5.0.10) snowflake-sdk: specifier: ^2.3.3 version: 2.3.4(asn1.js@5.4.1)(encoding@0.1.13) @@ -7399,6 +7448,7 @@ packages: git-raw-commits@4.0.0: resolution: {integrity: sha512-ICsMM1Wk8xSGMowkOmPrzo2Fgmfo4bMHLNX6ytHjajRJUqvHOw/TFapQ+QG75c3X/tTDDhOSRPGC52dDbNM8FQ==} engines: {node: '>=16'} + deprecated: This package is no longer maintained. For the JavaScript API, please use @conventional-changelog/git-client instead. hasBin: true glob-parent@5.1.2: @@ -7420,11 +7470,11 @@ packages: glob@6.0.4: resolution: {integrity: sha512-MKZeRNyYZAVVVG1oZeLaWie1uweH40m9AZwIwxyPbTSX4hHrVYSzLg0Ro5Z5R7XKkIX+Cc6oD1rqeDJnwsB8/A==} - deprecated: Glob versions prior to v9 are no longer supported + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me glob@7.2.3: resolution: {integrity: sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q==} - deprecated: Glob versions prior to v9 are no longer supported + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. 
Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me global-directory@4.0.1: resolution: {integrity: sha512-wHTUcDUoZ1H5/0iVqEudYW4/kAlN5cZ3j/bXn0Dpbizl9iaUVeWSHqiOjsgk6OW2bkLclbBjzewBz6weQ1zA2Q==} @@ -10929,8 +10979,8 @@ snapshots: dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0 - '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sts': 3.572.0 '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -11124,11 +11174,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sso-oidc@3.572.0': + '@aws-sdk/client-sso-oidc@3.572.0(@aws-sdk/client-sts@3.572.0)': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) + '@aws-sdk/client-sts': 3.572.0 '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -11167,6 +11217,7 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: + - '@aws-sdk/client-sts' - aws-crt '@aws-sdk/client-sso@3.556.0': @@ -11342,11 +11393,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sts@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': + '@aws-sdk/client-sts@3.572.0': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0 + '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -11385,7 +11436,6 @@ 
snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: - - '@aws-sdk/client-sso-oidc' - aws-crt '@aws-sdk/client-sts@3.985.0': @@ -11551,7 +11601,7 @@ snapshots: '@aws-sdk/credential-provider-ini@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) + '@aws-sdk/client-sts': 3.572.0 '@aws-sdk/credential-provider-env': 3.568.0 '@aws-sdk/credential-provider-process': 3.572.0 '@aws-sdk/credential-provider-sso': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) @@ -11728,7 +11778,7 @@ snapshots: '@aws-sdk/credential-provider-web-identity@3.568.0(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) + '@aws-sdk/client-sts': 3.572.0 '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/types': 2.12.0 @@ -12040,7 +12090,7 @@ snapshots: '@aws-sdk/token-providers@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: - '@aws-sdk/client-sso-oidc': 3.572.0 + '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/shared-ini-file-loader': 2.4.0 diff --git a/scripts/services/docker/Dockerfile.pcc_sync_worker b/scripts/services/docker/Dockerfile.pcc_sync_worker new file mode 100644 index 0000000000..ae54539255 --- /dev/null +++ b/scripts/services/docker/Dockerfile.pcc_sync_worker @@ -0,0 +1,25 @@ +FROM node:20-bullseye-slim AS builder + +RUN apt-get update && apt-get install -y python3 make g++ && rm -rf /var/lib/apt/lists/* + +WORKDIR /usr/crowd/app +RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate + +COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./ +RUN pnpm fetch + +COPY ./services ./services +RUN pnpm i --frozen-lockfile + +FROM node:20-bullseye-slim AS runner + +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* + 
+WORKDIR /usr/crowd/app +RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate + +COPY --from=builder /usr/crowd/app/node_modules ./node_modules +COPY --from=builder /usr/crowd/app/services/base.tsconfig.json ./services/base.tsconfig.json +COPY --from=builder /usr/crowd/app/services/libs ./services/libs +COPY --from=builder /usr/crowd/app/services/archetypes/ ./services/archetypes +COPY --from=builder /usr/crowd/app/services/apps/pcc_sync_worker/ ./services/apps/pcc_sync_worker diff --git a/scripts/services/docker/Dockerfile.pcc_sync_worker.dockerignore b/scripts/services/docker/Dockerfile.pcc_sync_worker.dockerignore new file mode 100644 index 0000000000..4b74fc87af --- /dev/null +++ b/scripts/services/docker/Dockerfile.pcc_sync_worker.dockerignore @@ -0,0 +1,18 @@ +**/.git +**/node_modules +**/venv* +**/.webpack +**/.serverless +**/.env +**/.env.* +**/.idea +**/.vscode +**/dist +.vscode/ +.github/ +frontend/ +scripts/ +.flake8 +*.md +Makefile +backend/ diff --git a/scripts/services/pcc-sync-worker.yaml b/scripts/services/pcc-sync-worker.yaml new file mode 100644 index 0000000000..8e30e081eb --- /dev/null +++ b/scripts/services/pcc-sync-worker.yaml @@ -0,0 +1,53 @@ +version: '3.1' + +x-env-args: &env-args + DOCKER_BUILDKIT: 1 + NODE_ENV: docker + SERVICE: pcc-sync-worker + CROWD_TEMPORAL_TASKQUEUE: pccSync + SHELL: /bin/sh + +services: + pcc-sync-worker: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.pcc_sync_worker + command: 'pnpm run start' + working_dir: /usr/crowd/app/services/apps/pcc_sync_worker + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + restart: always + networks: + - crowd-bridge + + pcc-sync-worker-dev: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.pcc_sync_worker + command: 'pnpm run dev' + 
working_dir: /usr/crowd/app/services/apps/pcc_sync_worker + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + hostname: pcc-sync-worker + networks: + - crowd-bridge + volumes: + - ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src + - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src + - ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src + - ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src + - ../../services/apps/pcc_sync_worker/src:/usr/crowd/app/services/apps/pcc_sync_worker/src + +networks: + crowd-bridge: + external: true diff --git a/services/apps/pcc_sync_worker/package.json b/services/apps/pcc_sync_worker/package.json new file mode 100644 index 0000000000..6a421618ae --- /dev/null +++ b/services/apps/pcc_sync_worker/package.json @@ -0,0 +1,36 @@ +{ + "name": "@crowd/pcc-sync-worker", + "scripts": { + "start": "CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker tsx src/index.ts", + "start:debug": "CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker LOG_LEVEL=debug tsx src/index.ts", + "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=pccSync SERVICE=pcc-sync-worker LOG_LEVEL=debug tsx src/index.ts", + "dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug", + "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", + "lint": "npx eslint --ext .ts src --max-warnings=0", + "format": "npx prettier --write \"src/**/*.ts\"", + "format-check": "npx prettier --check .", + "tsc-check": "tsc --noEmit", + "trigger-export": "SERVICE=pcc-sync-worker tsx src/scripts/triggerExport.ts", + "trigger-export:local": "set -a && . ../../../backend/.env.dist.local && . 
../../../backend/.env.override.local && set +a && SERVICE=pcc-sync-worker tsx src/scripts/triggerExport.ts", + "trigger-cleanup": "SERVICE=pcc-sync-worker tsx src/scripts/triggerCleanup.ts", + "trigger-cleanup:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=pcc-sync-worker tsx src/scripts/triggerCleanup.ts" + }, + "dependencies": { + "@crowd/archetype-standard": "workspace:*", + "@crowd/archetype-worker": "workspace:*", + "@crowd/common": "workspace:*", + "@crowd/database": "workspace:*", + "@crowd/types": "workspace:*", + "@crowd/logging": "workspace:*", + "@crowd/slack": "workspace:*", + "@crowd/snowflake": "workspace:*", + "@crowd/temporal": "workspace:*", + "@temporalio/client": "~1.11.8", + "@temporalio/workflow": "~1.11.8", + "tsx": "^4.7.1", + "typescript": "^5.6.3" + }, + "devDependencies": { + "nodemon": "^3.0.1" + } +} diff --git a/services/apps/pcc_sync_worker/src/activities/cleanupActivity.ts b/services/apps/pcc_sync_worker/src/activities/cleanupActivity.ts new file mode 100644 index 0000000000..f26ae59ff7 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/activities/cleanupActivity.ts @@ -0,0 +1,37 @@ +import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database' +import { getServiceChildLogger } from '@crowd/logging' +import { SlackChannel, SlackPersona, sendSlackNotification } from '@crowd/slack' +import { MetadataStore, S3Service, buildPlatformFilter } from '@crowd/snowflake' + +const log = getServiceChildLogger('cleanupActivity') + +const PLATFORM = 'pcc' + +export async function executeCleanup(intervalHours = 24): Promise { + const db = await getDbConnection(WRITE_DB_CONFIG()) + const metadataStore = new MetadataStore(db) + const s3Service = new S3Service() + + const jobs = await metadataStore.getCleanableJobS3Paths( + intervalHours, + buildPlatformFilter([PLATFORM]), + false, + ) + log.info({ jobCount: jobs.length, intervalHours }, 'Found cleanable PCC jobs') + + for 
(const job of jobs) { + try { + await s3Service.deleteFile(job.s3Path) + await metadataStore.markCleaned(job.id) + log.info({ jobId: job.id, s3Path: job.s3Path }, 'Cleaned PCC job') + } catch (err) { + log.error({ jobId: job.id, s3Path: job.s3Path, err }, 'Failed to clean PCC job, skipping') + sendSlackNotification( + SlackChannel.CDP_INTEGRATIONS_ALERTS, + SlackPersona.ERROR_REPORTER, + 'PCC S3 Cleanup Failed', + `Failed to clean job \`${job.id}\` at \`${job.s3Path}\`.\n\n*Error:* ${err instanceof Error ? err.message : err}`, + ) + } + } +} diff --git a/services/apps/pcc_sync_worker/src/activities/exportActivity.ts b/services/apps/pcc_sync_worker/src/activities/exportActivity.ts new file mode 100644 index 0000000000..7d2725c30a --- /dev/null +++ b/services/apps/pcc_sync_worker/src/activities/exportActivity.ts @@ -0,0 +1,81 @@ +/** + * Export activity: Execute PCC COPY INTO + write metadata. + * + * Full daily export of leaf projects from ANALYTICS.SILVER_DIM.PROJECTS joined + * with PROJECT_SPINE to produce one row per (leaf, hierarchy_level) pair. + * No incremental logic — at ~1,538 leaf rows, a full daily export is simpler + * and more reliable than incremental (a parent name change would require + * re-exporting all descendants). 
+ */ +import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database' +import { getServiceChildLogger } from '@crowd/logging' +import { MetadataStore, SnowflakeExporter, buildS3FilenamePrefix } from '@crowd/snowflake' + +const log = getServiceChildLogger('exportActivity') + +const PLATFORM = 'pcc' +const SOURCE_NAME = 'project-hierarchy' + +function buildSourceQuery(): string { + return ` + SELECT + p.project_id, + p.name, + p.description, + p.project_logo, + p.project_status, + p.project_maturity_level, + ps.mapped_project_id, + ps.mapped_project_name, + ps.mapped_project_slug, + ps.hierarchy_level, + s.segment_id + FROM ANALYTICS.SILVER_DIM.PROJECTS p + LEFT JOIN ANALYTICS.SILVER_DIM.PROJECT_SPINE ps ON ps.base_project_id = p.project_id + LEFT JOIN ANALYTICS.SILVER_DIM.ACTIVE_SEGMENTS s + ON s.source_id = p.project_id + AND s.project_type = 'subproject' + WHERE p.project_id NOT IN ( + SELECT DISTINCT parent_id + FROM ANALYTICS.SILVER_DIM.PROJECTS + WHERE parent_id IS NOT NULL + ) + ORDER BY p.name, ps.hierarchy_level ASC + ` +} + +export async function executeExport(): Promise { + log.info({ platform: PLATFORM, sourceName: SOURCE_NAME }, 'Starting PCC export') + + const exporter = new SnowflakeExporter() + const db = await getDbConnection(WRITE_DB_CONFIG()) + + try { + const metadataStore = new MetadataStore(db) + const sourceQuery = buildSourceQuery() + const s3FilenamePrefix = buildS3FilenamePrefix(PLATFORM, SOURCE_NAME) + const exportStartedAt = new Date() + + const onBatchComplete = async (s3Path: string, totalRows: number, totalBytes: number) => { + await metadataStore.insertExportJob( + PLATFORM, + SOURCE_NAME, + s3Path, + totalRows, + totalBytes, + exportStartedAt, + ) + } + + await exporter.executeBatchedCopyInto(sourceQuery, s3FilenamePrefix, onBatchComplete) + + log.info({ platform: PLATFORM, sourceName: SOURCE_NAME }, 'PCC export completed') + } catch (err) { + log.error({ platform: PLATFORM, sourceName: SOURCE_NAME, err }, 'PCC export failed') + 
throw err + } finally { + await exporter + .destroy() + .catch((err) => log.warn({ err }, 'Failed to close Snowflake connection')) + } +} diff --git a/services/apps/pcc_sync_worker/src/activities/index.ts b/services/apps/pcc_sync_worker/src/activities/index.ts new file mode 100644 index 0000000000..1fd1f65a10 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/activities/index.ts @@ -0,0 +1,2 @@ +export { executeExport } from './exportActivity' +export { executeCleanup } from './cleanupActivity' diff --git a/services/apps/pcc_sync_worker/src/config/settings.ts b/services/apps/pcc_sync_worker/src/config/settings.ts new file mode 100644 index 0000000000..12d00867d4 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/config/settings.ts @@ -0,0 +1,6 @@ +/** + * Centralized configuration: Temporal. + */ + +export { TEMPORAL_CONFIG, getTemporalClient } from '@crowd/temporal' +export type { ITemporalConfig } from '@crowd/temporal' diff --git a/services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts b/services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts new file mode 100644 index 0000000000..3e111a0862 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/consumer/pccProjectConsumer.ts @@ -0,0 +1,510 @@ +/** + * PCC project consumer: polls snowflakeExportJobs for platform='pcc' jobs, + * streams each Parquet file, runs the matching cascade, and writes to DB. + * + * One DB transaction per job — all segment + insightsProject writes roll back + * together on any failure. Errors that can't be auto-resolved are written to + * pcc_projects_sync_errors for manual review. 
+ */ +import { DEFAULT_TENANT_ID } from '@crowd/common' +import { DbConnOrTx, DbConnection, WRITE_DB_CONFIG, getDbConnection } from '@crowd/database' +import { getServiceChildLogger } from '@crowd/logging' +import { MetadataStore, S3Service, SnowflakeExportJob, buildPlatformFilter } from '@crowd/snowflake' + +import { parsePccRow } from '../parser' +import type { CdpHierarchyTarget, ParsedPccProject } from '../parser' + +const log = getServiceChildLogger('pccProjectConsumer') + +const PLATFORM = 'pcc' +const MAX_POLLING_INTERVAL_MS = 30 * 60 * 1000 // 30 minutes + +// ───────────────────────────────────────────────────────────────────────────── +// Consumer loop +// ───────────────────────────────────────────────────────────────────────────── + +export class PccProjectConsumer { + private running = false + private currentPollingIntervalMs: number + + constructor( + private readonly metadataStore: MetadataStore, + private readonly s3Service: S3Service, + private readonly db: DbConnection, + private readonly pollingIntervalMs: number, + readonly dryRun: boolean = false, + ) { + this.currentPollingIntervalMs = pollingIntervalMs + } + + async start(): Promise { + this.running = true + log.info({ dryRun: this.dryRun }, 'PCC project consumer started') + + while (this.running) { + try { + const job = await this.metadataStore.claimOldestPendingJob(buildPlatformFilter([PLATFORM])) + + if (job) { + this.currentPollingIntervalMs = this.pollingIntervalMs + await this.processJob(job) + await new Promise((resolve) => setImmediate(resolve)) + continue + } + } catch (err) { + log.error({ err }, 'Error in consumer loop') + await this.sleep(this.pollingIntervalMs) + continue + } + + log.info({ currentPollingIntervalMs: this.currentPollingIntervalMs }, 'No pending PCC jobs') + await this.sleep(this.currentPollingIntervalMs) + this.currentPollingIntervalMs = Math.min( + this.currentPollingIntervalMs * 2, + MAX_POLLING_INTERVAL_MS, + ) + } + + log.info('PCC project consumer stopped') + 
} + + stop(): void { + this.running = false + } + + // ───────────────────────────────────────────────────────────────────────── + // Job processing + // ───────────────────────────────────────────────────────────────────────── + + private async processJob(job: SnowflakeExportJob): Promise { + log.info({ jobId: job.id, s3Path: job.s3Path, dryRun: this.dryRun }, 'Processing PCC job') + + const startTime = Date.now() + let totalCount = 0 + let upsertedCount = 0 + let skippedCount = 0 + let mismatchCount = 0 + let schemaMismatchCount = 0 + let schemaMismatchMatchedCount = 0 // SCHEMA_MISMATCH rows that still have a CDP segment match + + try { + // Stream all rows and group by PROJECT_ID before processing. + // The export emits one row per (leaf, hierarchy_level) from the PROJECT_SPINE + // JOIN, so each leaf project produces N rows (one per ancestor level). + const groups = new Map[]>() + for await (const raw of this.s3Service.streamParquetRows(job.s3Path)) { + const projectId = String((raw as Record).PROJECT_ID ?? '') + if (!projectId) continue + if (!groups.has(projectId)) groups.set(projectId, []) + const group = groups.get(projectId) + if (group) group.push(raw) + } + + await this.db.tx(async (tx) => { + for (const [, rows] of groups) { + const parsed = parsePccRow(rows) + + totalCount++ + + if (parsed.ok === false) { + schemaMismatchCount++ + const errorDetails: Record = { ...parsed.details } + + // If the row had identifiable fields (depth-range errors), attempt a segment + // match so the error record reflects whether a CDP segment exists for this + // project — useful for triage even when the depth rule is unsupported. + if (parsed.pccProjectId) { + const matched = await findSegmentBySourceId(tx, parsed.pccProjectId) + if (matched) { + schemaMismatchMatchedCount++ + errorDetails.matchedSegmentId = matched.id + errorDetails.matchedSegmentName = matched.name + errorDetails.matchedVia = 'sourceId' + } + } + + log.warn( + { + pccProjectId: parsed.pccProjectId ?? 
null, + pccSlug: parsed.pccSlug ?? null, + ...errorDetails, + }, + 'Schema mismatch in PCC row', + ) + if (!this.dryRun) { + await insertSyncError( + tx, + parsed.pccProjectId ?? null, + parsed.pccSlug ?? null, + 'SCHEMA_MISMATCH', + errorDetails, + ) + } + continue + } + + const { project } = parsed + const result = await this.processRow(tx, project) + + switch (result.action) { + case 'UPSERTED': + upsertedCount++ + break + case 'SKIPPED': + skippedCount++ + break + case 'MISMATCH': + mismatchCount++ + if (!this.dryRun) { + await insertSyncError( + tx, + project.pccProjectId, + project.pccSlug, + 'HIERARCHY_MISMATCH', + result.details, + ) + } + break + } + } + }) + + const durationMs = Date.now() - startTime + log.info( + { + jobId: job.id, + dryRun: this.dryRun, + durationMs, + total: totalCount, + upserted: upsertedCount, + skipped: skippedCount, + hierarchyMismatch: mismatchCount, + schemaMismatch: schemaMismatchCount, + schemaMismatchWithCdpMatch: schemaMismatchMatchedCount, + schemaMismatchNoCdpMatch: schemaMismatchCount - schemaMismatchMatchedCount, + }, + 'PCC job completed', + ) + + if (!this.dryRun) { + await this.metadataStore.markCompleted(job.id, { + transformedCount: upsertedCount, + skippedCount: skippedCount + mismatchCount + schemaMismatchCount, + processingDurationMs: durationMs, + }) + } + } catch (err) { + const errorMessage = err instanceof Error ? 
err.message : String(err) + log.error({ jobId: job.id, err }, 'PCC job failed') + + if (!this.dryRun) { + try { + await this.metadataStore.markFailed(job.id, errorMessage, { + processingDurationMs: Date.now() - startTime, + }) + } catch (updateErr) { + log.error({ jobId: job.id, updateErr }, 'Failed to mark job as failed') + } + } + } + } + + // ───────────────────────────────────────────────────────────────────────── + // Per-row matching cascade + writes + // ───────────────────────────────────────────────────────────────────────── + + private async processRow( + tx: DbConnOrTx, + project: ParsedPccProject, + ): Promise< + | { action: 'UPSERTED' } + | { action: 'SKIPPED' } + | { action: 'MISMATCH'; details: Record } + > { + // Step 1: segment_id from Snowflake ACTIVE_SEGMENTS JOIN + let segment = project.segmentIdFromSnowflake + ? await findSegmentById(tx, project.segmentIdFromSnowflake) + : null + + // Step 2: sourceId fallback + if (!segment) { + segment = await findSegmentBySourceId(tx, project.pccProjectId) + } + + // Step 3: no match → SKIP (Phase 1: project doesn't exist in CDP yet) + if (!segment) { + return { action: 'SKIPPED' } + } + + // Hierarchy mismatch check: segment was matched but parent/group differs + const mismatchFields = detectHierarchyMismatch(segment, project.cdpTarget) + if (mismatchFields.length > 0) { + return { + action: 'MISMATCH', + details: { + segmentId: segment.id, + segmentName: segment.name, + pccProjectId: project.pccProjectId, + mismatchFields, + cdpTarget: project.cdpTarget, + currentHierarchy: { + group: segment.grandparentName ?? segment.parentName ?? segment.name, + project: segment.parentName ?? segment.name, + subproject: segment.name, + }, + }, + } + } + + // Slug drift detection: log when PCC slug differs from the CDP segment slug. + // We do NOT update the slug — it is a stable identifier referenced by FK from + // securityInsightsEvaluations and related tables. 
The mismatch is recorded for + // manual review but does not block the sync. + if (project.pccSlug && segment.slug && project.pccSlug !== segment.slug) { + log.warn( + { segmentId: segment.id, pccSlug: project.pccSlug, cdpSlug: segment.slug }, + 'Slug drift detected — PCC slug differs from CDP segment slug', + ) + if (!this.dryRun) { + await insertSyncError(tx, project.pccProjectId, project.pccSlug, 'SLUG_CHANGED', { + segmentId: segment.id, + pccSlug: project.pccSlug, + cdpSlug: segment.slug, + }) + } + } + + if (!this.dryRun) { + await upsertSegment(tx, project.pccProjectId, project) + const nameConflict = await upsertInsightsProject( + tx, + segment.id, + project.pccProjectId, + project, + ) + if (nameConflict) { + log.warn( + { segmentId: segment.id, name: project.name }, + 'insightsProject name conflict — segment synced, insights project skipped', + ) + await insertSyncError(tx, project.pccProjectId, project.pccSlug, 'INSIGHTS_NAME_CONFLICT', { + segmentId: segment.id, + name: project.name, + }) + } + } else { + log.info( + { + segmentId: segment.id, + pccProjectId: project.pccProjectId, + name: project.name, + status: project.status, + maturity: project.maturity, + }, + '[dry-run] Would upsert segment', + ) + } + + return { action: 'UPSERTED' } + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// DB helpers +// ───────────────────────────────────────────────────────────────────────────── + +interface SegmentRow { + id: string + name: string + slug: string | null + parentName: string | null + grandparentName: string | null +} + +async function findSegmentById(db: DbConnOrTx, segmentId: string): Promise { + return db.oneOrNone( + `SELECT id, name, slug, "parentName", "grandparentName" + FROM segments + WHERE id = $(segmentId) AND "tenantId" = $(tenantId)`, + { segmentId, tenantId: DEFAULT_TENANT_ID }, + ) +} + +async 
function findSegmentBySourceId(db: DbConnOrTx, sourceId: string): Promise { + return db.oneOrNone( + `SELECT id, name, slug, "parentName", "grandparentName" + FROM segments + WHERE "sourceId" = $(sourceId) AND type = 'subproject' AND "tenantId" = $(tenantId)`, + { sourceId, tenantId: DEFAULT_TENANT_ID }, + ) +} + +function detectHierarchyMismatch(segment: SegmentRow, cdpTarget: CdpHierarchyTarget): string[] { + // Only check structural hierarchy (parent/grandparent placement), not the leaf name. + // The leaf name is a metadata field we're here to sync — a difference there is an UPDATE, + // not a mismatch. Mismatches indicate the project is in the wrong place in the hierarchy, + // which requires manual review before auto-fixing (per Phase 1 spec). + const mismatches: string[] = [] + if (segment.grandparentName && segment.grandparentName !== cdpTarget.group) { + mismatches.push('group_name') + } + if (segment.parentName && segment.parentName !== cdpTarget.project) { + mismatches.push('project_name') + } + return mismatches +} + +async function upsertSegment( + db: DbConnOrTx, + sourceId: string, + project: ParsedPccProject, +): Promise { + // Update all CDP segments whose sourceId equals this PCC PROJECT_ID. + // Each PCC node has its own PROJECT_ID. In CDP, how many segment levels share this + // sourceId depends on the effective depth: + // eff=1 → group+project+subproject all share the same PROJECT_ID (same name for all) + // eff=2 → project+subproject share the leaf's PROJECT_ID; group has a different one + // eff=3 or 4 → only the subproject segment carries this PROJECT_ID + // So this UPDATE always writes the correct name and never touches unrelated levels. 
+ await db.none( + `UPDATE segments + SET name = $(name), + status = COALESCE($(status)::"segmentsStatus_type", status), + maturity = $(maturity), + description = $(description), + "updatedAt" = NOW() + WHERE "sourceId" = $(sourceId) AND "tenantId" = $(tenantId)`, + { + sourceId, + name: project.name, + status: project.status, + maturity: project.maturity, + description: project.description, + tenantId: DEFAULT_TENANT_ID, + }, + ) +} + +// Returns true if a name conflict prevented creating the insightsProject row. +// Updates insightsProject rows for ALL segment levels sharing the same sourceId +// (group, project, subproject). The INSERT is restricted to the matched subproject +// segment (identified by segmentId) to avoid duplicating insights projects for +// hierarchy-only segments. +async function upsertInsightsProject( + db: DbConnOrTx, + segmentId: string, + sourceId: string, + project: ParsedPccProject, +): Promise { + // Check for a name conflict upfront — an active insightsProject belonging to a segment + // outside this PCC project's sourceId group already holds this name. + // We must exclude all segments sharing the same sourceId (not just the subproject), + // because on repeat syncs the group/project levels already carry the same name and + // would produce false positives if only the subproject segmentId were excluded. + const conflicting = await db.oneOrNone<{ id: string }>( + `SELECT ip.id + FROM "insightsProjects" ip + JOIN segments s ON s.id = ip."segmentId" + WHERE ip.name = $(name) + AND ip."deletedAt" IS NULL + AND s."sourceId" != $(sourceId) + AND s."tenantId" = $(tenantId)`, + { name: project.name, sourceId, tenantId: DEFAULT_TENANT_ID }, + ) + if (conflicting) return true + + // No conflict — update all active insightsProject rows linked to any segment that + // shares the PCC sourceId (group, project, subproject levels). 
+ // Slug is intentionally not updated — it is a stable identifier referenced by FK from + // securityInsightsEvaluations and related tables. + await db.none( + `UPDATE "insightsProjects" ip + SET name = $(name), + description = $(description), + "logoUrl" = $(logoUrl), + "updatedAt" = NOW() + FROM segments s + WHERE ip."segmentId" = s.id + AND s."sourceId" = $(sourceId) + AND s."tenantId" = $(tenantId) + AND ip."deletedAt" IS NULL`, + { + sourceId, + tenantId: DEFAULT_TENANT_ID, + name: project.name, + description: project.description, + logoUrl: project.logoUrl, + }, + ) + + // INSERT for the subproject segment only (the matched leaf). + // Partial unique index on segmentId WHERE deletedAt IS NULL means ON CONFLICT won't fire + // for soft-deleted rows — use UPDATE-then-INSERT pattern (UPDATE already done above). + const exists = await db.oneOrNone<{ id: string }>( + `SELECT id FROM "insightsProjects" WHERE "segmentId" = $(segmentId) AND "deletedAt" IS NULL`, + { segmentId }, + ) + if (exists) return false + + const inserted = await db.result( + `INSERT INTO "insightsProjects" (name, slug, description, "segmentId", "logoUrl", "isLF") + VALUES ($(name), generate_slug('insightsProjects', $(name)), $(description), $(segmentId), $(logoUrl), TRUE) + ON CONFLICT (name) WHERE "deletedAt" IS NULL DO NOTHING`, + { name: project.name, description: project.description, segmentId, logoUrl: project.logoUrl }, + ) + if (inserted.rowCount === 0) return true + + return false +} + +async function insertSyncError( + db: DbConnOrTx, + externalProjectId: string | null, + externalProjectSlug: string | null, + errorType: string, + details: Record, +): Promise { + const serialized = JSON.stringify(details) + if (externalProjectId !== null) { + // Known project: deduplicate on (external_project_id, error_type). 
+ await db.none( + `INSERT INTO pcc_projects_sync_errors + (external_project_id, external_project_slug, error_type, details) + VALUES ($(externalProjectId), $(externalProjectSlug), $(errorType), $(details)::jsonb) + ON CONFLICT (external_project_id, error_type) + WHERE NOT resolved AND external_project_id IS NOT NULL + DO UPDATE SET details = EXCLUDED.details, external_project_slug = EXCLUDED.external_project_slug, run_at = NOW()`, + { externalProjectId, externalProjectSlug, errorType, details: serialized }, + ) + } else { + // Unidentifiable row (no PROJECT_ID): deduplicate on (error_type, details->>'reason') + // so repeated daily exports don't accumulate duplicate rows for the same class of + // malformed input. Each distinct failure reason gets one unresolved row. + await db.none( + `INSERT INTO pcc_projects_sync_errors + (external_project_slug, error_type, details) + VALUES ($(externalProjectSlug), $(errorType), $(details)::jsonb) + ON CONFLICT (error_type, (details->>'reason')) + WHERE NOT resolved AND external_project_id IS NULL + DO UPDATE SET details = EXCLUDED.details, external_project_slug = EXCLUDED.external_project_slug, run_at = NOW()`, + { externalProjectSlug, errorType, details: serialized }, + ) + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// Factory +// ───────────────────────────────────────────────────────────────────────────── + +export async function createPccProjectConsumer(dryRun = false): Promise { + const db = await getDbConnection(WRITE_DB_CONFIG()) + const metadataStore = new MetadataStore(db) + const s3Service = new S3Service() + const pollingIntervalMs = 10_000 // 10 seconds + + return new PccProjectConsumer(metadataStore, s3Service, db, pollingIntervalMs, dryRun) +} diff --git a/services/apps/pcc_sync_worker/src/index.ts b/services/apps/pcc_sync_worker/src/index.ts new file mode 100644 index 0000000000..31a0e09b35 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/index.ts @@ -0,0 
+1,42 @@ +/** + * Entry point: Start Temporal worker + PCC project consumer loop. + */ +import { getServiceChildLogger } from '@crowd/logging' + +import { createPccProjectConsumer } from './consumer/pccProjectConsumer' +import { svc } from './main' +import { schedulePccS3Cleanup, schedulePccS3Export } from './schedules' + +const log = getServiceChildLogger('main') + +const DRY_RUN = process.env.PCC_DRY_RUN === 'true' + +setImmediate(async () => { + await svc.init() + + await schedulePccS3Export() + await schedulePccS3Cleanup() + + const consumer = await createPccProjectConsumer(DRY_RUN) + consumer.start().catch((err) => { + log.error({ err }, 'Consumer loop crashed') + process.exit(1) + }) + + const HARD_TIMEOUT_MS = 2 * 60 * 60 * 1000 + + const shutdown = () => { + log.info('Shutdown signal received, stopping consumer...') + consumer.stop() + + setTimeout(() => { + log.warn('Graceful shutdown timed out after 2 hours, forcing exit') + process.exit(1) + }, HARD_TIMEOUT_MS).unref() + } + + process.on('SIGINT', shutdown) + process.on('SIGTERM', shutdown) + + await svc.start() +}) diff --git a/services/apps/pcc_sync_worker/src/main.ts b/services/apps/pcc_sync_worker/src/main.ts new file mode 100644 index 0000000000..a08bb41cbc --- /dev/null +++ b/services/apps/pcc_sync_worker/src/main.ts @@ -0,0 +1,38 @@ +/** + * Temporal worker setup. + * + * Uses the ServiceWorker archetype which handles Temporal connection, + * workflow bundling, and activity registration automatically. 
+ */ +import { Config } from '@crowd/archetype-standard' +import { Options, ServiceWorker } from '@crowd/archetype-worker' + +const config: Config = { + envvars: [ + 'CROWD_SNOWFLAKE_S3_BUCKET_PATH', + 'CROWD_SNOWFLAKE_STORAGE_INTEGRATION', + 'CROWD_SNOWFLAKE_S3_REGION', + 'CROWD_SNOWFLAKE_S3_ACCESS_KEY_ID', + 'CROWD_SNOWFLAKE_S3_SECRET_ACCESS_KEY', + ], + producer: { + enabled: false, + }, + temporal: { + enabled: true, + }, + redis: { + enabled: false, + }, +} + +const options: Options = { + postgres: { + enabled: true, + }, + opensearch: { + enabled: false, + }, +} + +export const svc = new ServiceWorker(config, options) diff --git a/services/apps/pcc_sync_worker/src/parser/index.ts b/services/apps/pcc_sync_worker/src/parser/index.ts new file mode 100644 index 0000000000..0e3d9a7617 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/parser/index.ts @@ -0,0 +1,8 @@ +export { parsePccRow } from './rowParser' +export type { + CdpHierarchyTarget, + MappingRule, + ParseResult, + ParsedPccProject, + PccParquetRow, +} from './types' diff --git a/services/apps/pcc_sync_worker/src/parser/rowParser.ts b/services/apps/pcc_sync_worker/src/parser/rowParser.ts new file mode 100644 index 0000000000..bf8af6336f --- /dev/null +++ b/services/apps/pcc_sync_worker/src/parser/rowParser.ts @@ -0,0 +1,200 @@ +/** + * PCC project row parser and hierarchy mapper. + * + * Transforms a group of raw Parquet rows (one per hierarchy level) for a single + * PCC leaf project into a structured ParsedPccProject, applying the CDP hierarchy + * mapping rules. + * + * Pure function — no DB access, no I/O. 
+ * + * Mapping rules (effective_depth = max HIERARCHY_LEVEL - 1, stripping TLF root): + * Rule 1 (eff=1): group=level[1], project=level[1], subproject=level[1] + * Rule 2 (eff=2): group=level[2], project=level[1], subproject=level[1] + * Rule 3 (eff=3): group=level[3], project=level[2], subproject=level[1] + * Rule 4 (eff=4): group=level[3], project=level[2], subproject=level[1] (level[4] intermediate dropped) + * + * Effective depth > 4: SCHEMA_MISMATCH — surfaced to pcc_projects_sync_errors. + */ +import { SegmentStatus } from '@crowd/types' + +import type { MappingRule, ParseResult, PccParquetRow } from './types' + +/** + * PCC PROJECT_STATUS → CDP SegmentStatus enum. + */ +const STATUS_MAP: Record = { + Active: SegmentStatus.ACTIVE, + Archived: SegmentStatus.ARCHIVED, + 'Formation - Disengaged': SegmentStatus.FORMATION, + 'Formation - Engaged': SegmentStatus.FORMATION, + 'Formation - Exploratory': SegmentStatus.FORMATION, + 'Formation - On Hold': SegmentStatus.FORMATION, + Prospect: SegmentStatus.PROSPECT, +} + +/** + * Parquet serializes Snowflake NUMBER columns as fixed-width big-endian Buffers. + * Handle both the plain number case and the Buffer case. + */ +function parseParquetInt(value: unknown): number { + if (typeof value === 'number') return value + // Parquet serializes Snowflake NUMBER as a Node.js Buffer (big-endian bytes) + if (Buffer.isBuffer(value)) { + let result = 0 + for (const byte of value) { + result = result * 256 + byte + } + return result + } + return Number(value) +} + +/** + * Parse and validate all raw Parquet rows for a single PCC leaf project. + * + * Each call receives all rows that share the same PROJECT_ID (one row per + * hierarchy level from the PROJECT_SPINE JOIN). Returns ok=false with + * SCHEMA_MISMATCH if the group is malformed or has an unsupported depth (> 4). 
+ */ +export function parsePccRow(rawRows: Record[]): ParseResult { + if (rawRows.length === 0) { + return { + ok: false, + errorType: 'SCHEMA_MISMATCH', + details: { reason: 'empty row group' }, + } + } + + // All rows share the same leaf-level fields — use the first row for them. + const firstRaw = rawRows[0] as Partial + const projectId = firstRaw.PROJECT_ID + const name = firstRaw.NAME + + if (!projectId || !name) { + return { + ok: false, + errorType: 'SCHEMA_MISMATCH', + details: { + reason: 'missing required fields', + missingFields: [...(!projectId ? ['PROJECT_ID'] : []), ...(!name ? ['NAME'] : [])], + }, + } + } + + // Parse HIERARCHY_LEVEL for each row and sort ascending (level=1 is the leaf). + const levelRows = rawRows + .map((r) => { + const row = r as Partial + return { + level: parseParquetInt(row.HIERARCHY_LEVEL), + name: (row.MAPPED_PROJECT_NAME ?? null) as string | null, + slug: (row.MAPPED_PROJECT_SLUG ?? null) as string | null, + } + }) + .sort((a, b) => a.level - b.level) + + const maxLevel = levelRows[levelRows.length - 1].level + const effectiveDepth = maxLevel - 1 + // Slug of the leaf project itself (hierarchy_level=1 row). + const leafSlug = levelRows[0]?.slug ?? null + + if (!Number.isFinite(effectiveDepth) || !Number.isInteger(effectiveDepth) || effectiveDepth < 1 || effectiveDepth > 4) { + return { + ok: false, + errorType: 'SCHEMA_MISMATCH', + pccProjectId: String(projectId), + pccSlug: leafSlug, + details: { + reason: + effectiveDepth < 1 + ? 'unexpected root node (maxHierarchyLevel≤1)' + : 'unsupported depth > 4', + maxLevel, + effectiveDepth, + projectId, + name, + }, + } + } + + // Build hierarchy_level → MAPPED_PROJECT_NAME lookup. 
+ const nameAt: Record = {} + for (const row of levelRows) { + nameAt[row.level] = row.name + } + + const cdpTargetResult = buildCdpTarget(effectiveDepth as MappingRule, nameAt) + if (cdpTargetResult.ok === false) { + return { + ok: false, + errorType: 'SCHEMA_MISMATCH', + pccProjectId: String(projectId), + pccSlug: leafSlug, + details: { reason: cdpTargetResult.reason, maxLevel, effectiveDepth, projectId, name }, + } + } + + const rawStatus = firstRaw.PROJECT_STATUS ?? null + const mappedStatus = rawStatus ? (STATUS_MAP[String(rawStatus)] ?? null) : null + + return { + ok: true, + project: { + pccProjectId: String(projectId), + pccSlug: leafSlug, + name: String(name), + status: mappedStatus, + maturity: (firstRaw.PROJECT_MATURITY_LEVEL ?? null) as string | null, + description: (firstRaw.DESCRIPTION ?? null) as string | null, + logoUrl: (firstRaw.PROJECT_LOGO ?? null) as string | null, + segmentIdFromSnowflake: (firstRaw.SEGMENT_ID ?? null) as string | null, + effectiveDepth, + mappingRule: effectiveDepth as MappingRule, + cdpTarget: cdpTargetResult.target, + }, + } +} + +function buildCdpTarget( + effectiveDepth: MappingRule, + nameAt: Record, +): + | { ok: true; target: { group: string; project: string; subproject: string } } + | { ok: false; reason: string } { + switch (effectiveDepth) { + case 1: { + // TLF at level 2 (stripped), leaf at level 1 → all three CDP levels share the leaf. + const n1 = nameAt[1] + if (!n1) return { ok: false, reason: 'missing MAPPED_PROJECT_NAME at hierarchy_level=1' } + return { ok: true, target: { group: n1, project: n1, subproject: n1 } } + } + case 2: { + // TLF at level 3, group at level 2, leaf at level 1. 
+ const n2 = nameAt[2] + const n1 = nameAt[1] + if (!n2) return { ok: false, reason: 'missing MAPPED_PROJECT_NAME at hierarchy_level=2' } + if (!n1) return { ok: false, reason: 'missing MAPPED_PROJECT_NAME at hierarchy_level=1' } + return { ok: true, target: { group: n2, project: n1, subproject: n1 } } + } + case 3: { + // TLF at level 4, group at level 3, project at level 2, leaf at level 1. + const n3 = nameAt[3] + const n2 = nameAt[2] + const n1 = nameAt[1] + if (!n3) return { ok: false, reason: 'missing MAPPED_PROJECT_NAME at hierarchy_level=3' } + if (!n2) return { ok: false, reason: 'missing MAPPED_PROJECT_NAME at hierarchy_level=2' } + if (!n1) return { ok: false, reason: 'missing MAPPED_PROJECT_NAME at hierarchy_level=1' } + return { ok: true, target: { group: n3, project: n2, subproject: n1 } } + } + case 4: { + // TLF at level 5, intermediate at level 4 (dropped), group at level 3, project at level 2, leaf at level 1. + const n3 = nameAt[3] + const n2 = nameAt[2] + const n1 = nameAt[1] + if (!n3) return { ok: false, reason: 'missing MAPPED_PROJECT_NAME at hierarchy_level=3' } + if (!n2) return { ok: false, reason: 'missing MAPPED_PROJECT_NAME at hierarchy_level=2' } + if (!n1) return { ok: false, reason: 'missing MAPPED_PROJECT_NAME at hierarchy_level=1' } + return { ok: true, target: { group: n3, project: n2, subproject: n1 } } + } + } +} diff --git a/services/apps/pcc_sync_worker/src/parser/types.ts b/services/apps/pcc_sync_worker/src/parser/types.ts new file mode 100644 index 0000000000..d6bb6d2559 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/parser/types.ts @@ -0,0 +1,76 @@ +/** + * Types for the PCC project parser. + * + * Parquet rows come from Snowflake COPY INTO with HEADER=TRUE, + * so all column names are uppercase. + */ + +/** + * Raw Parquet row from the PCC PROJECT_SPINE export. + * + * One row is emitted per (leaf project, hierarchy level). 
A leaf project at + * depth N produces N rows: hierarchy_level=1 is the leaf itself, + * hierarchy_level=N is the topmost ancestor. All rows for the same leaf share + * the same PROJECT_ID, NAME, PROJECT_STATUS, etc. + */ +export interface PccParquetRow { + PROJECT_ID: string + NAME: string + DESCRIPTION: string | null + PROJECT_LOGO: string | null + PROJECT_STATUS: string | null + PROJECT_MATURITY_LEVEL: string | null + /** ID of the ancestor at this hierarchy level (hierarchy_level=1 → leaf itself). */ + MAPPED_PROJECT_ID: string | null + /** Name of the ancestor at this hierarchy level. */ + MAPPED_PROJECT_NAME: string | null + /** Slug of the ancestor at this hierarchy level. */ + MAPPED_PROJECT_SLUG: string | null + /** 1 = leaf, N = topmost ancestor. */ + HIERARCHY_LEVEL: number + SEGMENT_ID: string | null +} + +/** + * CDP hierarchy target derived from PCC depth levels. + * Phase 1 only updates existing segments — all three levels refer to + * existing CDP segment names (group / project / subproject). + */ +export interface CdpHierarchyTarget { + group: string + project: string + subproject: string +} + +/** Which depth-mapping rule was applied (effective depth 1–4). */ +export type MappingRule = 1 | 2 | 3 | 4 + +/** Structured result after parsing and transforming a single Parquet row. */ +export interface ParsedPccProject { + pccProjectId: string + /** Raw PCC slug — used for step-3 segment matching in the consumer. */ + pccSlug: string | null + name: string + /** Mapped to CDP segmentsStatus_type enum value, or null if unknown. */ + status: string | null + maturity: string | null + description: string | null + logoUrl: string | null + /** segment_id from Snowflake ACTIVE_SEGMENTS JOIN — used for step-1 matching. 
*/ + segmentIdFromSnowflake: string | null + effectiveDepth: number + mappingRule: MappingRule + cdpTarget: CdpHierarchyTarget +} + +export type ParseResult = + | { ok: true; project: ParsedPccProject } + | { + ok: false + errorType: 'SCHEMA_MISMATCH' + details: Record + /** Present when the row had a valid PROJECT_ID (depth-range errors). Used for segment lookup. */ + pccProjectId?: string + /** Present when the row had a valid SLUG (depth-range errors). Used for segment lookup. */ + pccSlug?: string | null + } diff --git a/services/apps/pcc_sync_worker/src/schedules/index.ts b/services/apps/pcc_sync_worker/src/schedules/index.ts new file mode 100644 index 0000000000..69a828b61e --- /dev/null +++ b/services/apps/pcc_sync_worker/src/schedules/index.ts @@ -0,0 +1,2 @@ +export { schedulePccS3Export } from './pccS3Export' +export { schedulePccS3Cleanup } from './pccS3Cleanup' diff --git a/services/apps/pcc_sync_worker/src/schedules/pccS3Cleanup.ts b/services/apps/pcc_sync_worker/src/schedules/pccS3Cleanup.ts new file mode 100644 index 0000000000..b5be4cc095 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/schedules/pccS3Cleanup.ts @@ -0,0 +1,46 @@ +import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client' + +import { SlackChannel, SlackPersona, sendSlackNotification } from '@crowd/slack' + +import { svc } from '../main' +import { pccS3CleanupScheduler } from '../workflows' + +export const schedulePccS3Cleanup = async () => { + try { + await svc.temporal.schedule.create({ + scheduleId: 'pcc-s3-cleanup', + spec: { + // Run at 03:00 every day + cronExpressions: ['00 3 * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.SKIP, + catchupWindow: '1 minute', + }, + action: { + type: 'startWorkflow', + workflowType: pccS3CleanupScheduler, + taskQueue: 'pccSync', + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [], + }, + }) + } catch (err) { + if (err instanceof 
ScheduleAlreadyRunning) { + svc.log.info('PCC cleanup schedule already registered in Temporal.') + svc.log.info('Configuration may have changed since. Please make sure they are in sync.') + } else { + svc.log.error({ err }, 'Failed to create pcc-s3-cleanup schedule') + sendSlackNotification( + SlackChannel.CDP_INTEGRATIONS_ALERTS, + SlackPersona.ERROR_REPORTER, + 'PCC S3 Cleanup Schedule Failed', + `Failed to create the \`pcc-s3-cleanup\` Temporal schedule.\n\n*Error:* ${err instanceof Error ? err.message : String(err)}`, + ) + } + } +} diff --git a/services/apps/pcc_sync_worker/src/schedules/pccS3Export.ts b/services/apps/pcc_sync_worker/src/schedules/pccS3Export.ts new file mode 100644 index 0000000000..8b5fbcaedf --- /dev/null +++ b/services/apps/pcc_sync_worker/src/schedules/pccS3Export.ts @@ -0,0 +1,46 @@ +import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client' + +import { SlackChannel, SlackPersona, sendSlackNotification } from '@crowd/slack' + +import { svc } from '../main' +import { pccS3ExportScheduler } from '../workflows' + +export const schedulePccS3Export = async () => { + try { + await svc.temporal.schedule.create({ + scheduleId: 'pcc-s3-export', + spec: { + // Run at 01:00 every day, after the snowflake connectors export at 00:20 + cronExpressions: ['00 1 * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.SKIP, + catchupWindow: '1 minute', + }, + action: { + type: 'startWorkflow', + workflowType: pccS3ExportScheduler, + taskQueue: 'pccSync', + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [], + }, + }) + } catch (err) { + if (err instanceof ScheduleAlreadyRunning) { + svc.log.info('PCC export schedule already registered in Temporal.') + svc.log.info('Configuration may have changed since. 
Please make sure they are in sync.') + } else { + svc.log.error({ err }, 'Failed to create pcc-s3-export schedule') + sendSlackNotification( + SlackChannel.CDP_INTEGRATIONS_ALERTS, + SlackPersona.ERROR_REPORTER, + 'PCC S3 Export Schedule Failed', + `Failed to create the \`pcc-s3-export\` Temporal schedule.\n\n*Error:* ${err instanceof Error ? err.message : String(err)}`, + ) + } + } +} diff --git a/services/apps/pcc_sync_worker/src/scripts/triggerCleanup.ts b/services/apps/pcc_sync_worker/src/scripts/triggerCleanup.ts new file mode 100644 index 0000000000..ef71f7aa8d --- /dev/null +++ b/services/apps/pcc_sync_worker/src/scripts/triggerCleanup.ts @@ -0,0 +1,26 @@ +import { TEMPORAL_CONFIG, getTemporalClient } from '../config/settings' + +async function main() { + const client = await getTemporalClient(TEMPORAL_CONFIG()) + + const workflowId = `pcc-cleanup/manual/${new Date().toISOString().slice(0, 19)}` + + await client.workflow.start('pccS3CleanupScheduler', { + taskQueue: 'pccSync', + workflowId, + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [], + }) + + console.log(`PCC S3 cleanup workflow started: ${workflowId}`) + process.exit(0) +} + +main().catch((err) => { + console.error('Failed to trigger workflow:', err) + process.exit(1) +}) diff --git a/services/apps/pcc_sync_worker/src/scripts/triggerExport.ts b/services/apps/pcc_sync_worker/src/scripts/triggerExport.ts new file mode 100644 index 0000000000..cb3f1bc88c --- /dev/null +++ b/services/apps/pcc_sync_worker/src/scripts/triggerExport.ts @@ -0,0 +1,26 @@ +import { TEMPORAL_CONFIG, getTemporalClient } from '../config/settings' + +async function main() { + const client = await getTemporalClient(TEMPORAL_CONFIG()) + + const workflowId = `pcc-export/manual/${new Date().toISOString().slice(0, 19)}` + + await client.workflow.start('pccS3ExportScheduler', { + taskQueue: 'pccSync', + workflowId, + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 
2, + maximumAttempts: 3, + }, + args: [], + }) + + console.log(`PCC S3 export workflow started: ${workflowId}`) + process.exit(0) +} + +main().catch((err) => { + console.error('Failed to trigger workflow:', err) + process.exit(1) +}) diff --git a/services/apps/pcc_sync_worker/src/workflows/cleanupWorkflow.ts b/services/apps/pcc_sync_worker/src/workflows/cleanupWorkflow.ts new file mode 100644 index 0000000000..1698af6368 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/workflows/cleanupWorkflow.ts @@ -0,0 +1,17 @@ +import { proxyActivities } from '@temporalio/workflow' + +import type * as activities from '../activities/cleanupActivity' + +const { executeCleanup } = proxyActivities({ + startToCloseTimeout: '1 hour', + retry: { + initialInterval: '2s', + backoffCoefficient: 2, + maximumInterval: '60s', + maximumAttempts: 3, + }, +}) + +export async function pccS3CleanupScheduler(): Promise { + await executeCleanup() +} diff --git a/services/apps/pcc_sync_worker/src/workflows/exportWorkflow.ts b/services/apps/pcc_sync_worker/src/workflows/exportWorkflow.ts new file mode 100644 index 0000000000..5719280bb4 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/workflows/exportWorkflow.ts @@ -0,0 +1,17 @@ +import { proxyActivities } from '@temporalio/workflow' + +import type * as activities from '../activities/exportActivity' + +const { executeExport } = proxyActivities({ + startToCloseTimeout: '1 hour', + retry: { + initialInterval: '2s', + backoffCoefficient: 2, + maximumInterval: '60s', + maximumAttempts: 3, + }, +}) + +export async function pccS3ExportScheduler(): Promise { + await executeExport() +} diff --git a/services/apps/pcc_sync_worker/src/workflows/index.ts b/services/apps/pcc_sync_worker/src/workflows/index.ts new file mode 100644 index 0000000000..9dd43218e4 --- /dev/null +++ b/services/apps/pcc_sync_worker/src/workflows/index.ts @@ -0,0 +1,2 @@ +export { pccS3ExportScheduler } from './exportWorkflow' +export { pccS3CleanupScheduler } from 
'./cleanupWorkflow' diff --git a/services/apps/pcc_sync_worker/tsconfig.json b/services/apps/pcc_sync_worker/tsconfig.json new file mode 100644 index 0000000000..bf7f183850 --- /dev/null +++ b/services/apps/pcc_sync_worker/tsconfig.json @@ -0,0 +1,4 @@ +{ + "extends": "../../base.tsconfig.json", + "include": ["src/**/*"] +} diff --git a/services/apps/snowflake_connectors/package.json b/services/apps/snowflake_connectors/package.json index 5f37b6662e..a321499447 100644 --- a/services/apps/snowflake_connectors/package.json +++ b/services/apps/snowflake_connectors/package.json @@ -27,8 +27,6 @@ "@crowd/snowflake": "workspace:*", "@crowd/temporal": "workspace:*", "@crowd/types": "workspace:*", - "@aws-sdk/client-s3": "^3.700.0", - "@dsnp/parquetjs": "^1.7.0", "@temporalio/client": "~1.11.8", "@temporalio/workflow": "~1.11.8", "tsx": "^4.7.1", diff --git a/services/apps/snowflake_connectors/src/activities/cleanupActivity.ts b/services/apps/snowflake_connectors/src/activities/cleanupActivity.ts index 373494a1fc..86124fa941 100644 --- a/services/apps/snowflake_connectors/src/activities/cleanupActivity.ts +++ b/services/apps/snowflake_connectors/src/activities/cleanupActivity.ts @@ -1,9 +1,9 @@ import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database' import { getServiceChildLogger } from '@crowd/logging' import { SlackChannel, SlackPersona, sendSlackNotification } from '@crowd/slack' +import { MetadataStore, S3Service, buildPlatformFilter } from '@crowd/snowflake' -import { MetadataStore } from '../core/metadataStore' -import { S3Service } from '../core/s3Service' +import { getEnabledPlatforms } from '../integrations' const log = getServiceChildLogger('cleanupActivity') @@ -12,7 +12,11 @@ export async function executeCleanup(intervalHours = 24): Promise { const metadataStore = new MetadataStore(db) const s3Service = new S3Service() - const jobs = await metadataStore.getCleanableJobS3Paths(intervalHours) + const jobs = await metadataStore.getCleanableJobS3Paths( + 
intervalHours, + buildPlatformFilter(getEnabledPlatforms()), + true, + ) log.info({ jobCount: jobs.length, intervalHours }, 'Found cleanable jobs') for (const job of jobs) { diff --git a/services/apps/snowflake_connectors/src/activities/exportActivity.ts b/services/apps/snowflake_connectors/src/activities/exportActivity.ts index f4fca93164..75991d9397 100644 --- a/services/apps/snowflake_connectors/src/activities/exportActivity.ts +++ b/services/apps/snowflake_connectors/src/activities/exportActivity.ts @@ -6,10 +6,9 @@ */ import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database' import { getServiceChildLogger } from '@crowd/logging' +import { MetadataStore, SnowflakeExporter, buildS3FilenamePrefix } from '@crowd/snowflake' import { PlatformType } from '@crowd/types' -import { MetadataStore } from '../core/metadataStore' -import { SnowflakeExporter } from '../core/snowflakeExporter' import { getDataSourceNames as _getDataSourceNames, getEnabledPlatforms as _getEnabledPlatforms, @@ -27,18 +26,6 @@ export async function getDataSourceNamesForPlatform(platform: PlatformType): Pro const log = getServiceChildLogger('exportActivity') -function buildS3FilenamePrefix(platform: string, sourceName: string): string { - const now = new Date() - const year = now.getFullYear() - const month = String(now.getMonth() + 1).padStart(2, '0') - const day = String(now.getDate()).padStart(2, '0') - const s3BucketPath = process.env.CROWD_SNOWFLAKE_S3_BUCKET_PATH - if (!s3BucketPath) { - throw new Error('Missing required env var CROWD_SNOWFLAKE_S3_BUCKET_PATH') - } - return `${s3BucketPath}/${platform}/${sourceName}/${year}/${month}/${day}` -} - export async function executeExport( platform: PlatformType, sourceName: DataSourceName, diff --git a/services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts b/services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts index 93289b8008..879c956485 100644 --- 
a/services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts +++ b/services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts @@ -9,11 +9,10 @@ import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database' import { getServiceChildLogger } from '@crowd/logging' import { QUEUE_CONFIG, QueueFactory } from '@crowd/queue' import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis' +import { MetadataStore, S3Service, SnowflakeExportJob, buildPlatformFilter } from '@crowd/snowflake' import { PlatformType } from '@crowd/types' import { IntegrationResolver } from '../core/integrationResolver' -import { MetadataStore, SnowflakeExportJob } from '../core/metadataStore' -import { S3Service } from '../core/s3Service' import { getDataSource, getEnabledPlatforms } from '../integrations' const log = getServiceChildLogger('transformerConsumer') @@ -30,6 +29,7 @@ export class TransformerConsumer { private readonly integrationResolver: IntegrationResolver, private readonly emitter: DataSinkWorkerEmitter, private readonly pollingIntervalMs: number, + private readonly enabledPlatforms: string[], ) { this.currentPollingIntervalMs = pollingIntervalMs } @@ -40,7 +40,9 @@ export class TransformerConsumer { while (this.running) { try { - const job = await this.metadataStore.claimOldestPendingJob() + const job = await this.metadataStore.claimOldestPendingJob( + buildPlatformFilter(this.enabledPlatforms), + ) log.info('Claiming job from metadata store', { job }) if (job) { @@ -157,5 +159,14 @@ export async function createTransformerConsumer(): Promise const pollingIntervalMs = 10_000 // 10 seconds - return new TransformerConsumer(metadataStore, s3Service, resolver, emitter, pollingIntervalMs) + const enabledPlatforms = getEnabledPlatforms() as string[] + + return new TransformerConsumer( + metadataStore, + s3Service, + resolver, + emitter, + pollingIntervalMs, + enabledPlatforms, + ) } diff --git a/services/libs/snowflake/package.json 
b/services/libs/snowflake/package.json index caf9081de6..cc06c1b139 100644 --- a/services/libs/snowflake/package.json +++ b/services/libs/snowflake/package.json @@ -13,7 +13,10 @@ "tsx": "^4.7.1" }, "dependencies": { + "@aws-sdk/client-s3": "^3.700.0", + "@crowd/database": "workspace:*", "@crowd/logging": "workspace:*", + "@dsnp/parquetjs": "^1.7.0", "snowflake-sdk": "^2.3.3" } } diff --git a/services/libs/snowflake/src/index.ts b/services/libs/snowflake/src/index.ts index 025482c309..174071f858 100644 --- a/services/libs/snowflake/src/index.ts +++ b/services/libs/snowflake/src/index.ts @@ -1,3 +1,6 @@ export * from './client' export * from './github' +export * from './metadataStore' +export * from './s3Service' +export * from './snowflakeExporter' export * from './types' diff --git a/services/apps/snowflake_connectors/src/core/metadataStore.ts b/services/libs/snowflake/src/metadataStore.ts similarity index 70% rename from services/apps/snowflake_connectors/src/core/metadataStore.ts rename to services/libs/snowflake/src/metadataStore.ts index 91fe1c16f0..eb5fb8bf19 100644 --- a/services/apps/snowflake_connectors/src/core/metadataStore.ts +++ b/services/libs/snowflake/src/metadataStore.ts @@ -31,6 +31,27 @@ metrics: JobMetrics | null } +export interface PlatformFilter { + clause: string + params: Record<string, unknown> +} + +/** + * Build a SQL platform filter for use in metadataStore queries. + * + * An empty array returns `AND FALSE` (matches nothing), preventing + * accidental full-table scans when no platforms are configured.
+ */ +export function buildPlatformFilter(platforms: string[]): PlatformFilter { + if (platforms.length === 0) { + return { clause: 'AND FALSE', params: {} } + } + return { + clause: 'AND platform = ANY($(platforms)::text[])', + params: { platforms }, + } +} + export class MetadataStore { constructor(private readonly db: DbConnection) {} @@ -45,7 +66,7 @@ const metrics: JobMetrics = { exportedRows: totalRows, exportedBytes: totalBytes } await this.db.none( `INSERT INTO integration."snowflakeExportJobs" (platform, "sourceName", s3_path, "exportStartedAt", metrics) - VALUES ($1, $2, $3, $4, $5::jsonb) + VALUES ($(platform), $(sourceName), $(s3Path), $(exportStartedAt), $(metrics)::jsonb) ON CONFLICT (s3_path) DO UPDATE SET "exportStartedAt" = EXCLUDED."exportStartedAt", "processingStartedAt" = NULL, @@ -54,7 +75,7 @@ error = NULL, metrics = EXCLUDED.metrics, "updatedAt" = NOW()`, - [platform, sourceName, s3Path, exportStartedAt, JSON.stringify(metrics)], + { platform, sourceName, s3Path, exportStartedAt, metrics: JSON.stringify(metrics) }, ) } @@ -62,7 +83,9 @@ * Atomically claim the oldest pending job by setting processingStartedAt. * Uses FOR UPDATE SKIP LOCKED so concurrent consumers never pick the same row. */ - async claimOldestPendingJob(): Promise<SnowflakeExportJob | null> { + async claimOldestPendingJob(filter?: PlatformFilter): Promise<SnowflakeExportJob | null> { + const platformFilter = filter?.clause ?? '' + const params: Record<string, unknown> = filter?.params ?? {} const row = await this.db.oneOrNone<{ id: number platform: string @@ -82,17 +105,28 @@ WHERE id = ( SELECT id FROM integration."snowflakeExportJobs" WHERE "processingStartedAt" IS NULL + ${platformFilter} ORDER BY "createdAt" ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING id, platform, "sourceName", s3_path, "exportStartedAt", "createdAt", "updatedAt", "processingStartedAt", "completedAt", "cleanedAt", error, metrics`, + params, ) return row ?
mapRowToJob(row) : null } - async getCleanableJobS3Paths(intervalHours = 24): Promise<{ id: number; s3Path: string }[]> { + async getCleanableJobS3Paths( + intervalHours = 24, + filter?: PlatformFilter, + requireZeroSkipped = true, + ): Promise<{ id: number; s3Path: string }[]> { + const platformFilter = filter?.clause ?? '' + const params: Record<string, unknown> = { intervalHours, ...filter?.params } + const skippedFilter = requireZeroSkipped + ? `AND metrics ? 'skippedCount' AND (metrics->>'skippedCount')::int = 0` + : '' const rows = await this.db.manyOrNone<{ id: number; s3_path: string }>( `SELECT id, s3_path FROM integration."snowflakeExportJobs" @@ -100,11 +134,11 @@ AND "cleanedAt" IS NULL AND error IS NULL AND metrics IS NOT NULL - AND metrics ? 'skippedCount' - AND (metrics->>'skippedCount')::int = 0 - AND "completedAt" <= NOW() - make_interval(hours => $1) + ${skippedFilter} + AND "completedAt" <= NOW() - make_interval(hours => $(intervalHours)) + ${platformFilter} ORDER BY "completedAt" ASC`, - [intervalHours], + params, ) return rows.map((r) => ({ id: r.id, s3Path: r.s3_path })) } @@ -113,8 +147,8 @@ await this.db.none( `UPDATE integration."snowflakeExportJobs" SET "cleanedAt" = NOW(), "updatedAt" = NOW() - WHERE id = $1`, - [jobId], + WHERE id = $(jobId)`, + { jobId }, ) } @@ -122,21 +156,21 @@ await this.db.none( `UPDATE integration."snowflakeExportJobs" SET "completedAt" = NOW(), - metrics = COALESCE(metrics, '{}'::jsonb) || COALESCE($2::jsonb, '{}'::jsonb), + metrics = COALESCE(metrics, '{}'::jsonb) || COALESCE($(metrics)::jsonb, '{}'::jsonb), "updatedAt" = NOW() - WHERE id = $1`, - [jobId, metrics ? JSON.stringify(metrics) : null], + WHERE id = $(jobId)`, + { jobId, metrics: metrics ?
JSON.stringify(metrics) : null }, ) } async markFailed(jobId: number, error: string, metrics?: Partial<JobMetrics>): Promise<void> { await this.db.none( `UPDATE integration."snowflakeExportJobs" - SET error = $2, "completedAt" = NOW(), - metrics = COALESCE(metrics, '{}'::jsonb) || COALESCE($3::jsonb, '{}'::jsonb), + SET error = $(error), "completedAt" = NOW(), + metrics = COALESCE(metrics, '{}'::jsonb) || COALESCE($(metrics)::jsonb, '{}'::jsonb), "updatedAt" = NOW() - WHERE id = $1`, - [jobId, error, metrics ? JSON.stringify(metrics) : null], + WHERE id = $(jobId)`, + { jobId, error, metrics: metrics ? JSON.stringify(metrics) : null }, ) } @@ -144,11 +178,11 @@ const row = await this.db.oneOrNone<{ max: Date | null }>( `SELECT MAX("exportStartedAt") AS max FROM integration."snowflakeExportJobs" - WHERE platform = $1 - AND "sourceName" = $2 + WHERE platform = $(platform) + AND "sourceName" = $(sourceName) AND "completedAt" IS NOT NULL AND error IS NULL`, - [platform, sourceName], + { platform, sourceName }, ) return row?.max ?? null } diff --git a/services/apps/snowflake_connectors/src/core/s3Service.ts b/services/libs/snowflake/src/s3Service.ts similarity index 100% rename from services/apps/snowflake_connectors/src/core/s3Service.ts rename to services/libs/snowflake/src/s3Service.ts diff --git a/services/apps/snowflake_connectors/src/core/snowflakeExporter.ts b/services/libs/snowflake/src/snowflakeExporter.ts similarity index 80% rename from services/apps/snowflake_connectors/src/core/snowflakeExporter.ts rename to services/libs/snowflake/src/snowflakeExporter.ts index c210bbfc81..8f1dc7ac9c 100644 --- a/services/apps/snowflake_connectors/src/core/snowflakeExporter.ts +++ b/services/libs/snowflake/src/snowflakeExporter.ts @@ -5,7 +5,8 @@ * to export data into S3 as Parquet files.
*/ import { getServiceChildLogger } from '@crowd/logging' -import { SnowflakeClient } from '@crowd/snowflake' + +import { SnowflakeClient } from './client' const log = getServiceChildLogger('snowflakeExporter') @@ -24,6 +25,22 @@ interface CopyIntoRow { fileName: string } +/** + * Build the S3 filename prefix for a batched COPY INTO export. + * Format: {CROWD_SNOWFLAKE_S3_BUCKET_PATH}/{platform}/{sourceName}/{yyyy}/{mm}/{dd} + */ +export function buildS3FilenamePrefix(platform: string, sourceName: string): string { + const s3BucketPath = process.env.CROWD_SNOWFLAKE_S3_BUCKET_PATH + if (!s3BucketPath) { + throw new Error('Missing required env var CROWD_SNOWFLAKE_S3_BUCKET_PATH') + } + const now = new Date() + const year = now.getFullYear() + const month = String(now.getMonth() + 1).padStart(2, '0') + const day = String(now.getDate()).padStart(2, '0') + return `${s3BucketPath}/${platform}/${sourceName}/${year}/${month}/${day}` +} + export class SnowflakeExporter { private readonly snowflake: SnowflakeClient