diff --git a/conf/base/catalog.yml b/conf/base/catalog.yml index c57ea84..2a60c25 100644 --- a/conf/base/catalog.yml +++ b/conf/base/catalog.yml @@ -1,52 +1,66 @@ -pypi_kedro_downloads: - type: kedro_datasets.snowflake.SnowparkTableDataset - table_name: "V_PYPI_KEDRO_DOWNLOADS" - database: "KEDRO_BI_DB" # Snowflake database - schema: "PYPI" # Schema inside the database - credentials: snowflake_credentials +heap_project_statistics: + type: kedro_datasets.ibis.TableDataset + table_name: KEDRO_PROJECT_STATISTICS + database: HEAP_FRAMEWORK_VIZ_PRODUCTION.HEAP + connection: + backend: snowflake + credentials: heap_snowflake_credentials -# Save results locally as CSV -pypi_kedro_downloads_table: - type: pandas.CSVDataset - filepath: data/02_intermediate/pypi_kedro_downloads.csv - save_args: - index: False +heap_any_command_run: + type: kedro_datasets.ibis.TableDataset + table_name: ANY_COMMAND_RUN + database: HEAP_FRAMEWORK_VIZ_PRODUCTION.HEAP + connection: + backend: snowflake + credentials: heap_snowflake_credentials new_kedro_users_monthly: - type: pandas.CSVDataset + type: kedro_datasets.ibis.FileDataset filepath: data/02_intermediate/new_kedro_users_monthly.csv - save_args: - index: False + file_format: csv mau_kedro: - type: pandas.CSVDataset + type: kedro_datasets.ibis.FileDataset filepath: data/02_intermediate/mau_kedro.csv - save_args: - index: False + file_format: csv kedro_plugins_mau: - type: pandas.CSVDataset + type: kedro_datasets.ibis.FileDataset filepath: data/02_intermediate/kedro_plugins_mau.csv + file_format: csv kedro_commands_mau: - type: pandas.CSVDataset + type: kedro_datasets.ibis.FileDataset filepath: data/02_intermediate/kedro_commands_mau.csv + file_format: csv + +pypi_kedro_downloads: + type: kedro_datasets.ibis.TableDataset + table_name: V_PYPI_KEDRO_DOWNLOADS + database: KEDRO_BI_DB.PYPI + connection: + backend: snowflake + credentials: snowflake_credentials + +pypi_kedro_downloads_table: + type: kedro_datasets.ibis.FileDataset + filepath: data/02_intermediate/pypi_kedro_downloads.csv + file_format: csv cohort_retention: - type: pandas.CSVDataset + type: kedro_datasets.ibis.FileDataset filepath: data/02_intermediate/cohort_retention.csv - save_args: - index: False + file_format: csv downloads_by_country: - type: kedro_datasets.snowflake.SnowparkTableDataset - table_name: "V_DOWNLOADS_BY_COUNTRY" - database: "KEDRO_BI_DB" - schema: "PYPI" + type: kedro_datasets.ibis.TableDataset + table_name: V_DOWNLOADS_BY_COUNTRY + database: KEDRO_BI_DB.PYPI + connection: + backend: snowflake credentials: snowflake_credentials downloads_by_country_table: - type: pandas.CSVDataset + type: kedro_datasets.ibis.FileDataset filepath: data/02_intermediate/downloads_by_country.csv - save_args: - index: False \ No newline at end of file + file_format: csv diff --git a/conf/base/credentials.yml b/conf/base/credentials.yml index 5f9d037..219d08c 100644 --- a/conf/base/credentials.yml +++ b/conf/base/credentials.yml @@ -5,3 +5,10 @@ snowflake_credentials: password: ${oc.env:SNOWFLAKE_PASSWORD} warehouse: "KEDRO_BI_WH_WH" +heap_snowflake_credentials: + account: ${oc.env:SNOWFLAKE_ACCOUNT} + user: ${oc.env:SNOWFLAKE_USER} + password: ${oc.env:SNOWFLAKE_PASSWORD} + role: HEAP_NTD_KEDRO + warehouse: HEAP_NTD_KEDRO_WH + diff --git a/conf/base/parameters_telemetry_data.yml b/conf/base/parameters_telemetry_data.yml index 4d9194b..1a0f77a 100644 --- a/conf/base/parameters_telemetry_data.yml +++ b/conf/base/parameters_telemetry_data.yml @@ -1,5 +1,24 @@ -# This is a boilerplate parameters config generated for pipeline 'telemetry_data' -# using Kedro 1.0.0. -# -# Documentation for this file format can be found in "Parameters" -# Link: https://docs.kedro.org/en/1.0.0/configuration/parameters.html +cohort_start_month: "2024-11" +cohort_trailing_hide_months: 2 + +plugins: + - "kedro mlflow" + - "kedro docker" + - "kedro airflow" + - "kedro databricks" + - "kedro azureml" + - "kedro vertexai" + - "kedro gql" + - "kedro boot" + - "kedro sagemaker" + - "kedro coda" + - "kedro kubeflow" + +commands: + - "kedro run" + - "kedro viz" + - "kedro new" + - "kedro pipeline" + - "kedro jupyter" + - "kedro ipython" + - "kedro package" diff --git a/requirements.txt b/requirements.txt index 1031de4..adb12d6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,4 @@ ipython>=8.10 jupyterlab>=3.0 notebook kedro~=1.0.0 -kedro-datasets[snowflake] -pandas -pyarrow \ No newline at end of file +kedro-datasets[ibis-snowflake] diff --git a/src/kedro_pycafe_data/pipelines/data_transfer/nodes.py b/src/kedro_pycafe_data/pipelines/data_transfer/nodes.py index c00c13e..5525942 100644 --- a/src/kedro_pycafe_data/pipelines/data_transfer/nodes.py +++ b/src/kedro_pycafe_data/pipelines/data_transfer/nodes.py @@ -1,4 +1,5 @@ -def fetch_and_save(snowpark_df): - """Simply returns data read from Snowflake so Kedro saves it as CSV.""" - pdf = snowpark_df.to_pandas() - return pdf \ No newline at end of file +import ibis.expr.types as ir + + +def identity(tbl: ir.Table) -> ir.Table: + return tbl diff --git a/src/kedro_pycafe_data/pipelines/data_transfer/pipeline.py b/src/kedro_pycafe_data/pipelines/data_transfer/pipeline.py index 43636ca..9fdc181 100644 --- a/src/kedro_pycafe_data/pipelines/data_transfer/pipeline.py +++ b/src/kedro_pycafe_data/pipelines/data_transfer/pipeline.py @@ -1,25 +1,22 @@ -""" -This is a boilerplate pipeline 'data_transfer' -generated using Kedro 1.0.0 -""" +from kedro.pipeline import Node, Pipeline + +from .nodes import identity -from kedro.pipeline import Node, Pipeline # noqa -from .nodes import fetch_and_save def create_pipeline(**kwargs) -> Pipeline: return Pipeline( [ Node( - func=fetch_and_save, + identity, inputs="pypi_kedro_downloads", outputs="pypi_kedro_downloads_table", name="fetch_and_save_snowflake_data", ), Node( - func=fetch_and_save, + identity, inputs="downloads_by_country", outputs="downloads_by_country_table", name="fetch_and_save_downloads_by_country", ), ] - ) \ No newline at end of file + ) diff --git a/src/kedro_pycafe_data/pipelines/telemetry_data/nodes.py b/src/kedro_pycafe_data/pipelines/telemetry_data/nodes.py index 82b3287..24309cf 100644 --- a/src/kedro_pycafe_data/pipelines/telemetry_data/nodes.py +++ b/src/kedro_pycafe_data/pipelines/telemetry_data/nodes.py @@ -1,258 +1,175 @@ -import os -from typing import Tuple -from snowflake.snowpark import Session -import pandas as pd - -# Earliest cohort month included in the cohort retention analysis. Anything -# before this date is excluded so the heatmap focuses on cohorts with full -# Heap telemetry coverage. -COHORT_START_MONTH = "2024-11" - -# How many of the most-recent monthly cohorts to suppress from the cohort -# retention output. Recent cohorts have an unstable `cohort_size` because -# late-qualifiers (users who eventually exceed the >8-day activity span) keep -# joining the cohort retroactively. Two months gives those denominators time -# to settle. -COHORT_TRAILING_HIDE_MONTHS = 2 - - -def build_telemetry_data() -> Tuple[ - pd.DataFrame, # new_users_df -> new_kedro_users_monthly.csv - pd.DataFrame, # mau_df -> mau_kedro.csv - pd.DataFrame, # plugins_mau_df -> kedro_plugins_mau.csv - pd.DataFrame, # commands_mau_df -> kedro_commands_mau.csv - pd.DataFrame, # cohort_retention_df -> cohort_retention.csv -]: - """Run the Snowflake telemetry queries and return five dataframes. - - Pipeline outline: - Step 1-5 Build qualified-user temp tables (>8 days of non-CI activity - on a real release version of Kedro). - Step 6 Cohort retention matrix in long format — one row per - (cohort_month, month_offset) including explicit zero-retention - cells; latest two cohorts suppressed for stability. - Final New users, MAU, plugin MAU and core-command MAU dataframes. - """ - - # Add hardcoded DB and schema - connection_params = { - "account": os.getenv("SNOWFLAKE_ACCOUNT"), - "user": os.getenv("SNOWFLAKE_USER"), - "password": os.getenv("SNOWFLAKE_PASSWORD"), - "role": "HEAP_NTD_KEDRO", - "warehouse": "HEAP_NTD_KEDRO_WH", - "database": "DEMO_DB", - "schema": "PUBLIC", - } - - session = Session.builder.configs(connection_params).create() - - # --- Step 1 --- - # The RLIKE predicate below keeps only rows whose PROJECT_VERSION looks - # like a real release number (starts with `.`, e.g. 0.19, - # 1.2.6, 1.10.0). It drops placeholder strings like "test", "dev", "main" - - session.sql(""" - CREATE OR REPLACE TEMPORARY TABLE temp_dt_username AS - SELECT DATE(time) AS dt, - username, - MAX(LEFT(PROJECT_VERSION, 4)) AS max_version_prefix, - COUNT(*) AS cnt - FROM HEAP_FRAMEWORK_VIZ_PRODUCTION.HEAP.KEDRO_PROJECT_STATISTICS - WHERE DATE(time) >= '2024-09-01' - AND (is_ci_env IS NULL OR is_ci_env = 'false') - AND PROJECT_VERSION RLIKE '^[0-9]+[.][0-9].*$' - GROUP BY 1, 2 - """).collect() - - # --- Step 2 --- - session.sql(""" - CREATE OR REPLACE TEMPORARY TABLE temp_username_min_max_dt AS - SELECT username, - MIN(dt) AS min_dt, - MAX(dt) AS max_dt - FROM temp_dt_username - GROUP BY 1 - """).collect() - - # --- Step 3 --- - session.sql(""" - CREATE OR REPLACE TEMPORARY TABLE temp_username_uniques AS - SELECT username - FROM temp_username_min_max_dt - WHERE DATEDIFF('day', min_dt, max_dt) > 8 - """).collect() - - # --- Step 4 --- - session.sql(""" - CREATE OR REPLACE TEMPORARY TABLE temp_dt_username_unique AS - SELECT a.* - FROM temp_dt_username a - JOIN temp_username_uniques b - ON a.username = b.username - AND b.username IS NOT NULL - """).collect() - - # --- Step 5 --- here I took MIN(max_version_prefix) instead of MAX to get the version that was on the first date - session.sql(""" - CREATE OR REPLACE TEMPORARY TABLE temp_dt_username_unique_first_date AS - SELECT username, MIN(dt) AS first_date, MIN(max_version_prefix) as max_version_prefix - FROM temp_dt_username_unique - GROUP BY username - ORDER BY first_date - """).collect() - - # --- Step 6: cohort retention --- - # Definition: cohort = qualified users (>8 day activity span) whose first - # observed activity falls in a given month. For each cohort we report the - # number of users active 0..12 months later. - # - # Two correctness measures applied here: - # 1. The latest `COHORT_TRAILING_HIDE_MONTHS` cohorts are filtered out: - # their cohort_size is unstable while late-qualifiers backfill. - # 2. A rectangular (cohort x offset) grid is generated up to the - # current month and LEFT JOINed against activity, so cells with - # genuine zero retention are recorded as `active_users = 0` rather - # than silently dropped (which previously made them indistinguishable - # from "future month, no data yet"). - cohort_retention_df = session.sql(f""" - WITH cohort AS ( - SELECT username, - DATE_TRUNC('MONTH', first_date) AS cohort_month - FROM temp_dt_username_unique_first_date - WHERE TO_CHAR(first_date, 'YYYY-MM') >= '{COHORT_START_MONTH}' - AND DATE_TRUNC('MONTH', first_date) <= DATE_TRUNC( - 'MONTH', - DATEADD('month', -{COHORT_TRAILING_HIDE_MONTHS}, CURRENT_DATE()) - ) - ), - activity AS ( - SELECT DISTINCT username, - DATE_TRUNC('MONTH', dt) AS active_month - FROM temp_dt_username_unique - ), - sizes AS ( - SELECT cohort_month, COUNT(*) AS cohort_size - FROM cohort - GROUP BY 1 - ), - offsets AS ( - -- Rectangular skeleton: one row per (cohort_month, offset 0..12), - -- truncated to offsets that have already elapsed. - SELECT s.cohort_month, - off.value::INT AS month_offset - FROM sizes s, - LATERAL FLATTEN(input => ARRAY_GENERATE_RANGE(0, 13)) off - WHERE DATEADD('month', off.value::INT, s.cohort_month) - <= DATE_TRUNC('MONTH', CURRENT_DATE()) +import ibis +import ibis.expr.types as ir + + +def aggregate_project_stats(heap_stats: ir.Table) -> ir.Table: + """Aggregate raw events to one row per (username, day) since 2024-09-01.""" + return ( + heap_stats + .filter([ + heap_stats.time.date() >= ibis.date("2024-09-01"), + heap_stats.is_ci_env.isnull() | (heap_stats.is_ci_env == "false"), + # Keep only real release versions (e.g. 0.19, 1.2.6); drops "test", "dev", "main" + heap_stats.project_version.rlike(r"^[0-9]+[.][0-9].*$"), + ]) + .group_by(["username", heap_stats.time.date().name("dt")]) + .agg(max_version_prefix=heap_stats.project_version.left(4).max()) + ) + + +def get_unique_users(dt_username: ir.Table) -> ir.Table: + """Filter to users whose activity spans more than 8 days (max_dt - min_dt > 8).""" + min_max = dt_username.group_by("username").agg( + min_dt=dt_username.dt.min(), max_dt=dt_username.dt.max() + ) + return ( + min_max + .filter((min_max.max_dt - min_max.min_dt) > ibis.interval(days=8)) + .select("username") + ) + + +def get_active_events(dt_username: ir.Table, unique_users: ir.Table) -> ir.Table: + """Filter aggregated events to the unique-user list.""" + return dt_username.join(unique_users, "username")[dt_username.columns] + + +def build_new_users_monthly(active_events: ir.Table) -> ir.Table: + """Count new Kedro users per month (first seen since 2024-11).""" + first_dates = ( + active_events.group_by("username") + .agg( + first_date=active_events.dt.min(), + max_version_prefix=active_events.max_version_prefix.min(), ) - SELECT TO_CHAR(o.cohort_month, 'YYYY-MM') AS cohort_month, - o.month_offset AS month_offset, - COUNT(DISTINCT a.username) AS active_users, - MAX(s.cohort_size) AS cohort_size, - ROUND( - 100.0 * COUNT(DISTINCT a.username) - / NULLIF(MAX(s.cohort_size), 0), - 2 - ) AS retention_pct - FROM offsets o - JOIN sizes s ON o.cohort_month = s.cohort_month - LEFT JOIN cohort c ON o.cohort_month = c.cohort_month - LEFT JOIN activity a - ON c.username = a.username - AND DATEDIFF('month', o.cohort_month, a.active_month) = o.month_offset - GROUP BY 1, 2 - ORDER BY 1, 2 - """).to_pandas() - - # --- Final result 1: new Kedro users --- - new_users_df = session.sql(""" - SELECT first_year_month, max_version_prefix, COUNT(*) AS count - FROM ( - SELECT *, - TO_CHAR(first_date, 'YYYY-MM') AS first_year_month - FROM temp_dt_username_unique_first_date - ) t - WHERE first_year_month >= '2024-11' - GROUP BY 1, 2 - ORDER BY 1, 2 - """).to_pandas() - - # --- Final result 2: monthly active users (MAU) --- - mau_df = session.sql(""" - SELECT - TO_CHAR(DATE_TRUNC('month', dt), 'YYYY-MM') AS year_month, - max_version_prefix, - COUNT(DISTINCT username) AS mau - FROM temp_dt_username_unique - WHERE dt >= '2024-10-01' - GROUP BY year_month, max_version_prefix - ORDER BY year_month, max_version_prefix - """).to_pandas() - - # --- 3️⃣ Kedro plugins MAU --- - plugins_mau_df = session.sql(""" - SELECT - TO_CHAR(DATE_TRUNC('month', time), 'YYYY-MM') AS year_month, - first_two_words, - COUNT(DISTINCT username) AS unique_users - FROM ( - SELECT - a.command, - SPLIT(command, ' ')[0] || ' ' || SPLIT(command, ' ')[1] AS first_two_words, - b.username, - a.time - FROM HEAP_FRAMEWORK_VIZ_PRODUCTION.HEAP.ANY_COMMAND_RUN a - JOIN temp_username_uniques b - ON a.username = b.username - WHERE time >= '2024-10-01' - ) t - WHERE first_two_words IN ( - 'kedro mlflow', - 'kedro docker', - 'kedro airflow', - 'kedro databricks', - 'kedro azureml', - 'kedro vertexai', - 'kedro gql', - 'kedro boot', - 'kedro sagemaker', - 'kedro coda', - 'kedro kubeflow' + ) + return ( + first_dates + .mutate(first_year_month=first_dates.first_date.strftime("%Y-%m")) + .filter(lambda t: t.first_year_month >= "2024-11") + .group_by(["first_year_month", "max_version_prefix"]) + .agg(count=ibis._.count()) + .order_by(["first_year_month", "max_version_prefix"]) + .rename(str.upper) + ) + + +def build_mau(active_events: ir.Table) -> ir.Table: + """Count monthly active unique users by version since 2024-10.""" + return ( + active_events + .filter(active_events.dt >= ibis.date("2024-10-01")) + .mutate(year_month=active_events.dt.truncate("M").strftime("%Y-%m")) + .group_by(["year_month", "max_version_prefix"]) + .agg(mau=active_events.username.nunique()) + .order_by(["year_month", "max_version_prefix"]) + .rename(str.upper) + ) + + +def build_cohort_retention( + active_events: ir.Table, + cohort_start_month: str, + cohort_trailing_hide_months: int, +) -> ir.Table: + """Cohort × month retention matrix; zero-retention cells are explicit, not missing.""" + first_dates = ( + active_events.group_by("username") + .agg(first_date=active_events.dt.min()) + .mutate(cohort_month=lambda t: t.first_date.truncate("M")) + ) + + cutoff_month = ( + ibis.now().date().truncate("M") + - ibis.interval(months=cohort_trailing_hide_months) + ) + + cohort = ( + first_dates + .filter([ + first_dates.cohort_month.strftime("%Y-%m") >= cohort_start_month, + first_dates.cohort_month <= cutoff_month, + ]) + .select("username", "cohort_month") + ) + + # Rename to disambiguate after left join: active_username is NULL for non-active rows, + # which makes COUNT(DISTINCT active_username) naturally give 0 for those cells. + activity = ( + active_events + .select("username", active_month=active_events.dt.truncate("M")) + .distinct() + .rename(active_username="username") + ) + + sizes = cohort.group_by("cohort_month").agg(cohort_size=ibis._.count()) + + # Rectangular (cohort × month offset) grid, clipped to elapsed months. + # ibis.range(0, 13).unnest() compiles to LATERAL FLATTEN(ARRAY_GENERATE_RANGE(0, 13)) + # on Snowflake, producing one row per offset 0–12 per cohort month. + today = ibis.now().date() + offsets = ( + sizes + .mutate(month_offset=ibis.range(0, 13)) + .unnest("month_offset") + .filter( + lambda t: ( + (today.year() - t.cohort_month.year()) * 12 + + (today.month() - t.cohort_month.month()) + >= t.month_offset + ) ) - GROUP BY year_month, first_two_words - ORDER BY year_month, unique_users DESC - """).to_pandas() - - # --- 4️⃣ Kedro core commands MAU --- - commands_mau_df = session.sql(""" - SELECT - TO_CHAR(DATE_TRUNC('month', time), 'YYYY-MM') AS year_month, - first_two_words, - COUNT(DISTINCT username) AS unique_users - FROM ( - SELECT - a.command, - SPLIT(command, ' ')[0] || ' ' || SPLIT(command, ' ')[1] AS first_two_words, - b.username, - a.time - FROM HEAP_FRAMEWORK_VIZ_PRODUCTION.HEAP.ANY_COMMAND_RUN a - JOIN temp_username_uniques b - ON a.username = b.username - WHERE time >= '2024-10-01' - ) t - WHERE first_two_words IN ( - 'kedro run', - 'kedro viz', - 'kedro new', - 'kedro pipeline', - 'kedro jupyter', - 'kedro ipython', - 'kedro package' + ) + + # Expand grid to per-user rows, then left-join activity at the target month. + # Month difference via year/month components mirrors DATEDIFF('month', ...). + expanded = offsets.join(cohort, "cohort_month") + month_diff = ( + (activity.active_month.year() - expanded.cohort_month.year()) * 12 + + (activity.active_month.month() - expanded.cohort_month.month()) + ) + + return ( + expanded + .left_join( + activity, + (expanded.username == activity.active_username) + & (month_diff == expanded.month_offset), ) - GROUP BY year_month, first_two_words - ORDER BY year_month, unique_users DESC - """).to_pandas() - - session.close() - return new_users_df, mau_df, plugins_mau_df, commands_mau_df, cohort_retention_df + .mutate(cohort_month_str=lambda t: t.cohort_month.strftime("%Y-%m")) + .group_by(["cohort_month_str", "month_offset"]) + .agg( + active_users=lambda t: t.active_username.nunique(), + cohort_size=lambda t: t.cohort_size.max(), + ) + .rename(cohort_month="cohort_month_str") + .mutate( + retention_pct=( + 100.0 * ibis._.active_users / ibis._.cohort_size.nullif(0) + ).round(2) + ) + .order_by(["cohort_month", "month_offset"]) + .rename(str.upper) + ) + + +def build_command_mau( + any_command_run: ir.Table, unique_users: ir.Table, keep_prefixes: list[str] +) -> ir.Table: + """Count monthly unique users per command, filtered to the given two-word prefixes.""" + words = any_command_run.command.split(" ") + base = ( + any_command_run + .join(unique_users, "username")[any_command_run.columns] + .filter(any_command_run.time >= ibis.timestamp("2024-10-01")) + .mutate(first_two_words=words[0].concat(" ").concat(words[1])) + ) + return ( + base + .filter(base.first_two_words.isin(keep_prefixes)) + .mutate(year_month=base.time.truncate("M").strftime("%Y-%m")) + .group_by(["year_month", "first_two_words"]) + .agg(user_count=base.username.nunique()) + .order_by(["year_month", ibis.desc("user_count")]) + .rename(str.upper) + ) diff --git a/src/kedro_pycafe_data/pipelines/telemetry_data/pipeline.py b/src/kedro_pycafe_data/pipelines/telemetry_data/pipeline.py index d0002c8..def1b55 100644 --- a/src/kedro_pycafe_data/pipelines/telemetry_data/pipeline.py +++ b/src/kedro_pycafe_data/pipelines/telemetry_data/pipeline.py @@ -1,20 +1,70 @@ -from kedro.pipeline import Pipeline, Node -from .nodes import build_telemetry_data +from kedro.pipeline import Node, Pipeline + +from .nodes import ( + aggregate_project_stats, + build_cohort_retention, + build_command_mau, + build_mau, + build_new_users_monthly, + get_active_events, + get_unique_users, +) + def create_pipeline(**kwargs) -> Pipeline: return Pipeline( [ Node( - func=build_telemetry_data, - inputs=None, - outputs=[ - "new_kedro_users_monthly", - "mau_kedro", - "kedro_plugins_mau", - "kedro_commands_mau", - "cohort_retention", + aggregate_project_stats, + inputs="heap_project_statistics", + outputs="dt_username", + name="aggregate_project_stats", + ), + Node( + get_unique_users, + inputs="dt_username", + outputs="unique_users", + name="get_unique_users", + ), + Node( + get_active_events, + inputs=["dt_username", "unique_users"], + outputs="active_events", + name="get_active_events", + ), + Node( + build_new_users_monthly, + inputs="active_events", + outputs="new_kedro_users_monthly", + name="build_new_users_monthly", + ), + Node( + build_mau, + inputs="active_events", + outputs="mau_kedro", + name="build_mau", + ), + Node( + build_command_mau, + inputs=["heap_any_command_run", "unique_users", "params:plugins"], + outputs="kedro_plugins_mau", + name="build_plugins_mau", + ), + Node( + build_command_mau, + inputs=["heap_any_command_run", "unique_users", "params:commands"], + outputs="kedro_commands_mau", + name="build_commands_mau", + ), + Node( + build_cohort_retention, + inputs=[ + "active_events", + "params:cohort_start_month", + "params:cohort_trailing_hide_months", ], - name="build_telemetry_data", + outputs="cohort_retention", + name="build_cohort_retention", ), ] - ) \ No newline at end of file + )