From 450004adc3de604ad4dc7e652898b6af842be71b Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Fri, 18 Jul 2025 13:10:44 -0400 Subject: [PATCH 1/8] python wrapper for sources --- api/py/test/lineage/test_parse_group_by.py | 5 ++- api/py/test/lineage/test_parse_join.py | 23 +++++------ api/py/test/sample/group_bys/kaggle/clicks.py | 17 ++++---- .../test/sample/group_bys/kaggle/outbrain.py | 1 - .../sample/group_bys/quickstart/purchases.py | 17 ++++---- .../sample/group_bys/quickstart/returns.py | 17 ++++---- .../sample/group_bys/quickstart/schema.py | 22 +++++----- .../test/sample/group_bys/quickstart/users.py | 15 ++++--- .../unit_test/user/sample_nested_group_by.py | 24 +++++------ .../sample/joins/quickstart/training_set.py | 17 ++++---- .../unit_test/user/sample_transactions.py | 21 +++++----- .../sample_team/event_sample_group_by.v1 | 3 +- .../sample_deprecation_group_by.v1 | 6 ++- .../sample_team/sample_group_by_group_by.v1 | 3 +- ...ple_chaining_group_by.chaining_group_by_v1 | 6 ++- .../group_bys/unit_test/sample_group_by.v1 | 3 +- .../unit_test/user.sample_nested_group_by.v1 | 3 +- .../sample_backfill_mutation_join.v0 | 6 ++- .../sample_chaining_join.parent_join | 9 +++-- .../sample_team/sample_deprecation_join.v1 | 9 +++-- .../sample_team/sample_join.consistency_check | 6 ++- .../sample_join.group_by_of_group_by | 6 ++- .../joins/sample_team/sample_join.never | 6 ++- .../sample_team/sample_join.no_log_flattener | 6 ++- .../joins/sample_team/sample_join.v1 | 6 ++- .../sample_team/sample_join_bootstrap.v1 | 9 +++-- .../sample_team/sample_join_bootstrap.v2 | 15 ++++--- .../sample_team/sample_join_derivation.v1 | 9 +++-- .../sample_team/sample_join_external_parts.v1 | 6 ++- .../sample_team/sample_join_external_parts.v2 | 6 ++- .../sample_join_for_dependency_test.v1 | 6 ++- .../sample_join_from_group_by_from_join.v1 | 6 ++- .../sample_team/sample_join_from_module.v1 | 12 ++++-- .../sample_team/sample_join_from_shorthand.v1 | 3 +- ...join_with_derivations_on_external_parts.v1 | 9 +++-- .../joins/sample_team/sample_label_join.v1 | 15 ++++--- .../sample_team/sample_label_join_with_agg.v1 | 15 ++++--- .../joins/sample_team/sample_online_join.v1 | 15 ++++--- .../unit_test/sample_parent_join.parent_join | 6 ++- .../unit_test/user.sample_transactions.v1 | 6 ++- api/py/test/sample/sources/kaggle/outbrain.py | 6 +-- api/py/test/sample/sources/test_sources.py | 40 +++++++++---------- api/py/test/test_group_by.py | 25 ++++++------ api/py/test/test_utils.py | 13 +++--- api/thrift/api.thrift | 10 +++++ 45 files changed, 280 insertions(+), 209 deletions(-) diff --git a/api/py/test/lineage/test_parse_group_by.py b/api/py/test/lineage/test_parse_group_by.py index d604ac94f9..6d5070c70d 100644 --- a/api/py/test/lineage/test_parse_group_by.py +++ b/api/py/test/lineage/test_parse_group_by.py @@ -16,6 +16,7 @@ import unittest from ai.chronon import group_by +from ai.chronon import source from ai.chronon.api import ttypes from ai.chronon.group_by import Accuracy, Derivation from ai.chronon.lineage.lineage_metadata import ConfigType, TableType @@ -25,7 +26,7 @@ class TestParseGroupBy(unittest.TestCase): def setUp(self): - gb_event_source = ttypes.EventSource( + gb_event_source = source.EventSource( table="source.gb_table", topic=None, query=ttypes.Query( @@ -35,7 +36,7 @@ def setUp(self): ), ) - gb_event_source1 = ttypes.EventSource( + gb_event_source1 = source.EventSource( table="source.gb_table1", topic=None, query=ttypes.Query( diff --git a/api/py/test/lineage/test_parse_join.py b/api/py/test/lineage/test_parse_join.py index ed8343532e..2aed6943c5 100644 --- a/api/py/test/lineage/test_parse_join.py +++ b/api/py/test/lineage/test_parse_join.py @@ -16,6 +16,7 @@ import unittest from ai.chronon import group_by +from ai.chronon import source from ai.chronon.api import ttypes from ai.chronon.api import ttypes as api from ai.chronon.group_by import Derivation @@ -27,7 +28,7 @@ class TestParseJoin(unittest.TestCase): def setUp(self): - gb_event_source = ttypes.EventSource( + gb_event_source = source.EventSource( table="gb_table", topic=None, query=ttypes.Query( @@ -59,17 +60,15 @@ def setUp(self): ) self.join = Join( - left=api.Source( - events=api.EventSource( - table="join_event_table", - query=api.Query( - startPartition="2020-04-09", - selects={ - "subject": "subject", - "event_id": "event", - }, - timeColumn="CAST(ts AS DOUBLE)", - ), + left=source.EventSource( + table="join_event_table", + query=api.Query( + startPartition="2020-04-09", + selects={ + "subject": "subject", + "event_id": "event", + }, + timeColumn="CAST(ts AS DOUBLE)", ), ), output_namespace="test_db", diff --git a/api/py/test/sample/group_bys/kaggle/clicks.py b/api/py/test/sample/group_bys/kaggle/clicks.py index fd8c3f3b21..6a4e94aca1 100644 --- a/api/py/test/sample/group_bys/kaggle/clicks.py +++ b/api/py/test/sample/group_bys/kaggle/clicks.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EventSource from ai.chronon.query import Query, select from ai.chronon.group_by import ( GroupBy, @@ -23,6 +22,7 @@ TimeUnit, Accuracy ) +from ai.chronon.source import EventSource from ai.chronon.utils import get_staging_query_output_table_name from staging_queries.kaggle.outbrain import base_table @@ -43,14 +43,13 @@ """ -source = Source( - events=EventSource( - table=get_staging_query_output_table_name(base_table), # Here we use the staging query output table because it has the necessary fields, but for a true streaming source we would likely use a log table - topic="some_topic", # You would set your streaming source topic here - query=Query( - selects=select("ad_id", "clicked"), - time_column="ts") - )) +source = EventSource( + table=get_staging_query_output_table_name(base_table), # Here we use the staging query output table because it has the necessary fields, but for a true streaming source we would likely use a log table + topic="some_topic", # You would set your streaming source topic here + query=Query( + selects=select("ad_id", "clicked"), + time_column="ts") +) ad_streaming = GroupBy( sources=[source], diff --git a/api/py/test/sample/group_bys/kaggle/outbrain.py b/api/py/test/sample/group_bys/kaggle/outbrain.py index eacf3c7ae1..249c2f1534 100644 --- a/api/py/test/sample/group_bys/kaggle/outbrain.py +++ b/api/py/test/sample/group_bys/kaggle/outbrain.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EventSource from ai.chronon.query import Query, select from ai.chronon.group_by import ( GroupBy, diff --git a/api/py/test/sample/group_bys/quickstart/purchases.py b/api/py/test/sample/group_bys/quickstart/purchases.py index 166a6398bd..e2bddf54ca 100644 --- a/api/py/test/sample/group_bys/quickstart/purchases.py +++ b/api/py/test/sample/group_bys/quickstart/purchases.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EventSource from ai.chronon.query import Query, select +from ai.chronon.source import EventSource from ai.chronon.group_by import ( GroupBy, Aggregation, @@ -28,14 +28,13 @@ """ # This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. -source = Source( - events=EventSource( - table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily - topic=None, # See the 'returns' GroupBy for an example that has a streaming source configured. In this case, this would be the streaming source topic that can be listened to for realtime events - query=Query( - selects=select("user_id","purchase_price"), # Select the fields we care about - time_column="ts") # The event time - )) +source = EventSource( + table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily + topic=None, # See the 'returns' GroupBy for an example that has a streaming source configured. In this case, this would be the streaming source topic that can be listened to for realtime events + query=Query( + selects=select("user_id","purchase_price"), # Select the fields we care about + time_column="ts") # The event time +) window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below diff --git a/api/py/test/sample/group_bys/quickstart/returns.py b/api/py/test/sample/group_bys/quickstart/returns.py index a7c97ce710..5bc072d758 100644 --- a/api/py/test/sample/group_bys/quickstart/returns.py +++ b/api/py/test/sample/group_bys/quickstart/returns.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EventSource from ai.chronon.query import Query, select +from ai.chronon.source import EventSource from ai.chronon.group_by import ( GroupBy, Aggregation, @@ -28,14 +28,13 @@ This GroupBy aggregates metrics about a user's previous purchases in various windows. """ -source = Source( - events=EventSource( - table="data.returns", # This points to the log table with historical return events - topic="events.returns/fields=ts,return_id,user_id,product_id,refund_amt/host=kafka/port=9092", - query=Query( - selects=select("user_id","refund_amt"), # Select the fields we care about - time_column="ts") # The event time - )) +source = EventSource( + table="data.returns", # This points to the log table with historical return events + topic="events.returns/fields=ts,return_id,user_id,product_id,refund_amt/host=kafka/port=9092", + query=Query( + selects=select("user_id","refund_amt"), # Select the fields we care about + time_column="ts") # The event time +) window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below diff --git a/api/py/test/sample/group_bys/quickstart/schema.py b/api/py/test/sample/group_bys/quickstart/schema.py index 3cc2155178..3665668226 100644 --- a/api/py/test/sample/group_bys/quickstart/schema.py +++ b/api/py/test/sample/group_bys/quickstart/schema.py @@ -1,20 +1,18 @@ from ai.chronon.group_by import GroupBy, Aggregation, Operation -from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.source import EventSource from ai.chronon.query import Query, select -logging_schema_source = Source( - events=EventSource( - table="default.chronon_log_table", - query=Query( - selects=select( - schema_hash="decode(unbase64(key_base64), 'utf-8')", - schema_value="decode(unbase64(value_base64), 'utf-8')" - ), - wheres=["name='SCHEMA_PUBLISH_EVENT'"], - time_column="ts_millis", +logging_schema_source = EventSource( + table="default.chronon_log_table", + query=Query( + selects=select( + schema_hash="decode(unbase64(key_base64), 'utf-8')", + schema_value="decode(unbase64(value_base64), 'utf-8')" ), - ) + wheres=["name='SCHEMA_PUBLISH_EVENT'"], + time_column="ts_millis", + ), ) v1 = GroupBy( diff --git a/api/py/test/sample/group_bys/quickstart/users.py b/api/py/test/sample/group_bys/quickstart/users.py index 4c4025054e..3e6d543167 100644 --- a/api/py/test/sample/group_bys/quickstart/users.py +++ b/api/py/test/sample/group_bys/quickstart/users.py @@ -13,24 +13,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EntitySource from ai.chronon.query import Query, select from ai.chronon.group_by import ( GroupBy, ) +from ai.chronon.source import EntitySource """ The primary key for this GroupBy is the same as the primary key of the source table. Therefore, it doesn't perform any aggregation, but just extracts user fields as features. """ -source = Source( - entities=EntitySource( - snapshotTable="data.users", # This points to a table that contains daily snapshots of the entire product catalog - query=Query( - selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about - ) - )) +source = EntitySource( + snapshotTable="data.users", # This points to a table that contains daily snapshots of the entire product catalog + query=Query( + selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about + ) +) v1 = GroupBy( sources=[source], diff --git a/api/py/test/sample/group_bys/unit_test/user/sample_nested_group_by.py b/api/py/test/sample/group_bys/unit_test/user/sample_nested_group_by.py index 2e4f8235ee..e72b9d0bdf 100644 --- a/api/py/test/sample/group_bys/unit_test/user/sample_nested_group_by.py +++ b/api/py/test/sample/group_bys/unit_test/user/sample_nested_group_by.py @@ -1,21 +1,19 @@ -from ai.chronon.api import ttypes from ai.chronon.api.ttypes import Aggregation, Operation, TimeUnit, Window from ai.chronon.group_by import Aggregations, GroupBy from ai.chronon.query import Query, select +from ai.chronon.source import EventSource -source = ttypes.Source( - events=ttypes.EventSource( - table="random_table_name", - query=Query( - selects=select( - user="id_item", - play="if(transaction_type='A', 1, 0)", - pause="if(transaction_type='B', 1, 0)", - ), - start_partition="2023-03-01", - time_column="UNIX_TIMESTAMP(ts) * 1000", +source = EventSource( + table="random_table_name", + query=Query( + selects=select( + user="id_item", + play="if(transaction_type='A', 1, 0)", + pause="if(transaction_type='B', 1, 0)", ), - ) + start_partition="2023-03-01", + time_column="UNIX_TIMESTAMP(ts) * 1000", + ), ) windows = [ diff --git a/api/py/test/sample/joins/quickstart/training_set.py b/api/py/test/sample/joins/quickstart/training_set.py index f0127ddf58..c5170db475 100644 --- a/api/py/test/sample/joins/quickstart/training_set.py +++ b/api/py/test/sample/joins/quickstart/training_set.py @@ -14,8 +14,8 @@ # limitations under the License. from ai.chronon.join import Join, JoinPart -from ai.chronon.api.ttypes import Source, EventSource from ai.chronon.query import Query, select +from ai.chronon.source import EventSource from group_bys.quickstart.purchases import v1 as purchases_v1 from group_bys.quickstart.returns import v1 as returns_v1 @@ -25,14 +25,13 @@ This is the "left side" of the join that will comprise our training set. It is responsible for providing the primary keys and timestamps for which features will be computed. """ -source = Source( - events=EventSource( - table="data.checkouts", - query=Query( - selects=select("user_id"), # The primary key used to join various GroupBys together - time_column="ts", - ) # The event time used to compute feature values as-of - )) +source = EventSource( + table="data.checkouts", + query=Query( + selects=select("user_id"), # The primary key used to join various GroupBys together + time_column="ts", + ) # The event time used to compute feature values as-of +) v1 = Join( left=source, diff --git a/api/py/test/sample/joins/unit_test/user/sample_transactions.py b/api/py/test/sample/joins/unit_test/user/sample_transactions.py index 6f1f046df1..4486bf2b54 100644 --- a/api/py/test/sample/joins/unit_test/user/sample_transactions.py +++ b/api/py/test/sample/joins/unit_test/user/sample_transactions.py @@ -1,21 +1,20 @@ # from airbnb.data_sources_2 import HiveEventSource -from ai.chronon.api import ttypes from ai.chronon.join import Join, JoinPart from ai.chronon.query import Query, select +from ai.chronon.source import EventSource from group_bys.unit_test.user.sample_nested_group_by import v1 as nested_v1 -source = ttypes.Source( - events=ttypes.EventSource( - table="item_snapshot", - query=Query( - selects=select(user="id_requester"), - wheres=["dim_requester_user_role = 'user'"], - start_partition="2023-03-01", - time_column="UNIX_TIMESTAMP(ts_created_at_utc) * 1000", - ), - ) +source = EventSource( + table="item_snapshot", + query=Query( + selects=select(user="id_requester"), + wheres=["dim_requester_user_role = 'user'"], + start_partition="2023-03-01", + time_column="UNIX_TIMESTAMP(ts_created_at_utc) * 1000", + ), ) + v1 = Join( left=source, right_parts=[JoinPart(group_by=nested_v1)], diff --git a/api/py/test/sample/production/group_bys/sample_team/event_sample_group_by.v1 b/api/py/test/sample/production/group_bys/sample_team/event_sample_group_by.v1 index 8a697e9525..05b44a6c5c 100644 --- a/api/py/test/sample/production/group_bys/sample_team/event_sample_group_by.v1 +++ b/api/py/test/sample/production/group_bys/sample_team/event_sample_group_by.v1 @@ -25,7 +25,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/group_bys/sample_team/sample_deprecation_group_by.v1 b/api/py/test/sample/production/group_bys/sample_team/sample_deprecation_group_by.v1 index e52efe250b..195d6fbea4 100644 --- a/api/py/test/sample/production/group_bys/sample_team/sample_deprecation_group_by.v1 +++ b/api/py/test/sample/production/group_bys/sample_team/sample_deprecation_group_by.v1 @@ -28,7 +28,8 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - } + }, + "customJson": "{}" } }, { @@ -42,7 +43,8 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 b/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 index 942d3f0b18..5ac77882b9 100644 --- a/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 +++ b/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 @@ -29,7 +29,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/group_bys/unit_test/sample_chaining_group_by.chaining_group_by_v1 b/api/py/test/sample/production/group_bys/unit_test/sample_chaining_group_by.chaining_group_by_v1 index 1752b48607..b22b0e1fb7 100644 --- a/api/py/test/sample/production/group_bys/unit_test/sample_chaining_group_by.chaining_group_by_v1 +++ b/api/py/test/sample/production/group_bys/unit_test/sample_chaining_group_by.chaining_group_by_v1 @@ -46,7 +46,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -78,7 +79,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/group_bys/unit_test/sample_group_by.v1 b/api/py/test/sample/production/group_bys/unit_test/sample_group_by.v1 index 473f27f3b1..0275bf5da2 100644 --- a/api/py/test/sample/production/group_bys/unit_test/sample_group_by.v1 +++ b/api/py/test/sample/production/group_bys/unit_test/sample_group_by.v1 @@ -25,7 +25,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/group_bys/unit_test/user.sample_nested_group_by.v1 b/api/py/test/sample/production/group_bys/unit_test/user.sample_nested_group_by.v1 index e3fe60cc41..dd77c50c5b 100644 --- a/api/py/test/sample/production/group_bys/unit_test/user.sample_nested_group_by.v1 +++ b/api/py/test/sample/production/group_bys/unit_test/user.sample_nested_group_by.v1 @@ -26,7 +26,8 @@ "startPartition": "2023-03-01", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_backfill_mutation_join.v0 b/api/py/test/sample/production/joins/sample_team/sample_backfill_mutation_join.v0 index 8ced8e1ca2..b578f92f9b 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_backfill_mutation_join.v0 +++ b/api/py/test/sample/production/joins/sample_team/sample_backfill_mutation_join.v0 @@ -29,7 +29,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -59,7 +60,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_chaining_join.parent_join b/api/py/test/sample/production/joins/sample_team/sample_chaining_join.parent_join index f8f7457112..756d8145b1 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_chaining_join.parent_join +++ b/api/py/test/sample/production/joins/sample_team/sample_chaining_join.parent_join @@ -31,7 +31,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -63,7 +64,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -132,7 +134,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_deprecation_join.v1 b/api/py/test/sample/production/joins/sample_team/sample_deprecation_join.v1 index ad3f737cad..907c395dc3 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_deprecation_join.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_deprecation_join.v1 @@ -29,7 +29,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -64,7 +65,8 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - } + }, + "customJson": "{}" } }, { @@ -78,7 +80,8 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.consistency_check b/api/py/test/sample/production/joins/sample_team/sample_join.consistency_check index b1b550d438..e672fdc473 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.consistency_check +++ b/api/py/test/sample/production/joins/sample_team/sample_join.consistency_check @@ -28,7 +28,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -62,7 +63,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by b/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by index 51df118a8f..a12f502914 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by +++ b/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by @@ -28,7 +28,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -64,7 +65,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.never b/api/py/test/sample/production/joins/sample_team/sample_join.never index dabae091eb..a9ec6f2f07 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.never +++ b/api/py/test/sample/production/joins/sample_team/sample_join.never @@ -27,7 +27,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -61,7 +62,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.no_log_flattener b/api/py/test/sample/production/joins/sample_team/sample_join.no_log_flattener index 195a9d381a..72c5b8c06a 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.no_log_flattener +++ b/api/py/test/sample/production/joins/sample_team/sample_join.no_log_flattener @@ -27,7 +27,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -61,7 +62,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.v1 b/api/py/test/sample/production/joins/sample_team/sample_join.v1 index 31d96721d9..dab68d4785 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join.v1 @@ -32,7 +32,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -66,7 +67,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v1 index 69c04fc065..04e92ea57f 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v1 @@ -31,7 +31,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -63,7 +64,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -132,7 +134,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v2 b/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v2 index 69a129caf5..59a7ce7661 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v2 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v2 @@ -33,7 +33,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -65,7 +66,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -134,7 +136,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -194,7 +197,8 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - } + }, + "customJson": "{}" } }, { @@ -208,7 +212,8 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_derivation.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_derivation.v1 index cb70e035c9..610ad06969 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_derivation.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_derivation.v1 @@ -29,7 +29,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -61,7 +62,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -130,7 +132,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v1 index ae60db1855..120bb193da 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v1 @@ -27,7 +27,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -61,7 +62,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v2 b/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v2 index b083806d88..e1eacc8eb6 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v2 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v2 @@ -27,7 +27,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -61,7 +62,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_for_dependency_test.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_for_dependency_test.v1 index 20f05ea94a..b5c0a0376f 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_for_dependency_test.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_for_dependency_test.v1 @@ -27,7 +27,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -59,7 +60,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_from_group_by_from_join.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_from_group_by_from_join.v1 index e189a7aa9e..2404389795 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_from_group_by_from_join.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_from_group_by_from_join.v1 @@ -28,7 +28,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -64,7 +65,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_from_module.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_from_module.v1 index 72ff11b534..baac6662f3 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_from_module.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_from_module.v1 @@ -31,7 +31,8 @@ }, "startPartition": "2021-03-01", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -60,7 +61,8 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - } + }, + "customJson": "{}" } }, { @@ -74,7 +76,8 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -135,7 +138,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_from_shorthand.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_from_shorthand.v1 index 1afce25228..2eee2bbff1 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_from_shorthand.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_from_shorthand.v1 @@ -29,7 +29,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [] diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 index 60b1f210cf..6e6dcb236d 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 @@ -29,7 +29,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -61,7 +62,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -130,7 +132,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_label_join.v1 b/api/py/test/sample/production/joins/sample_team/sample_label_join.v1 index 0ee74f427e..7bf55cbfd2 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_label_join.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_label_join.v1 @@ -29,7 +29,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -61,7 +62,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -129,7 +131,8 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - } + }, + "customJson": "{}" } }, { @@ -143,7 +146,8 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -208,7 +212,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_label_join_with_agg.v1 b/api/py/test/sample/production/joins/sample_team/sample_label_join_with_agg.v1 index 01474df919..1586c31818 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_label_join_with_agg.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_label_join_with_agg.v1 @@ -29,7 +29,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -61,7 +62,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -129,7 +131,8 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - } + }, + "customJson": "{}" } }, { @@ -143,7 +146,8 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -211,7 +215,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_online_join.v1 b/api/py/test/sample/production/joins/sample_team/sample_online_join.v1 index ab60e81e72..0ed01d24f8 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_online_join.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_online_join.v1 @@ -32,7 +32,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -64,7 +65,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -133,7 +135,8 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], @@ -193,7 +196,8 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - } + }, + "customJson": "{}" } }, { @@ -207,7 +211,8 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/unit_test/sample_parent_join.parent_join b/api/py/test/sample/production/joins/unit_test/sample_parent_join.parent_join index 63553a84be..3645cb55eb 100644 --- a/api/py/test/sample/production/joins/unit_test/sample_parent_join.parent_join +++ b/api/py/test/sample/production/joins/unit_test/sample_parent_join.parent_join @@ -27,7 +27,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -55,7 +56,8 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/production/joins/unit_test/user.sample_transactions.v1 b/api/py/test/sample/production/joins/unit_test/user.sample_transactions.v1 index 759f529342..77daf17253 100644 --- a/api/py/test/sample/production/joins/unit_test/user.sample_transactions.v1 +++ b/api/py/test/sample/production/joins/unit_test/user.sample_transactions.v1 @@ -30,7 +30,8 @@ "startPartition": "2023-03-01", "timeColumn": "UNIX_TIMESTAMP(ts_created_at_utc) * 1000", "setups": [] - } + }, + "customJson": "{}" } }, "joinParts": [ @@ -63,7 +64,8 @@ "startPartition": "2023-03-01", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - } + }, + "customJson": "{}" } } ], diff --git a/api/py/test/sample/sources/kaggle/outbrain.py b/api/py/test/sample/sources/kaggle/outbrain.py index dc370f0021..db0c426650 100644 --- a/api/py/test/sample/sources/kaggle/outbrain.py +++ b/api/py/test/sample/sources/kaggle/outbrain.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.source import EventSource from ai.chronon.query import Query, select from ai.chronon.utils import get_staging_query_output_table_name from staging_queries.kaggle.outbrain import base_table @@ -29,10 +29,10 @@ def outbrain_left_events(*columns): """ Defines a source based off of the output table of the `base_table` StagingQuery. """ - return Source(events=EventSource( + return EventSource( table=get_staging_query_output_table_name(base_table), query=Query( selects=select(*columns), time_column="ts", ), - )) + ) diff --git a/api/py/test/sample/sources/test_sources.py b/api/py/test/sample/sources/test_sources.py index 4198c9a938..2e5c848811 100644 --- a/api/py/test/sample/sources/test_sources.py +++ b/api/py/test/sample/sources/test_sources.py @@ -18,13 +18,13 @@ select, ) from ai.chronon.utils import get_staging_query_output_table_name -from ai.chronon.api import ttypes +from ai.chronon.source import EventSource, EntitySource from staging_queries.sample_team import sample_staging_query def basic_event_source(table): - return ttypes.Source(events=ttypes.EventSource( + return EventSource( table=table, query=Query( selects=select( @@ -34,11 +34,11 @@ def basic_event_source(table): start_partition="2021-04-09", time_column="ts", ), - )) + ) # Sample Event Source used in tests. -event_source = ttypes.Source(events=ttypes.EventSource( +event_source = EventSource( table="sample_namespace.sample_table_group_by", query=Query( selects=select( @@ -48,15 +48,15 @@ def basic_event_source(table): start_partition="2021-04-09", time_column="ts", ), -)) +) # Sample Entity Source -entity_source = ttypes.Source(entities=ttypes.EntitySource( - snapshotTable="sample_table.sample_entity_snapshot", +entity_source = EntitySource( + snapshot_table="sample_table.sample_entity_snapshot", # hr partition is not necessary - just to demo that we support various # partitioning schemes - mutationTable="sample_table.sample_entity_mutations/hr=00:00", - mutationTopic="sample_topic", + mutation_table="sample_table.sample_entity_mutations/hr=00:00", + mutation_topic="sample_topic", query=Query( start_partition='2021-03-01', selects=select( @@ -65,10 +65,10 @@ def basic_event_source(table): ), time_column="ts" ), -)) +) -batch_entity_source = ttypes.Source(entities=ttypes.EntitySource( - snapshotTable="sample_table.sample_entity_snapshot", +batch_entity_source = EntitySource( + snapshot_table="sample_table.sample_entity_snapshot", query=Query( start_partition='2021-03-01', selects=select( @@ -77,11 +77,11 @@ def basic_event_source(table): ), time_column="ts" ), -)) +) # Sample Entity Source derived from a staging query. -staging_entities=ttypes.Source(entities=ttypes.EntitySource( - snapshotTable="sample_namespace.{}".format(get_staging_query_output_table_name(sample_staging_query.v1)), +staging_entities=EntitySource( + snapshot_table="sample_namespace.{}".format(get_staging_query_output_table_name(sample_staging_query.v1)), query=Query( start_partition='2021-03-01', selects=select(**{ @@ -91,11 +91,11 @@ def basic_event_source(table): 'place_id': 'place_id' }) ) -)) +) # A Source that was deprecated but still relevant (requires stitching). -events_until_20210409 = ttypes.Source(events=ttypes.EventSource( +events_until_20210409 = EventSource( table="sample_namespace.sample_table_group_by", query=Query( start_partition='2021-03-01', @@ -106,10 +106,10 @@ def basic_event_source(table): }), time_column="UNIX_TIMESTAMP(ts) * 1000" ), -)) +) # The new source -events_after_20210409 = ttypes.Source(events=ttypes.EventSource( +events_after_20210409 = EventSource( table="sample_namespace.another_sample_table_group_by", query=Query( start_partition='2021-03-01', @@ -119,4 +119,4 @@ def basic_event_source(table): }), time_column="__timestamp" ), -)) +) diff --git a/api/py/test/test_group_by.py b/api/py/test/test_group_by.py index 3464a86a86..d6af9a1ddd 100644 --- a/api/py/test/test_group_by.py +++ b/api/py/test/test_group_by.py @@ -18,7 +18,8 @@ from ai.chronon import group_by, query from ai.chronon.group_by import GroupBy, Derivation, TimeUnit, Window, Aggregation, Accuracy from ai.chronon.api import ttypes -from ai.chronon.api.ttypes import EventSource, EntitySource, Operation +from ai.chronon.api.ttypes import Operation +from ai.chronon.source import EventSource, EntitySource @pytest.fixture @@ -45,7 +46,7 @@ def event_source(table, topic=None): """ Sample left join """ - return ttypes.EventSource( + return EventSource( table=table, topic=topic, query=ttypes.Query( @@ -64,9 +65,9 @@ def entity_source(snapshotTable, mutationTable): """ Sample source """ - return ttypes.EntitySource( - snapshotTable=snapshotTable, - mutationTable=mutationTable, + return EntitySource( + snapshot_table=snapshotTable, + mutation_table=mutationTable, query=ttypes.Query( startPartition="2020-04-09", selects={ @@ -201,15 +202,15 @@ def test_generic_collector(): def test_select_sanitization(): gb = group_by.GroupBy( sources=[ - ttypes.EventSource( # No selects are spcified + EventSource( # No selects are spcified table="event_table1", query=query.Query( selects=None, time_column="ts" ) ), - ttypes.EntitySource( # Some selects are specified - snapshotTable="entity_table1", + EntitySource( # Some selects are specified + snapshot_table="entity_table1", query=query.Query( selects={ "key1": "key1_sql", @@ -236,8 +237,8 @@ def test_snapshot_with_hour_aggregation(): with pytest.raises(AssertionError): group_by.GroupBy( sources=[ - ttypes.EntitySource( # Some selects are specified - snapshotTable="entity_table1", + EntitySource( # Some selects are specified + snapshot_table="entity_table1", query=query.Query( selects={ "key1": "key1_sql", @@ -260,7 +261,7 @@ def test_snapshot_with_hour_aggregation(): def test_additional_metadata(): gb = group_by.GroupBy( sources=[ - ttypes.EventSource( + EventSource( table="event_table1", query=query.Query( selects=None, @@ -279,7 +280,7 @@ def test_additional_metadata(): def test_group_by_with_description(): gb = group_by.GroupBy( sources=[ - ttypes.EventSource( + EventSource( table="event_table1", query=query.Query( selects=None, diff --git a/api/py/test/test_utils.py b/api/py/test/test_utils.py index 3716542cb0..4498318e55 100644 --- a/api/py/test/test_utils.py +++ b/api/py/test/test_utils.py @@ -19,8 +19,9 @@ import ai.chronon.api.ttypes as api from ai.chronon import utils -from ai.chronon.api.ttypes import EntitySource, EventSource, Query, Source +from ai.chronon.api.ttypes import Query from ai.chronon.repo.serializer import file2thrift, json2thrift +from ai.chronon.source import EventSource, EntitySource from ai.chronon.utils import get_dependencies, wait_for_simple_schema @@ -365,12 +366,11 @@ def test_wait_for_simple_schema_with_subpartition(query_with_partition_column: Q def test_get_dependencies_with_entities_mutation(query_with_partition_column: Query): - entity = EntitySource( - snapshotTable="snap_table", - mutationTable="mut_table", + src = EntitySource( + snapshot_table="snap_table", + mutation_table="mut_table", query=query_with_partition_column, ) - src = Source(entities=entity) deps_json = get_dependencies(src, lag=0) assert len(deps_json) == 2 deps = [json.loads(d) for d in deps_json] @@ -382,8 +382,7 @@ def test_get_dependencies_with_entities_mutation(query_with_partition_column: Qu def test_get_dependencies_with_events(query_with_partition_column: Query): - event = EventSource(table="event_table", query=query_with_partition_column) - src = Source(events=event) + src = EventSource(table="event_table", query=query_with_partition_column) deps_json = get_dependencies(src, lag=2) assert len(deps_json) == 1 dep = json.loads(deps_json[0]) diff --git a/api/thrift/api.thrift b/api/thrift/api.thrift index 2ca6242ec5..2458304564 100644 --- a/api/thrift/api.thrift +++ b/api/thrift/api.thrift @@ -68,6 +68,11 @@ struct EventSource { * If each new hive partition contains not just the current day's events but the entire set of events since the begininng. The key property is that the events are not mutated across partitions. **/ 4: optional bool isCumulative + + /** + * Any extra attributes can be stored here. Ie, Kafka bootstrap servers for a streaming source, or AWS IAM role for accessing Iceberg table + **/ + 5: optional string customJson } @@ -98,6 +103,11 @@ struct EntitySource { * The logic used to scan both the table and the topic. Contains row level transformations and filtering expressed as Spark SQL statements. */ 4: optional Query query + + /** + * Any extra attributes can be stored here. Ie, Kafka bootstrap servers for a streaming source, or AWS IAM role for accessing Iceberg table + **/ + 5: optional string customJson } struct ExternalSource { From 5468b956f8055311c78a10f17484d7db2b0aeaa5 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Fri, 18 Jul 2025 13:32:02 -0400 Subject: [PATCH 2/8] source.py --- api/py/ai/chronon/source.py | 74 +++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 api/py/ai/chronon/source.py diff --git a/api/py/ai/chronon/source.py b/api/py/ai/chronon/source.py new file mode 100644 index 0000000000..1d4fe292f5 --- /dev/null +++ b/api/py/ai/chronon/source.py @@ -0,0 +1,74 @@ +import json +from typing import Optional + +import ai.chronon.api.ttypes as ttypes + + +def EventSource( + table: str, + query: ttypes.Query, + topic: Optional[str] = None, + is_cumulative: Optional[bool] = None, + **kwargs +) -> ttypes.Source: + """ + + :param table: + :param topic: + :param query: + :param is_cumulative: + :return: + """ + return ttypes.Source( + events=ttypes.EventSource( + table=table, + topic=topic, + query=query, + isCumulative=is_cumulative, + customJson=json.dumps(kwargs), + ) + ) + + +def EntitySource( + snapshot_table: str, + query: ttypes.Query, + mutation_table: Optional[str] = None, + mutation_topic: Optional[str] = None, + **kwargs +) -> ttypes.Source: + """ + + :param snapshot_table: + :param mutation_table: + :param query: + :param mutation_topic: + :return: + """ + return ttypes.Source( + entities=ttypes.EntitySource( + snapshotTable=snapshot_table, + mutationTable=mutation_table, + query=query, + mutationTopic=mutation_topic, + customJson=json.dumps(kwargs), + ) + ) + + +def JoinSource( + join: ttypes.Join, + query: ttypes.Query, +) -> ttypes.Source: + """ + + :param join: + :param query: + :return: + """ + return ttypes.Source( + joinSource=ttypes.JoinSource( + join=join, + query=query + ) + ) From ef96b80cc5250cdadf7b828f8ed202001abb6052 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Fri, 18 Jul 2025 13:52:55 -0400 Subject: [PATCH 3/8] doc strings --- api/py/ai/chronon/source.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/api/py/ai/chronon/source.py b/api/py/ai/chronon/source.py index 1d4fe292f5..36e8860274 100644 --- a/api/py/ai/chronon/source.py +++ b/api/py/ai/chronon/source.py @@ -13,11 +13,12 @@ def EventSource( ) -> ttypes.Source: """ - :param table: - :param topic: - :param query: - :param is_cumulative: + :param table: Points to a table that has historical data for the input events + :param query: Contains row level transformations and filtering expressed as Spark SQL statements. Applied to both table and topic + :param topic: (Optional) Kafka topic that can be listened to for realtime updates + :param is_cumulative: Indicates that each new partition contains not just the current day's events but the entire set of events since the beginning :return: + A source object of kind EventSource """ return ttypes.Source( events=ttypes.EventSource( @@ -39,11 +40,12 @@ def EntitySource( ) -> ttypes.Source: """ - :param snapshot_table: - :param mutation_table: - :param query: - :param mutation_topic: + :param snapshot_table: Points to a table that contains periodical snapshots of the entire dataset + :param query: Contains row level transformations and filtering expressed as Spark SQL statements + :param mutation_table: (Optional) Points to a table that contains all changes applied to the dataset + :param mutation_topic: (Optional) Kafka topic that delivers changes in realtime :return: + A source object of kind EntitySource """ return ttypes.Source( entities=ttypes.EntitySource( @@ -62,9 +64,10 @@ def JoinSource( ) -> ttypes.Source: """ - :param join: - :param query: + :param join: Output of downstream Join operation + :param query: Contains row level transformations and filtering expressed as Spark SQL statements :return: + A source object of kind JoinSource """ return ttypes.Source( joinSource=ttypes.JoinSource( From ed33b976936089fe13362caed87a6046ccea1e21 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Fri, 18 Jul 2025 13:57:08 -0400 Subject: [PATCH 4/8] lint --- api/py/ai/chronon/source.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/api/py/ai/chronon/source.py b/api/py/ai/chronon/source.py index 36e8860274..3f46471dbf 100644 --- a/api/py/ai/chronon/source.py +++ b/api/py/ai/chronon/source.py @@ -14,9 +14,13 @@ def EventSource( """ :param table: Points to a table that has historical data for the input events - :param query: Contains row level transformations and filtering expressed as Spark SQL statements. Applied to both table and topic + :param query: + Contains row level transformations and filtering expressed as Spark SQL statements. + Applied to both table and topic :param topic: (Optional) Kafka topic that can be listened to for realtime updates - :param is_cumulative: Indicates that each new partition contains not just the current day's events but the entire set of events since the beginning + :param is_cumulative: + Indicates that each new partition contains not just the current day's events but the entire set of events + since the beginning :return: A source object of kind EventSource """ From 91ffc9109566cf9e7c2b8af84ec9cd95bf5d475d Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Fri, 18 Jul 2025 16:09:11 -0400 Subject: [PATCH 5/8] docs --- docs/examples/main.py | 9 +-- docs/source/Python_API.rst | 5 +- docs/source/authoring_features/Bootstrap.md | 2 +- .../authoring_features/DerivedFeatures.md | 14 ++--- docs/source/authoring_features/GroupBy.md | 59 +++++++++---------- docs/source/authoring_features/Join.md | 29 +++++---- docs/source/authoring_features/Source.md | 45 +++++++------- docs/source/getting_started/Introduction.md | 23 ++++---- docs/source/getting_started/Tutorial.md | 30 +++++----- docs/source/index.rst | 23 ++++---- docs/source/setup/Flink.md | 18 +++--- 11 files changed, 123 insertions(+), 134 deletions(-) diff --git a/docs/examples/main.py b/docs/examples/main.py index 41b5d64fce..e26a348e01 100644 --- a/docs/examples/main.py +++ b/docs/examples/main.py @@ -15,16 +15,17 @@ from ai.chronon import query from ai.chronon.group_by import GroupBy, TimeUnit, Window -from ai.chronon.api.ttypes import EventSource, EntitySource, Aggregation, Operation, JoinPart +from ai.chronon.source import EventSource, EntitySource +from ai.chronon.api.ttypes import Aggregation, Operation, JoinPart from ai.chronon.join import Join ratings_features = GroupBy( sources=[ EntitySource( - snapshotTable="item_info.ratings_snapshots_table", - mutationsTable="item_info.ratings_mutations_table", - mutationsTopic="ratings_mutations_topic", + snapshot_table="item_info.ratings_snapshots_table", + mutations_table="item_info.ratings_mutations_table", + mutations_topic="ratings_mutations_topic", query=query.Query( selects={ "rating": "CAST(rating as DOUBLE)", diff --git a/docs/source/Python_API.rst b/docs/source/Python_API.rst index fa994cdad3..33e02091a1 100644 --- a/docs/source/Python_API.rst +++ b/docs/source/Python_API.rst @@ -4,6 +4,9 @@ Python API .. automodule:: ai.chronon.group_by :members: +.. automodule:: ai.chronon.source + :members: + .. automodule:: ai.chronon.join :members: @@ -11,4 +14,4 @@ Python API :members: .. automodule:: ai.chronon.api.ttypes - :members: StagingQuery, EventSource, EntitySource, Window, Aggregation \ No newline at end of file + :members: StagingQuery, Window, Aggregation \ No newline at end of file diff --git a/docs/source/authoring_features/Bootstrap.md b/docs/source/authoring_features/Bootstrap.md index 5dc3b70515..2ff3898973 100644 --- a/docs/source/authoring_features/Bootstrap.md +++ b/docs/source/authoring_features/Bootstrap.md @@ -58,7 +58,7 @@ Bootstrap table is a precomputed table which contains precomputed feature values ## API w/ Examples -`payments_driver.v1` is a StagingQuery based on the log table of v1 Join, optionally unioned with historical data before logging is available. v1_source is a HiveEventSource based on this StagingQuery, used as `left` of v1 Join. +`payments_driver.v1` is a StagingQuery based on the log table of v1 Join, optionally unioned with historical data before logging is available. v1_source is a EventSource based on this StagingQuery, used as `left` of v1 Join. ```python diff --git a/docs/source/authoring_features/DerivedFeatures.md b/docs/source/authoring_features/DerivedFeatures.md index e8176ea467..07bbda42f4 100644 --- a/docs/source/authoring_features/DerivedFeatures.md +++ b/docs/source/authoring_features/DerivedFeatures.md @@ -61,7 +61,7 @@ from ai.chronon.group_by import ( Window, TimeUnit, ) -from ai.chronon.api.ttypes import EventSource +from ai.chronon.source import EventSource from ai.chronon.query import Query, select transactions_source = EventSource( @@ -98,7 +98,7 @@ File `joins/sample/txn_fraud.py` ```python from group_bys.sample_team import card_features -from ai.chronon.api.ttypes import EventSource +from ai.chronon.source import EventSource from ai.chronon.join import Join, JoinPart, Derivation v1 = Join( @@ -143,15 +143,15 @@ from ai.chronon.group_by import ( Window, TimeUnit, ) -from ai.chronon.api.ttypes import EventSource +from ai.chronon.source import EventSource from ai.chronon.query import Query, select # TODO Add data tests ip_successes_source = EventSource( - table="namespace.ip_successes", - topic="ip_successes_kafka_topic", - query=Query(selects=select("ip", "success")), + table="namespace.ip_successes", + topic="ip_successes_kafka_topic", + query=Query(selects=select("ip", "success")), ) @@ -177,7 +177,7 @@ File `joins/sample/txn_fraud.py` ```python from group_bys.sample_team import merchant_features -from ai.chronon.api.ttypes import EventSource +from ai.chronon.source import EventSource from ai.chronon.join import Join, JoinPart, Derivation v1 = Join( diff --git a/docs/source/authoring_features/GroupBy.md b/docs/source/authoring_features/GroupBy.md index 7f1b7acefe..70f74de65a 100644 --- a/docs/source/authoring_features/GroupBy.md +++ b/docs/source/authoring_features/GroupBy.md @@ -199,14 +199,13 @@ The following examples are broken down by source type. We strongly suggest makin This example is based on the [returns](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/returns.py) GroupBy from the quickstart guide that performs various aggregations over the `refund_amt` column over various windows. ```python -source = Source( - events=EventSource( - table="data.returns", # This points to the log table with historical return events - topic="events.returns", - query=Query( - selects=select("user_id","refund_amt"), # Select the fields we care about - time_column="ts") # The event time - )) +source = EventSource( + table="data.returns", # This points to the log table with historical return events + topic="events.returns", + query=Query( + selects=select("user_id","refund_amt"), # Select the fields we care about + time_column="ts") # The event time +) window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below @@ -242,13 +241,12 @@ v1 = GroupBy( In this example we take the [Purchases GroupBy](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/purchases.py) from the Quickstart tutorial and modify it to include buckets based on a hypothetical `"credit_card_type"` column. ```python -source = Source( - events=EventSource( - table="data.purchases", - query=Query( - selects=select("user_id","purchase_price","credit_card_type"), # Now we also select the `credit card type` column - time_column="ts") - )) +source = EventSource( + table="data.purchases", + query=Query( + selects=select("user_id","purchase_price","credit_card_type"), # Now we also select the `credit card type` column + time_column="ts") +) window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] @@ -293,13 +291,12 @@ Important things to note about this case relative to the streaming GroupBy: * As such, we do not need to provide a time column, midnight boundaries are used as the time along which feature values are updated. For example, a 30 day window computed using this GroupBy will get computed as of the prior midnight boundary for a requested timestamp, rather than the precise millisecond, for the purpose of online/offline consistency. ```python -source = Source( - events=EventSource( - table="data.purchases", # This points to the log table with historical purchase events - query=Query( - selects=select("user_id","purchase_price"), # Select the fields we care about - ) - )) +source = EventSource( + table="data.purchases", # This points to the log table with historical purchase events + query=Query( + selects=select("user_id","purchase_price"), # Select the fields we care about + ) +) window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below @@ -341,13 +338,12 @@ The primary key for this GroupBy is the same as the primary key of the source ta it doesn't perform any aggregation, but just extracts user fields as features. """ -source = Source( - entities=EntitySource( - snapshotTable="data.users", # This points to a table that contains daily snapshots of the entire product catalog - query=Query( - selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about - ) - )) +source = EntitySource( + snapshotTable="data.users", # This points to a table that contains daily snapshots of the entire product catalog + query=Query( + selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about + ) +) v1 = GroupBy( sources=[source], @@ -364,9 +360,8 @@ This is a modification of the above `Batch Entity GroupBy` example to include an Semantically, we're expressing "count the number of users per zip code" as a feature. ```python -source = Source( - entities=EntitySource( - snapshotTable="data.users", # This points to a table that contains daily snapshots of the entire product catalog +source = EntitySource( + snapshot_table="data.users", # This points to a table that contains daily snapshots of the entire product catalog query=Query( selects=select( # Select the fields we care about user_id="CAST (user_id AS BIGINT)", # it supports Spark SQL expressions diff --git a/docs/source/authoring_features/Join.md b/docs/source/authoring_features/Join.md index 767e633510..d5c4d3ccbd 100644 --- a/docs/source/authoring_features/Join.md +++ b/docs/source/authoring_features/Join.md @@ -9,14 +9,13 @@ This is important because it means that when we serve the model online, inferenc To see how we do this, let's take a look at the left side of the join definition (taken from [Quickstart Training Set Join](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/joins/quickstart/training_set.py)). ```python -source = Source( - events=EventSource( - table="data.checkouts", - query=Query( - selects=select("user_id"), # The primary key used to join various GroupBys together - time_column="ts", # The event time used to compute feature values as-of - ) - )) +source = EventSource( + table="data.checkouts", + query=Query( + selects=select("user_id"), # The primary key used to join various GroupBys together + time_column="ts", # The event time used to compute feature values as-of + ) +) v1 = Join( left=source, @@ -292,7 +291,7 @@ v1 = StagingQuery( # ml_models/zipline/joins/team_name/model.py v1 = Join( # driver table can be either output of a staging_query or custom hive table - left=HiveEventSource( + left=EventSource( namespace="db_name", table=get_staging_query_output_table_name(driver_table.v1), query=Query(...) @@ -365,7 +364,7 @@ v1 = StagingQuery( # ml_models/zipline/joins/team_name/model.py v1 = Join( # it's important to use the SAME staging query before and after. - left=HiveEventSource( + left=EventSource( namespace="db_name", table=get_staging_query_output_table_name(driver_table.v1), query=Query(...) @@ -395,7 +394,7 @@ If indeed something like that happened, or if you must use a different left tabl ```python v2 = Join( # if you must use a different driver table - left=HiveEventSource( + left=EventSource( namespace="db_name", table=get_staging_query_output_table_name(driver_table.v2), query=Query(...) @@ -419,7 +418,7 @@ Steps # local variable to support sharing the same config values across two joins right_parts_production = [...] right_parts_experimental = [...] -driver_table = HiveEventSource( +driver_table = EventSource( namespace="db_name", table=get_staging_query_output_table_name(driver_table.v1), query=Query(wheres=downsampling_filters) @@ -467,7 +466,7 @@ v1 = StagingQuery( # ml_models/zipline/joins/team_name/model.py # local variable to support sharing the same config values across two joins right_parts = [...] -driver_table = HiveEventSource( +driver_table = EventSource( namespace="db_name", table=get_staging_query_output_table_name(driver_table.v1), query=Query(wheres=downsampling_filters) @@ -608,7 +607,7 @@ CHRONON_TO_LEGACY_NAME_MAPPING_DICT = { } v1 = Join( # driver table with union history - left=HiveEventSource( + left=EventSource( namespace="db_name", table=get_staging_query_output_table_name(driver_table.v1), query=Query(...) @@ -633,7 +632,7 @@ Goal 3. Register the backfill table as a bootstrap part in the join ```python backfill_2023_05_01 = Join( - left=HiveEventSource( + left=EventSource( namespace="db_name", table=get_staging_query_output_table_name(driver_table.v1), query=Query( diff --git a/docs/source/authoring_features/Source.md b/docs/source/authoring_features/Source.md index 49a6041e86..17c0ada46b 100644 --- a/docs/source/authoring_features/Source.md +++ b/docs/source/authoring_features/Source.md @@ -21,14 +21,13 @@ All sources are basically composed of the following pieces*: Taken from the [returns.py](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/returns.py) example GroupBy in the quickstart tutorial. ```python -source = Source( - events=EventSource( - table="data.returns", # This points to the log table with historical return events - topic="events.returns", # Streaming event - query=Query( - selects=select("user_id","refund_amt"), # Select the fields we care about - time_column="ts") # The event time - )) +source = EventSource( + table="data.returns", # This points to the log table with historical return events + topic="events.returns", # Streaming event + query=Query( + selects=select("user_id","refund_amt"), # Select the fields we care about + time_column="ts") # The event time +) ``` Key points: @@ -43,14 +42,13 @@ Key points: Modified from the above example. ```python -source = Source( - events=EventSource( - table="data.returns", - topic=None, # This makes it a batch source - query=Query( - selects=select("user_id","refund_amt"), - ) - )) +source = EventSource( + table="data.returns", + topic=None, # This makes it a batch source + query=Query( + selects=select("user_id","refund_amt"), + ) +) ``` Key points: @@ -64,7 +62,7 @@ Key points: Here is an example of a streaming EntitySource, modeled after a hypothetical "users" table. ```python -user_activity = Source(entities=EntitySource( +user_activity = EntitySource( snapshotTable="db_snapshots.users", mutationTable="db_mutations.users", mutationTopic="events.users_mutations", @@ -87,13 +85,12 @@ As you can see, a pre-requisite to using the streaming `EntitySource` is a chang Taken from the [users.py](https://github.com/airbnb/chronon/blob/main/api/py/test/sample/group_bys/quickstart/users.py) example GroupBy in the quickstart tutorial. ```python -source = Source( - entities=EntitySource( - snapshotTable="data.users", # This points to a table that contains daily snapshots of the entire product catalog - query=Query( - selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about - ) - )) +source = EntitySource( + snapshotTable="data.users", # This points to a table that contains daily snapshots of the entire product catalog + query=Query( + selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about + ) +) ``` This is similar to the above, however, it only contains the `snapshotTable`, and not the batch and streaming mutations sources. diff --git a/docs/source/getting_started/Introduction.md b/docs/source/getting_started/Introduction.md index 402c8c19d9..2ba30c2d7c 100644 --- a/docs/source/getting_started/Introduction.md +++ b/docs/source/getting_started/Introduction.md @@ -28,20 +28,19 @@ This GroupBy aggregates metrics about a user's previous purchases in various win """ # This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. -source = Source( - events=EventSource( - table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily - topic= "events/purchases", # The streaming source topic that can be listened to for realtime events - query=Query( - selects=select( - user="user_id", - price="purchase_price * (1 - merchant_fee_percent/100)" - ), # Select the fields we care about - time_column="ts" # The event time - ) - ) +source = EventSource( + table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily + topic= "events/purchases", # The streaming source topic that can be listened to for realtime events + query=Query( + selects=select( + user="user_id", + price="purchase_price * (1 - merchant_fee_percent/100)" + ), # Select the fields we care about + time_column="ts" # The event time + ) ) + window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below v1 = GroupBy( diff --git a/docs/source/getting_started/Tutorial.md b/docs/source/getting_started/Tutorial.md index b5569ab961..8cea29ad5f 100644 --- a/docs/source/getting_started/Tutorial.md +++ b/docs/source/getting_started/Tutorial.md @@ -68,14 +68,13 @@ We can aggregate the purchases log data to the user level, to give us a view int Because this feature is built upon a source that includes both a table and a topic, its features can be computed in both batch and streaming. ```python -source = Source( - events=EventSource( - table="data.purchases", # This points to the log table with historical purchase events - topic=None, # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events - query=Query( - selects=select("user_id","purchase_price"), # Select the fields we care about - time_column="ts") # The event time - )) +source = EventSource( + table="data.purchases", # This points to the log table with historical purchase events + topic=None, # Streaming is not currently part of quickstart, but this would be where you define the topic for realtime events + query=Query( + selects=select("user_id","purchase_price"), # Select the fields we care about + time_column="ts") # The event time +) window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below @@ -145,14 +144,13 @@ For our use case, it's very important that features are computed as of the corre Here is what our join looks like: ```python -source = Source( - events=EventSource( - table="data.checkouts", - query=Query( - selects=select("user_id"), # The primary key used to join various GroupBys together - time_column="ts", - ) # The event time used to compute feature values as-of - )) +source = EventSource( + table="data.checkouts", + query=Query( + selects=select("user_id"), # The primary key used to join various GroupBys together + time_column="ts", + ) # The event time used to compute feature values as-of +) v1 = Join( left=source, diff --git a/docs/source/index.rst b/docs/source/index.rst index 1aef696460..9ff3844a66 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -98,20 +98,19 @@ This definition starts with purchase events as the raw input source, and creates """ # This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. - source = Source( - events=EventSource( - table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily - topic= "events/purchases", # The streaming source topic that can be listened to for realtime events - query=Query( - selects=select( - user="user_id", - price="purchase_price * (1 - merchant_fee_percent/100)" - ), # Select the fields we care about - time_column="ts" # The event time - ) - ) + source = EventSource( + table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily + topic= "events/purchases", # The streaming source topic that can be listened to for realtime events + query=Query( + selects=select( + user="user_id", + price="purchase_price * (1 - merchant_fee_percent/100)" + ), # Select the fields we care about + time_column="ts" # The event time + ) ) + window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below v1 = GroupBy( diff --git a/docs/source/setup/Flink.md b/docs/source/setup/Flink.md index 181ecc716a..09c6e1f966 100644 --- a/docs/source/setup/Flink.md +++ b/docs/source/setup/Flink.md @@ -55,16 +55,14 @@ A GroupBy might look like this: ```python ice_cream_group_by = GroupBy( - sources=Source( - events=ttypes.EventSource( - query=Query( - selects=select( - customer_id="customer_id", - flavor="ice_cream_flavor", - ), - time_column="created", - ) - ) + sources=EventSource( + query=Query( + selects=select( + customer_id="customer_id", + flavor="ice_cream_flavor", + ), + time_column="created", + ) ), keys=["customer_id"], aggregations=[ From f754bc080e91d17226fcd315821f9477c0260c5b Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Fri, 18 Jul 2025 16:12:37 -0400 Subject: [PATCH 6/8] more docs --- api/py/ai/chronon/group_by.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/api/py/ai/chronon/group_by.py b/api/py/ai/chronon/group_by.py index 6ddb816d98..65f23e796e 100644 --- a/api/py/ai/chronon/group_by.py +++ b/api/py/ai/chronon/group_by.py @@ -370,24 +370,25 @@ def GroupBy( :param sources: can be constructed as entities or events or joinSource:: - import ai.chronon.api.ttypes as chronon - events = chronon.Source(events=chronon.Events( + import ai.chronon.query as query + import ai.chronon.source as source + events = source.EventSource( table=YOUR_TABLE, topic=YOUR_TOPIC # <- OPTIONAL for serving - query=chronon.Query(...) - isCumulative=False # <- defaults to false. + query=query.Query(...) + is_cumulative=False # <- defaults to false. )) Or - entities = chronon.Source(entities=chronon.Entities( - snapshotTable=YOUR_TABLE, - mutationTopic=YOUR_TOPIC, - mutationTable=YOUR_MUTATION_TABLE - query=chronon.Query(...) + entities = source.EntitySource( + snapshot_table=YOUR_TABLE, + mutation_topic=YOUR_TOPIC, + mutation_table=YOUR_MUTATION_TABLE + query=query.Query(...) )) or - joinSource = chronon.Source(joinSource=chronon.JoinSource( + joinSource = source.JoinSource( join = YOUR_CHRONON_PARENT_JOIN, - query = chronon.Query(...) + query = query.Query(...) )) Multiple sources can be supplied to backfill the historical values with their respective start and end From 94c60d8741d249d01fff8d09f50a414a9008ac38 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Mon, 21 Jul 2025 10:45:29 -0400 Subject: [PATCH 7/8] test signatures match --- api/py/test/test_source.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 api/py/test/test_source.py diff --git a/api/py/test/test_source.py b/api/py/test/test_source.py new file mode 100644 index 0000000000..62d2adc2dd --- /dev/null +++ b/api/py/test/test_source.py @@ -0,0 +1,32 @@ +import inspect +import re +import pytest + +from ai.chronon.api import ttypes +from ai.chronon import source + + +def _camel_to_snake(s: str) -> str: + return re.sub(r'(? Date: Tue, 22 Jul 2025 16:06:56 -0400 Subject: [PATCH 8/8] empty customJson --- api/py/ai/chronon/source.py | 4 ++-- .../sample_team/event_sample_group_by.v1 | 3 +-- .../sample_team/sample_deprecation_group_by.v1 | 6 ++---- .../sample_team/sample_group_by_group_by.v1 | 3 +-- .../sample_chaining_group_by.chaining_group_by_v1 | 6 ++---- .../group_bys/unit_test/sample_group_by.v1 | 3 +-- .../unit_test/user.sample_nested_group_by.v1 | 3 +-- .../sample_team/sample_backfill_mutation_join.v0 | 6 ++---- .../sample_team/sample_chaining_join.parent_join | 9 +++------ .../joins/sample_team/sample_deprecation_join.v1 | 9 +++------ .../sample_team/sample_join.consistency_check | 6 ++---- .../sample_team/sample_join.group_by_of_group_by | 6 ++---- .../joins/sample_team/sample_join.never | 6 ++---- .../sample_team/sample_join.no_log_flattener | 6 ++---- .../production/joins/sample_team/sample_join.v1 | 6 ++---- .../joins/sample_team/sample_join_bootstrap.v1 | 9 +++------ .../joins/sample_team/sample_join_bootstrap.v2 | 15 +++++---------- .../joins/sample_team/sample_join_derivation.v1 | 9 +++------ .../sample_team/sample_join_external_parts.v1 | 6 ++---- .../sample_team/sample_join_external_parts.v2 | 6 ++---- .../sample_join_for_dependency_test.v1 | 6 ++---- .../sample_join_from_group_by_from_join.v1 | 6 ++---- .../joins/sample_team/sample_join_from_module.v1 | 12 ++++-------- .../sample_team/sample_join_from_shorthand.v1 | 3 +-- ...ple_join_with_derivations_on_external_parts.v1 | 9 +++------ .../joins/sample_team/sample_label_join.v1 | 15 +++++---------- .../sample_team/sample_label_join_with_agg.v1 | 15 +++++---------- .../joins/sample_team/sample_online_join.v1 | 15 +++++---------- .../unit_test/sample_parent_join.parent_join | 6 ++---- .../joins/unit_test/user.sample_transactions.v1 | 6 ++---- 30 files changed, 74 insertions(+), 146 deletions(-) diff --git a/api/py/ai/chronon/source.py b/api/py/ai/chronon/source.py index 3f46471dbf..7f5ecc7a30 100644 --- a/api/py/ai/chronon/source.py +++ b/api/py/ai/chronon/source.py @@ -30,7 +30,7 @@ def EventSource( topic=topic, query=query, isCumulative=is_cumulative, - customJson=json.dumps(kwargs), + customJson=json.dumps(kwargs) if kwargs else None, ) ) @@ -57,7 +57,7 @@ def EntitySource( mutationTable=mutation_table, query=query, mutationTopic=mutation_topic, - customJson=json.dumps(kwargs), + customJson=json.dumps(kwargs) if kwargs else None, ) ) diff --git a/api/py/test/sample/production/group_bys/sample_team/event_sample_group_by.v1 b/api/py/test/sample/production/group_bys/sample_team/event_sample_group_by.v1 index 05b44a6c5c..8a697e9525 100644 --- a/api/py/test/sample/production/group_bys/sample_team/event_sample_group_by.v1 +++ b/api/py/test/sample/production/group_bys/sample_team/event_sample_group_by.v1 @@ -25,8 +25,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/group_bys/sample_team/sample_deprecation_group_by.v1 b/api/py/test/sample/production/group_bys/sample_team/sample_deprecation_group_by.v1 index 195d6fbea4..e52efe250b 100644 --- a/api/py/test/sample/production/group_bys/sample_team/sample_deprecation_group_by.v1 +++ b/api/py/test/sample/production/group_bys/sample_team/sample_deprecation_group_by.v1 @@ -28,8 +28,7 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - }, - "customJson": "{}" + } } }, { @@ -43,8 +42,7 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 b/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 index 5ac77882b9..942d3f0b18 100644 --- a/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 +++ b/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 @@ -29,8 +29,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/group_bys/unit_test/sample_chaining_group_by.chaining_group_by_v1 b/api/py/test/sample/production/group_bys/unit_test/sample_chaining_group_by.chaining_group_by_v1 index b22b0e1fb7..1752b48607 100644 --- a/api/py/test/sample/production/group_bys/unit_test/sample_chaining_group_by.chaining_group_by_v1 +++ b/api/py/test/sample/production/group_bys/unit_test/sample_chaining_group_by.chaining_group_by_v1 @@ -46,8 +46,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -79,8 +78,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/group_bys/unit_test/sample_group_by.v1 b/api/py/test/sample/production/group_bys/unit_test/sample_group_by.v1 index 0275bf5da2..473f27f3b1 100644 --- a/api/py/test/sample/production/group_bys/unit_test/sample_group_by.v1 +++ b/api/py/test/sample/production/group_bys/unit_test/sample_group_by.v1 @@ -25,8 +25,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/group_bys/unit_test/user.sample_nested_group_by.v1 b/api/py/test/sample/production/group_bys/unit_test/user.sample_nested_group_by.v1 index dd77c50c5b..e3fe60cc41 100644 --- a/api/py/test/sample/production/group_bys/unit_test/user.sample_nested_group_by.v1 +++ b/api/py/test/sample/production/group_bys/unit_test/user.sample_nested_group_by.v1 @@ -26,8 +26,7 @@ "startPartition": "2023-03-01", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_backfill_mutation_join.v0 b/api/py/test/sample/production/joins/sample_team/sample_backfill_mutation_join.v0 index b578f92f9b..8ced8e1ca2 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_backfill_mutation_join.v0 +++ b/api/py/test/sample/production/joins/sample_team/sample_backfill_mutation_join.v0 @@ -29,8 +29,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -60,8 +59,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_chaining_join.parent_join b/api/py/test/sample/production/joins/sample_team/sample_chaining_join.parent_join index 756d8145b1..f8f7457112 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_chaining_join.parent_join +++ b/api/py/test/sample/production/joins/sample_team/sample_chaining_join.parent_join @@ -31,8 +31,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -64,8 +63,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -134,8 +132,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_deprecation_join.v1 b/api/py/test/sample/production/joins/sample_team/sample_deprecation_join.v1 index 907c395dc3..ad3f737cad 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_deprecation_join.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_deprecation_join.v1 @@ -29,8 +29,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -65,8 +64,7 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - }, - "customJson": "{}" + } } }, { @@ -80,8 +78,7 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.consistency_check b/api/py/test/sample/production/joins/sample_team/sample_join.consistency_check index e672fdc473..b1b550d438 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.consistency_check +++ b/api/py/test/sample/production/joins/sample_team/sample_join.consistency_check @@ -28,8 +28,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -63,8 +62,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by b/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by index a12f502914..51df118a8f 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by +++ b/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by @@ -28,8 +28,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -65,8 +64,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.never b/api/py/test/sample/production/joins/sample_team/sample_join.never index a9ec6f2f07..dabae091eb 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.never +++ b/api/py/test/sample/production/joins/sample_team/sample_join.never @@ -27,8 +27,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -62,8 +61,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.no_log_flattener b/api/py/test/sample/production/joins/sample_team/sample_join.no_log_flattener index 72c5b8c06a..195a9d381a 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.no_log_flattener +++ b/api/py/test/sample/production/joins/sample_team/sample_join.no_log_flattener @@ -27,8 +27,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -62,8 +61,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.v1 b/api/py/test/sample/production/joins/sample_team/sample_join.v1 index dab68d4785..31d96721d9 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join.v1 @@ -32,8 +32,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -67,8 +66,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v1 index 04e92ea57f..69c04fc065 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v1 @@ -31,8 +31,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -64,8 +63,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -134,8 +132,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v2 b/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v2 index 59a7ce7661..69a129caf5 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v2 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v2 @@ -33,8 +33,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -66,8 +65,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -136,8 +134,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -197,8 +194,7 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - }, - "customJson": "{}" + } } }, { @@ -212,8 +208,7 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_derivation.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_derivation.v1 index 610ad06969..cb70e035c9 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_derivation.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_derivation.v1 @@ -29,8 +29,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -62,8 +61,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -132,8 +130,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v1 index 120bb193da..ae60db1855 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v1 @@ -27,8 +27,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -62,8 +61,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v2 b/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v2 index e1eacc8eb6..b083806d88 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v2 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_external_parts.v2 @@ -27,8 +27,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -62,8 +61,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_for_dependency_test.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_for_dependency_test.v1 index b5c0a0376f..20f05ea94a 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_for_dependency_test.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_for_dependency_test.v1 @@ -27,8 +27,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -60,8 +59,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_from_group_by_from_join.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_from_group_by_from_join.v1 index 2404389795..e189a7aa9e 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_from_group_by_from_join.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_from_group_by_from_join.v1 @@ -28,8 +28,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -65,8 +64,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_from_module.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_from_module.v1 index baac6662f3..72ff11b534 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_from_module.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_from_module.v1 @@ -31,8 +31,7 @@ }, "startPartition": "2021-03-01", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -61,8 +60,7 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - }, - "customJson": "{}" + } } }, { @@ -76,8 +74,7 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -138,8 +135,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_from_shorthand.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_from_shorthand.v1 index 2eee2bbff1..1afce25228 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_from_shorthand.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_from_shorthand.v1 @@ -29,8 +29,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [] diff --git a/api/py/test/sample/production/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 b/api/py/test/sample/production/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 index 6e6dcb236d..60b1f210cf 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 @@ -29,8 +29,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -62,8 +61,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -132,8 +130,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_label_join.v1 b/api/py/test/sample/production/joins/sample_team/sample_label_join.v1 index 7bf55cbfd2..0ee74f427e 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_label_join.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_label_join.v1 @@ -29,8 +29,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -62,8 +61,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -131,8 +129,7 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - }, - "customJson": "{}" + } } }, { @@ -146,8 +143,7 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -212,8 +208,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_label_join_with_agg.v1 b/api/py/test/sample/production/joins/sample_team/sample_label_join_with_agg.v1 index 1586c31818..01474df919 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_label_join_with_agg.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_label_join_with_agg.v1 @@ -29,8 +29,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -62,8 +61,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -131,8 +129,7 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - }, - "customJson": "{}" + } } }, { @@ -146,8 +143,7 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -215,8 +211,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/sample_team/sample_online_join.v1 b/api/py/test/sample/production/joins/sample_team/sample_online_join.v1 index 0ed01d24f8..ab60e81e72 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_online_join.v1 +++ b/api/py/test/sample/production/joins/sample_team/sample_online_join.v1 @@ -32,8 +32,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -65,8 +64,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -135,8 +133,7 @@ "startPartition": "2021-03-01", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], @@ -196,8 +193,7 @@ "endPartition": "2021-04-09", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - }, - "customJson": "{}" + } } }, { @@ -211,8 +207,7 @@ "startPartition": "2021-03-01", "timeColumn": "__timestamp", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/unit_test/sample_parent_join.parent_join b/api/py/test/sample/production/joins/unit_test/sample_parent_join.parent_join index 3645cb55eb..63553a84be 100644 --- a/api/py/test/sample/production/joins/unit_test/sample_parent_join.parent_join +++ b/api/py/test/sample/production/joins/unit_test/sample_parent_join.parent_join @@ -27,8 +27,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -56,8 +55,7 @@ "startPartition": "2021-04-09", "timeColumn": "ts", "setups": [] - }, - "customJson": "{}" + } } } ], diff --git a/api/py/test/sample/production/joins/unit_test/user.sample_transactions.v1 b/api/py/test/sample/production/joins/unit_test/user.sample_transactions.v1 index 77daf17253..759f529342 100644 --- a/api/py/test/sample/production/joins/unit_test/user.sample_transactions.v1 +++ b/api/py/test/sample/production/joins/unit_test/user.sample_transactions.v1 @@ -30,8 +30,7 @@ "startPartition": "2023-03-01", "timeColumn": "UNIX_TIMESTAMP(ts_created_at_utc) * 1000", "setups": [] - }, - "customJson": "{}" + } } }, "joinParts": [ @@ -64,8 +63,7 @@ "startPartition": "2023-03-01", "timeColumn": "UNIX_TIMESTAMP(ts) * 1000", "setups": [] - }, - "customJson": "{}" + } } } ],