Use Kedro-Ibis instead of Snowpark and chained SQL #1
deepyaman wants to merge 4 commits into kedro-org:main
Conversation
Replace the monolithic Snowpark-based telemetry node (`inputs=None`, 5 temp tables via `session.sql()`) with six focused Ibis nodes backed by catalog datasets. Add `ibis.TableDataset` entries for the two Heap source tables and `ibis.FileDataset` for all CSV outputs. Move plugin/command filter lists to Kedro parameters. Apply the same `ibis.TableDataset` + identity-node pattern to the `data_transfer` pipeline, removing the last Snowpark dependency.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>
Code Review — Kedro-Ibis Refactor
Overall verdict: The structural goals are achieved — Snowpark is gone, source tables are proper catalog inputs, and the pipeline is decomposable. But there are a few correctness risks and one design issue worth addressing before this runs in production.
What's good
- Monolithic `inputs=None` node correctly replaced with a proper DAG of catalog-backed nodes.
- `params:plugins` / `params:commands` is the right call; those lists should be overridable per environment.
- `ibis.TableDataset` + `ibis.FileDataset` is idiomatic; `ConnectionMixin` will share one backend connection across all datasets with the same config.
- `data_transfer` correctly reduced to an identity function; no `.to_pandas()` needed.
- `Node` → `node` and removal of the boilerplate pipeline docstring are good cleanups.
Issues
1. Duplicate base aggregation — performance bug
`get_unique_users` and `get_active_events` both independently compute the same expensive aggregation over `heap_project_statistics`:

```python
base = heap_stats.filter(...).group_by(...).agg(...)
```

Since both nodes take `heap_project_statistics` as input, Snowflake will run this aggregation twice when the pipeline executes. The original SQL avoided this by materialising `temp_dt_username` once and reusing it.
The fix is to make the base aggregation its own node:
```python
node(aggregate_project_stats, inputs="heap_project_statistics", outputs="dt_username"),
node(get_unique_users, inputs="dt_username", outputs="unique_users"),
node(get_active_events, inputs=["dt_username", "unique_users"], outputs="active_events"),
```

2. Ambiguous columns after join in `_build_command_mau`
```python
base = any_command_run.join(unique_users, any_command_run.username == unique_users.username)
```

This join produces two `username` columns. The subsequent `base.username.nunique()` is ambiguous and Ibis will likely raise at expression-build or execution time. Add an explicit column selection after the join (as `get_active_events` correctly does with `[base.columns]`):
```python
.join(unique_users, "username")[any_command_run.columns]
```

3. `unique_users` naming collision in `_build_command_mau`
The function parameter is named `unique_users` (an `ir.Table`), and the aggregation output column is also named `unique_users`:

```python
.agg(unique_users=base.username.nunique())
```

Rename the output column to `user_count` to avoid confusion.
4. Node names missing in `telemetry_data/pipeline.py`

None of the six new nodes has a `name=` argument. Kedro auto-generates names, but explicit names matter for `kedro run --node`, log readability, and CI scripts that reference node names by string.
Minor
- `requirements.txt` missing trailing newline.
Summary
| # | Severity | Item |
|---|---|---|
| 1 | High | Double base aggregation hits Snowflake twice — extract aggregate_project_stats node |
| 2 | High | Ambiguous username after join in _build_command_mau |
| 3 | Medium | unique_users name collision (param vs agg alias) |
| 4 | Low | Missing node names |
| 5 | Low | Missing trailing newline in requirements.txt |
Addressed in 26025ba:

- Extract `aggregate_project_stats` node so the base aggregation over `heap_project_statistics` runs once instead of twice (`dt_username` is now a shared intermediate consumed by both `get_unique_users` and `get_active_events`)
- Fix ambiguous `username` column after join in `_build_command_mau` by selecting `any_command_run.columns` immediately after the join
- Rename `unique_users` agg alias to `user_count` to avoid collision with the `unique_users` parameter name
- Add explicit `name=` to all `telemetry_data` pipeline nodes
- Add missing trailing newline to `requirements.txt`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>
deepyaman force-pushed from 26025ba to 1ce7a97
deepyaman left a comment
In addition to the changes suggested/requested below, would it be helpful to share the SQL statements corresponding to the nodes? Or to otherwise make us feel more confident that the porting to Ibis has largely maintained the same logic?
- Replace `build_plugins_mau`/`build_commands_mau` wrappers with a single public `build_command_mau` function used directly for both pipeline nodes
- Remove `pandas` and `pyarrow` from `requirements.txt` (pulled in transitively)
- Add missing one-line docstrings to `build_new_users_monthly`, `build_mau`, and `build_command_mau`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>
deepyaman force-pushed from ca40c93 to 5943f62
Addressed in ca40c93.

Regarding the parameter prefix simplification — kept the full two-word prefix (e.g. …).

Regarding the SQL equivalents question — happy to add a note or docstring showing the corresponding SQL for each node if that would help with review confidence. Let us know.
Here's the Ibis-generated SQL (via …) for each node, side by side with the original chained-SQL statements. Also fixed a runtime bug uncovered while generating this; see the follow-up commit below.

aggregate_project_stats ← temp_dt_username

Before (temp table):

```sql
CREATE OR REPLACE TEMPORARY TABLE temp_dt_username AS
SELECT DATE(time) AS dt,
       username,
       MAX(LEFT(PROJECT_VERSION, 4)) AS max_version_prefix,
       COUNT(*) AS cnt
FROM HEAP_FRAMEWORK_VIZ_PRODUCTION.HEAP.KEDRO_PROJECT_STATISTICS
WHERE DATE(time) >= '2024-09-01'
  AND (is_ci_env IS NULL OR is_ci_env = 'false')
GROUP BY 1, 2
```

After (Ibis):

```sql
SELECT
  "t1"."username",
  TO_DATE("t1"."time") AS "dt",
  MAX(SUBSTRING("t1"."project_version", 1, 4)) AS "max_version_prefix"
FROM (
  SELECT * FROM "KEDRO_PROJECT_STATISTICS" AS "t0"
  WHERE TO_DATE("t0"."time") >= DATE_FROM_PARTS(2024, 9, 1)
    AND ("t0"."is_ci_env" IS NULL OR "t0"."is_ci_env" = 'false')
) AS "t1"
GROUP BY 1, 2
```

Note: `cnt` is dropped — it was never used downstream.
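For reviewers who prefer to reason row-by-row, here is a plain-Python sketch of what both queries above compute (illustrative only — field names and data are made up, and this is not the actual Ibis node code): filter out pre-cutoff and CI rows, then group by `(dt, username)` and keep the max 4-character version prefix.

```python
from datetime import date

# Toy rows mirroring KEDRO_PROJECT_STATISTICS (field names illustrative).
rows = [
    {"time": date(2024, 9, 2), "username": "a", "project_version": "0.19.8", "is_ci_env": None},
    {"time": date(2024, 9, 2), "username": "a", "project_version": "0.19.9", "is_ci_env": "false"},
    {"time": date(2024, 8, 1), "username": "a", "project_version": "0.19.1", "is_ci_env": None},   # before cutoff
    {"time": date(2024, 9, 3), "username": "b", "project_version": "0.18.4", "is_ci_env": "true"},  # CI, excluded
]

def aggregate_project_stats(rows):
    """Group by (dt, username); keep MAX(LEFT(project_version, 4))."""
    out = {}
    for r in rows:
        if r["time"] < date(2024, 9, 1):
            continue  # WHERE DATE(time) >= '2024-09-01'
        if not (r["is_ci_env"] is None or r["is_ci_env"] == "false"):
            continue  # is_ci_env IS NULL OR is_ci_env = 'false'
        key = (r["time"], r["username"])
        prefix = r["project_version"][:4]  # LEFT(project_version, 4)
        out[key] = max(out.get(key, prefix), prefix)
    return out

result = aggregate_project_stats(rows)  # {(date(2024, 9, 2), "a"): "0.19"}
```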
get_unique_users ← temp_username_min_max_dt + temp_username_uniques

Before (two temp tables):

```sql
CREATE OR REPLACE TEMPORARY TABLE temp_username_min_max_dt AS
SELECT username, MIN(dt) AS min_dt, MAX(dt) AS max_dt
FROM temp_dt_username
GROUP BY 1;

CREATE OR REPLACE TEMPORARY TABLE temp_username_uniques AS
SELECT username
FROM temp_username_min_max_dt
WHERE DATEDIFF('day', min_dt, max_dt) > 8
```

After (Ibis):

```sql
SELECT "t3"."username"
FROM (
  SELECT "t2"."username",
         MIN("t2"."dt") AS "min_dt",
         MAX("t2"."dt") AS "max_dt"
  FROM <dt_username> AS "t2"
  GROUP BY 1
) AS "t3"
WHERE ("t3"."max_dt" - "t3"."min_dt") > INTERVAL '8 DAY'
```
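The `DATEDIFF('day', …) > 8` vs. `INTERVAL '8 DAY'` comparison has a direct stdlib analogue: subtracting two dates yields a duration, which is compared against `timedelta(days=8)`. A sketch under made-up data (not the node code):

```python
from datetime import date, timedelta

# dt_username rows as (username, dt) pairs (illustrative data).
events = [
    ("a", date(2024, 9, 1)), ("a", date(2024, 9, 20)),  # 19-day span -> kept
    ("b", date(2024, 9, 1)), ("b", date(2024, 9, 5)),   # 4-day span  -> dropped
]

def get_unique_users(events):
    """Usernames whose MAX(dt) - MIN(dt) exceeds 8 days (interval comparison)."""
    spans = {}
    for user, dt in events:
        lo, hi = spans.get(user, (dt, dt))
        spans[user] = (min(lo, dt), max(hi, dt))
    # (max_dt - min_dt) > INTERVAL '8 DAY'
    return {u for u, (lo, hi) in spans.items() if hi - lo > timedelta(days=8)}

result = get_unique_users(events)  # {"a"}
```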
get_active_events ← temp_dt_username_unique

Before:

```sql
CREATE OR REPLACE TEMPORARY TABLE temp_dt_username_unique AS
SELECT a.*
FROM temp_dt_username a
JOIN temp_username_uniques b
  ON a.username = b.username
  AND b.username IS NOT NULL
```

After (Ibis):

```sql
SELECT "t4"."username", "t4"."dt", "t4"."max_version_prefix"
FROM <dt_username> AS "t4"
INNER JOIN <unique_users> AS "t7"
  ON "t4"."username" = "t7"."username"
```

Note: `b.username IS NOT NULL` is implicit in an INNER JOIN.
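The key point of this node — inner-join then keep only the left table's columns, as the `[base.columns]` selection does — can be sketched in plain Python (illustrative names and data, not the node code):

```python
def get_active_events(dt_username, unique_users):
    """Inner join on username; keep only left-side columns (mirrors [base.columns])."""
    keep = set(unique_users)
    return [row for row in dt_username if row["username"] in keep]

dt_username = [
    {"username": "a", "dt": "2024-09-01", "max_version_prefix": "0.19"},
    {"username": "b", "dt": "2024-09-02", "max_version_prefix": "0.18"},
]
active = get_active_events(dt_username, unique_users={"a"})
# only user "a" survives; no duplicate username column can arise
```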
build_new_users_monthly ← temp_dt_username_unique_first_date + Final result 1

Before (temp table + final query):

```sql
CREATE OR REPLACE TEMPORARY TABLE temp_dt_username_unique_first_date AS
SELECT username,
       MIN(dt) AS first_date,
       MIN(max_version_prefix) AS max_version_prefix
FROM temp_dt_username_unique
GROUP BY username
ORDER BY first_date;

SELECT first_year_month, max_version_prefix, COUNT(*) AS count
FROM (
  SELECT *, TO_CHAR(first_date, 'YYYY-MM') AS first_year_month
  FROM temp_dt_username_unique_first_date
) t
WHERE first_year_month >= '2024-11'
GROUP BY 1, 2
ORDER BY 1, 2
```

After (Ibis — final SELECT only, preceding CTEs verified above):

```sql
SELECT "t10"."first_year_month", "t10"."max_version_prefix",
       COUNT(*) AS "count"
FROM (
  SELECT "t9"."username", "t9"."first_date", "t9"."max_version_prefix",
         TO_CHAR("t9"."first_date", 'yyyy-mm') AS "first_year_month"
  FROM (
    SELECT "t8"."username",
           MIN("t8"."dt") AS "first_date",
           MIN("t8"."max_version_prefix") AS "max_version_prefix"
    FROM <active_events> AS "t8"
    GROUP BY 1
  ) AS "t9"
  WHERE TO_CHAR("t9"."first_date", 'yyyy-mm') >= '2024-11'
) AS "t10"
GROUP BY 1, 2
ORDER BY "first_year_month" ASC, "max_version_prefix" ASC
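A stdlib sketch of the same logic (illustrative data; note that, as in the SQL, `MIN(dt)` and `MIN(max_version_prefix)` are taken independently per user, and `TO_CHAR(…, 'YYYY-MM')` maps to `strftime("%Y-%m")`):

```python
from datetime import date

# active_events rows as (username, dt, max_version_prefix) (illustrative).
events = [
    ("a", date(2024, 11, 3), "0.19"),
    ("a", date(2024, 12, 1), "0.19"),
    ("b", date(2024, 11, 9), "0.18"),
    ("c", date(2024, 10, 2), "0.19"),  # first month before 2024-11 -> excluded
]

def build_new_users_monthly(events):
    """Count users per (first_year_month, version prefix), first month >= 2024-11."""
    first = {}
    for user, dt, prefix in events:
        d, p = first.get(user, (dt, prefix))
        first[user] = (min(d, dt), min(p, prefix))  # independent MINs, as in the SQL
    counts = {}
    for d, p in first.values():
        ym = d.strftime("%Y-%m")  # TO_CHAR(first_date, 'YYYY-MM')
        if ym >= "2024-11":
            counts[(ym, p)] = counts.get((ym, p), 0) + 1
    return counts

result = build_new_users_monthly(events)
```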
build_mau ← Final result 2

Before:

```sql
SELECT TO_CHAR(DATE_TRUNC('month', dt), 'YYYY-MM') AS year_month,
       max_version_prefix,
       COUNT(DISTINCT username) AS mau
FROM temp_dt_username_unique
WHERE dt >= '2024-10-01'
GROUP BY year_month, max_version_prefix
ORDER BY year_month, max_version_prefix
```

After (Ibis — final SELECT only):

```sql
SELECT "t9"."year_month", "t9"."max_version_prefix",
       COUNT(DISTINCT "t9"."username") AS "mau"
FROM (
  SELECT "t8"."username", "t8"."dt", "t8"."max_version_prefix",
         TO_CHAR(DATE_TRUNC('MONTH', "t8"."dt"), 'yyyy-mm') AS "year_month"
  FROM <active_events> AS "t8"
  WHERE "t8"."dt" >= DATE_FROM_PARTS(2024, 10, 1)
) AS "t9"
GROUP BY 1, 2
ORDER BY "year_month" ASC, "max_version_prefix" ASC
```
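The `COUNT(DISTINCT username)` per month bucket translates to collecting usernames into a per-bucket set and taking its size. A stdlib sketch under made-up data (not the node code):

```python
from datetime import date

# active_events rows as (username, dt, max_version_prefix) (illustrative).
events = [
    ("a", date(2024, 10, 3), "0.19"),
    ("a", date(2024, 10, 20), "0.19"),  # same user, same month -> counted once
    ("b", date(2024, 11, 5), "0.19"),
    ("a", date(2024, 9, 1), "0.19"),    # before 2024-10-01 -> excluded
]

def build_mau(events):
    """COUNT(DISTINCT username) per (year_month, version prefix), dt >= 2024-10-01."""
    buckets = {}
    for user, dt, prefix in events:
        if dt < date(2024, 10, 1):
            continue
        ym = dt.strftime("%Y-%m")  # DATE_TRUNC('month', dt) rendered as YYYY-MM
        buckets.setdefault((ym, prefix), set()).add(user)
    return {k: len(v) for k, v in buckets.items()}

result = build_mau(events)
```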
build_command_mau ← Final results 3 & 4 (plugins / commands)

Before:

```sql
SELECT TO_CHAR(DATE_TRUNC('month', time), 'YYYY-MM') AS year_month,
       first_two_words,
       COUNT(DISTINCT username) AS unique_users
FROM (
  SELECT a.command,
         SPLIT(command, ' ')[0] || ' ' || SPLIT(command, ' ')[1]
           AS first_two_words,
         b.username, a.time
  FROM HEAP_FRAMEWORK_VIZ_PRODUCTION.HEAP.ANY_COMMAND_RUN a
  JOIN temp_username_uniques b ON a.username = b.username
  WHERE time >= '2024-10-01'
) t
WHERE first_two_words IN ('kedro mlflow', ...)
GROUP BY year_month, first_two_words
ORDER BY year_month, unique_users DESC
```

After (Ibis — final SELECT only):

```sql
SELECT "t10"."year_month", "t10"."first_two_words",
       COUNT(DISTINCT "t10"."username") AS "user_count"
FROM (
  SELECT "t9"."time", "t9"."username", "t9"."command",
         "t9"."first_two_words",
         TO_CHAR(DATE_TRUNC('MONTH', "t9"."time"), 'yyyy-mm') AS "year_month"
  FROM (
    SELECT "t8"."time", "t8"."username", "t8"."command",
           CONCAT(CONCAT(GET(SPLIT("t8"."command", ' '), 0), ' '),
                  GET(SPLIT("t8"."command", ' '), 1))
             AS "first_two_words"
    FROM <any_command_run JOIN unique_users> AS "t8"
    WHERE "t8"."time" >= TIMESTAMP_FROM_PARTS(2024, 10, 1, 0, 0, 0, 0)
  ) AS "t9"
  WHERE "t9"."first_two_words" IN ('kedro mlflow', 'kedro docker', ...)
) AS "t10"
GROUP BY 1, 2
ORDER BY "year_month" ASC, "user_count" DESC NULLS LAST
```
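The `first_two_words` extraction is the trickiest expression to compare. A stdlib sketch (illustrative, not the node code); one edge case worth checking against both dialects: for a single-word command the Snowflake expression likely yields NULL (`GET(SPLIT(...), 1)` is NULL and CONCAT propagates it), whereas this sketch simply returns the word.

```python
def first_two_words(command):
    """Python analogue of SPLIT(command, ' ')[0] || ' ' || SPLIT(command, ' ')[1]."""
    parts = command.split(" ")
    # For a 1-word command this returns the word itself, unlike the SQL (see note above).
    return " ".join(parts[:2])

print(first_two_words("kedro mlflow init"))  # kedro mlflow
```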
`ibis.DateValue.delta()` takes only one argument (the other date); the `'day'` unit argument caused a `TypeError` at runtime. Replace with interval arithmetic: `(max_dt - min_dt) > ibis.interval(days=8)`.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>
deepyaman force-pushed from 18a79dc to 5b04ca3