/**
 * Export activity: Execute PCC recursive CTE COPY INTO + write metadata.
 *
 * Full daily export of ANALYTICS.SILVER_DIM.PROJECTS via recursive CTE.
 * No incremental logic — at ~1,538 leaf rows, a full daily export is simpler
 * and more reliable than incremental (a parent name change would require
 * re-exporting all descendants).
 */
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
import { getServiceChildLogger } from '@crowd/logging'
import { MetadataStore, SnowflakeExporter } from '@crowd/snowflake'

const log = getServiceChildLogger('exportActivity')

// Identify this export in S3 key prefixes and in export-job metadata rows;
// must stay stable or downstream consumers of both will miss the data.
const PLATFORM = 'pcc'
const SOURCE_NAME = 'project-hierarchy'
| 17 | + |
/**
 * Builds the Snowflake SQL for the full project-hierarchy export.
 *
 * Walks ANALYTICS.SILVER_DIM.PROJECTS from the roots (parent_id IS NULL)
 * downward via a recursive CTE. Each row records its depth and carries the
 * ancestor name for each level in depth_1..depth_5; names at levels deeper
 * than 5 are not captured (assumes the hierarchy is at most 5 levels deep —
 * TODO confirm against the source data).
 *
 * The final SELECT keeps only leaf projects — those that never appear as a
 * parent_id — and left-joins ACTIVE_SEGMENTS to attach the matching
 * 'subproject' segment_id where one exists (NULL otherwise).
 *
 * @returns the SQL text to execute against Snowflake.
 */
function buildSourceQuery(): string {
  // NOTE: the NOT IN subquery explicitly filters parent_id IS NOT NULL, so
  // the usual NULL-poisoning pitfall of NOT IN does not apply here.
  return `
    WITH RECURSIVE project_hierarchy AS (
      SELECT project_id, name, description, project_logo, project_status,
             project_maturity_level, repository_url, slug, parent_id,
             1 AS depth,
             name AS depth_1, NULL::VARCHAR AS depth_2, NULL::VARCHAR AS depth_3,
             NULL::VARCHAR AS depth_4, NULL::VARCHAR AS depth_5
      FROM ANALYTICS.SILVER_DIM.PROJECTS
      WHERE parent_id IS NULL
      UNION ALL
      SELECT p.project_id, p.name, p.description, p.project_logo, p.project_status,
             p.project_maturity_level, p.repository_url, p.slug, p.parent_id,
             h.depth + 1,
             h.depth_1,
             CASE WHEN h.depth + 1 = 2 THEN p.name ELSE h.depth_2 END,
             CASE WHEN h.depth + 1 = 3 THEN p.name ELSE h.depth_3 END,
             CASE WHEN h.depth + 1 = 4 THEN p.name ELSE h.depth_4 END,
             CASE WHEN h.depth + 1 = 5 THEN p.name ELSE h.depth_5 END
      FROM ANALYTICS.SILVER_DIM.PROJECTS p
      INNER JOIN project_hierarchy h ON p.parent_id = h.project_id
    )
    SELECT ph.project_id, ph.name, ph.slug, ph.description, ph.project_logo, ph.repository_url,
           ph.project_status, ph.project_maturity_level, ph.depth,
           ph.depth_1, ph.depth_2, ph.depth_3, ph.depth_4, ph.depth_5,
           s.segment_id
    FROM project_hierarchy ph
    LEFT JOIN ANALYTICS.SILVER_DIM.ACTIVE_SEGMENTS s
      ON s.source_id = ph.project_id AND s.project_type = 'subproject'
    WHERE ph.project_id NOT IN (
      SELECT DISTINCT parent_id FROM ANALYTICS.SILVER_DIM.PROJECTS
      WHERE parent_id IS NOT NULL
    )
  `
}
| 53 | + |
| 54 | +function buildS3FilenamePrefix(): string { |
| 55 | + const now = new Date() |
| 56 | + const year = now.getFullYear() |
| 57 | + const month = String(now.getMonth() + 1).padStart(2, '0') |
| 58 | + const day = String(now.getDate()).padStart(2, '0') |
| 59 | + const s3BucketPath = process.env.CROWD_SNOWFLAKE_S3_BUCKET_PATH |
| 60 | + if (!s3BucketPath) { |
| 61 | + throw new Error('Missing required env var CROWD_SNOWFLAKE_S3_BUCKET_PATH') |
| 62 | + } |
| 63 | + return `${s3BucketPath}/${PLATFORM}/${SOURCE_NAME}/${year}/${month}/${day}` |
| 64 | +} |
| 65 | + |
| 66 | +export async function executeExport(): Promise<void> { |
| 67 | + log.info({ platform: PLATFORM, sourceName: SOURCE_NAME }, 'Starting PCC export') |
| 68 | + |
| 69 | + const exporter = new SnowflakeExporter() |
| 70 | + const db = await getDbConnection(WRITE_DB_CONFIG()) |
| 71 | + |
| 72 | + try { |
| 73 | + const metadataStore = new MetadataStore(db) |
| 74 | + const sourceQuery = buildSourceQuery() |
| 75 | + const s3FilenamePrefix = buildS3FilenamePrefix() |
| 76 | + const exportStartedAt = new Date() |
| 77 | + |
| 78 | + const onBatchComplete = async (s3Path: string, totalRows: number, totalBytes: number) => { |
| 79 | + await metadataStore.insertExportJob( |
| 80 | + PLATFORM, |
| 81 | + SOURCE_NAME, |
| 82 | + s3Path, |
| 83 | + totalRows, |
| 84 | + totalBytes, |
| 85 | + exportStartedAt, |
| 86 | + ) |
| 87 | + } |
| 88 | + |
| 89 | + await exporter.executeBatchedCopyInto(sourceQuery, s3FilenamePrefix, onBatchComplete) |
| 90 | + |
| 91 | + log.info({ platform: PLATFORM, sourceName: SOURCE_NAME }, 'PCC export completed') |
| 92 | + } catch (err) { |
| 93 | + log.error({ platform: PLATFORM, sourceName: SOURCE_NAME, err }, 'PCC export failed') |
| 94 | + throw err |
| 95 | + } finally { |
| 96 | + await exporter |
| 97 | + .destroy() |
| 98 | + .catch((err) => log.warn({ err }, 'Failed to close Snowflake connection')) |
| 99 | + } |
| 100 | +} |