diff --git a/api/py/ai/chronon/group_by.py b/api/py/ai/chronon/group_by.py index 6ddb816d98..65f23e796e 100644 --- a/api/py/ai/chronon/group_by.py +++ b/api/py/ai/chronon/group_by.py @@ -370,24 +370,25 @@ def GroupBy( :param sources: can be constructed as entities or events or joinSource:: - import ai.chronon.api.ttypes as chronon - events = chronon.Source(events=chronon.Events( + import ai.chronon.query as query + import ai.chronon.source as source + events = source.EventSource( table=YOUR_TABLE, topic=YOUR_TOPIC # <- OPTIONAL for serving - query=chronon.Query(...) - isCumulative=False # <- defaults to false. + query=query.Query(...) + is_cumulative=False # <- defaults to false. )) Or - entities = chronon.Source(entities=chronon.Entities( - snapshotTable=YOUR_TABLE, - mutationTopic=YOUR_TOPIC, - mutationTable=YOUR_MUTATION_TABLE - query=chronon.Query(...) + entities = source.EntitySource( + snapshot_table=YOUR_TABLE, + mutation_topic=YOUR_TOPIC, + mutation_table=YOUR_MUTATION_TABLE + query=query.Query(...) )) or - joinSource = chronon.Source(joinSource=chronon.JoinSource( + joinSource = source.JoinSource( join = YOUR_CHRONON_PARENT_JOIN, - query = chronon.Query(...) + query = query.Query(...) )) Multiple sources can be supplied to backfill the historical values with their respective start and end diff --git a/api/py/ai/chronon/source.py b/api/py/ai/chronon/source.py new file mode 100644 index 0000000000..7f5ecc7a30 --- /dev/null +++ b/api/py/ai/chronon/source.py @@ -0,0 +1,81 @@ +import json +from typing import Optional + +import ai.chronon.api.ttypes as ttypes + + +def EventSource( + table: str, + query: ttypes.Query, + topic: Optional[str] = None, + is_cumulative: Optional[bool] = None, + **kwargs +) -> ttypes.Source: + """ + + :param table: Points to a table that has historical data for the input events + :param query: + Contains row level transformations and filtering expressed as Spark SQL statements. 
+ Applied to both table and topic + :param topic: (Optional) Kafka topic that can be listened to for realtime updates + :param is_cumulative: + Indicates that each new partition contains not just the current day's events but the entire set of events + since the beginning + :return: + A source object of kind EventSource + """ + return ttypes.Source( + events=ttypes.EventSource( + table=table, + topic=topic, + query=query, + isCumulative=is_cumulative, + customJson=json.dumps(kwargs) if kwargs else None, + ) + ) + + +def EntitySource( + snapshot_table: str, + query: ttypes.Query, + mutation_table: Optional[str] = None, + mutation_topic: Optional[str] = None, + **kwargs +) -> ttypes.Source: + """ + + :param snapshot_table: Points to a table that contains periodical snapshots of the entire dataset + :param query: Contains row level transformations and filtering expressed as Spark SQL statements + :param mutation_table: (Optional) Points to a table that contains all changes applied to the dataset + :param mutation_topic: (Optional) Kafka topic that delivers changes in realtime + :return: + A source object of kind EntitySource + """ + return ttypes.Source( + entities=ttypes.EntitySource( + snapshotTable=snapshot_table, + mutationTable=mutation_table, + query=query, + mutationTopic=mutation_topic, + customJson=json.dumps(kwargs) if kwargs else None, + ) + ) + + +def JoinSource( + join: ttypes.Join, + query: ttypes.Query, +) -> ttypes.Source: + """ + + :param join: Output of downstream Join operation + :param query: Contains row level transformations and filtering expressed as Spark SQL statements + :return: + A source object of kind JoinSource + """ + return ttypes.Source( + joinSource=ttypes.JoinSource( + join=join, + query=query + ) + ) diff --git a/api/py/test/lineage/test_parse_group_by.py b/api/py/test/lineage/test_parse_group_by.py index d604ac94f9..6d5070c70d 100644 --- a/api/py/test/lineage/test_parse_group_by.py +++ b/api/py/test/lineage/test_parse_group_by.py @@ 
-16,6 +16,7 @@ import unittest from ai.chronon import group_by +from ai.chronon import source from ai.chronon.api import ttypes from ai.chronon.group_by import Accuracy, Derivation from ai.chronon.lineage.lineage_metadata import ConfigType, TableType @@ -25,7 +26,7 @@ class TestParseGroupBy(unittest.TestCase): def setUp(self): - gb_event_source = ttypes.EventSource( + gb_event_source = source.EventSource( table="source.gb_table", topic=None, query=ttypes.Query( @@ -35,7 +36,7 @@ def setUp(self): ), ) - gb_event_source1 = ttypes.EventSource( + gb_event_source1 = source.EventSource( table="source.gb_table1", topic=None, query=ttypes.Query( diff --git a/api/py/test/lineage/test_parse_join.py b/api/py/test/lineage/test_parse_join.py index ed8343532e..2aed6943c5 100644 --- a/api/py/test/lineage/test_parse_join.py +++ b/api/py/test/lineage/test_parse_join.py @@ -16,6 +16,7 @@ import unittest from ai.chronon import group_by +from ai.chronon import source from ai.chronon.api import ttypes from ai.chronon.api import ttypes as api from ai.chronon.group_by import Derivation @@ -27,7 +28,7 @@ class TestParseJoin(unittest.TestCase): def setUp(self): - gb_event_source = ttypes.EventSource( + gb_event_source = source.EventSource( table="gb_table", topic=None, query=ttypes.Query( @@ -59,17 +60,15 @@ def setUp(self): ) self.join = Join( - left=api.Source( - events=api.EventSource( - table="join_event_table", - query=api.Query( - startPartition="2020-04-09", - selects={ - "subject": "subject", - "event_id": "event", - }, - timeColumn="CAST(ts AS DOUBLE)", - ), + left=source.EventSource( + table="join_event_table", + query=api.Query( + startPartition="2020-04-09", + selects={ + "subject": "subject", + "event_id": "event", + }, + timeColumn="CAST(ts AS DOUBLE)", ), ), output_namespace="test_db", diff --git a/api/py/test/sample/group_bys/kaggle/clicks.py b/api/py/test/sample/group_bys/kaggle/clicks.py index fd8c3f3b21..6a4e94aca1 100644 --- 
a/api/py/test/sample/group_bys/kaggle/clicks.py +++ b/api/py/test/sample/group_bys/kaggle/clicks.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EventSource from ai.chronon.query import Query, select from ai.chronon.group_by import ( GroupBy, @@ -23,6 +22,7 @@ TimeUnit, Accuracy ) +from ai.chronon.source import EventSource from ai.chronon.utils import get_staging_query_output_table_name from staging_queries.kaggle.outbrain import base_table @@ -43,14 +43,13 @@ """ -source = Source( - events=EventSource( - table=get_staging_query_output_table_name(base_table), # Here we use the staging query output table because it has the necessary fields, but for a true streaming source we would likely use a log table - topic="some_topic", # You would set your streaming source topic here - query=Query( - selects=select("ad_id", "clicked"), - time_column="ts") - )) +source = EventSource( + table=get_staging_query_output_table_name(base_table), # Here we use the staging query output table because it has the necessary fields, but for a true streaming source we would likely use a log table + topic="some_topic", # You would set your streaming source topic here + query=Query( + selects=select("ad_id", "clicked"), + time_column="ts") +) ad_streaming = GroupBy( sources=[source], diff --git a/api/py/test/sample/group_bys/kaggle/outbrain.py b/api/py/test/sample/group_bys/kaggle/outbrain.py index eacf3c7ae1..249c2f1534 100644 --- a/api/py/test/sample/group_bys/kaggle/outbrain.py +++ b/api/py/test/sample/group_bys/kaggle/outbrain.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from ai.chronon.api.ttypes import Source, EventSource from ai.chronon.query import Query, select from ai.chronon.group_by import ( GroupBy, diff --git a/api/py/test/sample/group_bys/quickstart/purchases.py b/api/py/test/sample/group_bys/quickstart/purchases.py index 166a6398bd..e2bddf54ca 100644 --- a/api/py/test/sample/group_bys/quickstart/purchases.py +++ b/api/py/test/sample/group_bys/quickstart/purchases.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EventSource from ai.chronon.query import Query, select +from ai.chronon.source import EventSource from ai.chronon.group_by import ( GroupBy, Aggregation, @@ -28,14 +28,13 @@ """ # This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. -source = Source( - events=EventSource( - table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily - topic=None, # See the 'returns' GroupBy for an example that has a streaming source configured. In this case, this would be the streaming source topic that can be listened to for realtime events - query=Query( - selects=select("user_id","purchase_price"), # Select the fields we care about - time_column="ts") # The event time - )) +source = EventSource( + table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily + topic=None, # See the 'returns' GroupBy for an example that has a streaming source configured. 
In this case, this would be the streaming source topic that can be listened to for realtime events + query=Query( + selects=select("user_id","purchase_price"), # Select the fields we care about + time_column="ts") # The event time +) window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below diff --git a/api/py/test/sample/group_bys/quickstart/returns.py b/api/py/test/sample/group_bys/quickstart/returns.py index a7c97ce710..5bc072d758 100644 --- a/api/py/test/sample/group_bys/quickstart/returns.py +++ b/api/py/test/sample/group_bys/quickstart/returns.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EventSource from ai.chronon.query import Query, select +from ai.chronon.source import EventSource from ai.chronon.group_by import ( GroupBy, Aggregation, @@ -28,14 +28,13 @@ This GroupBy aggregates metrics about a user's previous purchases in various windows. 
""" -source = Source( - events=EventSource( - table="data.returns", # This points to the log table with historical return events - topic="events.returns/fields=ts,return_id,user_id,product_id,refund_amt/host=kafka/port=9092", - query=Query( - selects=select("user_id","refund_amt"), # Select the fields we care about - time_column="ts") # The event time - )) +source = EventSource( + table="data.returns", # This points to the log table with historical return events + topic="events.returns/fields=ts,return_id,user_id,product_id,refund_amt/host=kafka/port=9092", + query=Query( + selects=select("user_id","refund_amt"), # Select the fields we care about + time_column="ts") # The event time +) window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below diff --git a/api/py/test/sample/group_bys/quickstart/schema.py b/api/py/test/sample/group_bys/quickstart/schema.py index 3cc2155178..3665668226 100644 --- a/api/py/test/sample/group_bys/quickstart/schema.py +++ b/api/py/test/sample/group_bys/quickstart/schema.py @@ -1,20 +1,18 @@ from ai.chronon.group_by import GroupBy, Aggregation, Operation -from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.source import EventSource from ai.chronon.query import Query, select -logging_schema_source = Source( - events=EventSource( - table="default.chronon_log_table", - query=Query( - selects=select( - schema_hash="decode(unbase64(key_base64), 'utf-8')", - schema_value="decode(unbase64(value_base64), 'utf-8')" - ), - wheres=["name='SCHEMA_PUBLISH_EVENT'"], - time_column="ts_millis", +logging_schema_source = EventSource( + table="default.chronon_log_table", + query=Query( + selects=select( + schema_hash="decode(unbase64(key_base64), 'utf-8')", + schema_value="decode(unbase64(value_base64), 'utf-8')" ), - ) + wheres=["name='SCHEMA_PUBLISH_EVENT'"], + time_column="ts_millis", + ), ) v1 = GroupBy( diff --git a/api/py/test/sample/group_bys/quickstart/users.py 
b/api/py/test/sample/group_bys/quickstart/users.py index 4c4025054e..3e6d543167 100644 --- a/api/py/test/sample/group_bys/quickstart/users.py +++ b/api/py/test/sample/group_bys/quickstart/users.py @@ -13,24 +13,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EntitySource from ai.chronon.query import Query, select from ai.chronon.group_by import ( GroupBy, ) +from ai.chronon.source import EntitySource """ The primary key for this GroupBy is the same as the primary key of the source table. Therefore, it doesn't perform any aggregation, but just extracts user fields as features. """ -source = Source( - entities=EntitySource( - snapshotTable="data.users", # This points to a table that contains daily snapshots of the entire product catalog - query=Query( - selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about - ) - )) +source = EntitySource( + snapshot_table="data.users", # This points to a table that contains daily snapshots of the entire product catalog + query=Query( + selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about + ) +) v1 = GroupBy( sources=[source], diff --git a/api/py/test/sample/group_bys/unit_test/user/sample_nested_group_by.py b/api/py/test/sample/group_bys/unit_test/user/sample_nested_group_by.py index 2e4f8235ee..e72b9d0bdf 100644 --- a/api/py/test/sample/group_bys/unit_test/user/sample_nested_group_by.py +++ b/api/py/test/sample/group_bys/unit_test/user/sample_nested_group_by.py @@ -1,21 +1,19 @@ -from ai.chronon.api import ttypes from ai.chronon.api.ttypes import Aggregation, Operation, TimeUnit, Window from ai.chronon.group_by import Aggregations, GroupBy from ai.chronon.query import Query, select +from ai.chronon.source import EventSource -source = ttypes.Source( - events=ttypes.EventSource( - table="random_table_name", - query=Query( - selects=select( - 
user="id_item", - play="if(transaction_type='A', 1, 0)", - pause="if(transaction_type='B', 1, 0)", - ), - start_partition="2023-03-01", - time_column="UNIX_TIMESTAMP(ts) * 1000", +source = EventSource( + table="random_table_name", + query=Query( + selects=select( + user="id_item", + play="if(transaction_type='A', 1, 0)", + pause="if(transaction_type='B', 1, 0)", ), - ) + start_partition="2023-03-01", + time_column="UNIX_TIMESTAMP(ts) * 1000", + ), ) windows = [ diff --git a/api/py/test/sample/joins/quickstart/training_set.py b/api/py/test/sample/joins/quickstart/training_set.py index f0127ddf58..c5170db475 100644 --- a/api/py/test/sample/joins/quickstart/training_set.py +++ b/api/py/test/sample/joins/quickstart/training_set.py @@ -14,8 +14,8 @@ # limitations under the License. from ai.chronon.join import Join, JoinPart -from ai.chronon.api.ttypes import Source, EventSource from ai.chronon.query import Query, select +from ai.chronon.source import EventSource from group_bys.quickstart.purchases import v1 as purchases_v1 from group_bys.quickstart.returns import v1 as returns_v1 @@ -25,14 +25,13 @@ This is the "left side" of the join that will comprise our training set. It is responsible for providing the primary keys and timestamps for which features will be computed. 
""" -source = Source( - events=EventSource( - table="data.checkouts", - query=Query( - selects=select("user_id"), # The primary key used to join various GroupBys together - time_column="ts", - ) # The event time used to compute feature values as-of - )) +source = EventSource( + table="data.checkouts", + query=Query( + selects=select("user_id"), # The primary key used to join various GroupBys together + time_column="ts", + ) # The event time used to compute feature values as-of +) v1 = Join( left=source, diff --git a/api/py/test/sample/joins/unit_test/user/sample_transactions.py b/api/py/test/sample/joins/unit_test/user/sample_transactions.py index 6f1f046df1..4486bf2b54 100644 --- a/api/py/test/sample/joins/unit_test/user/sample_transactions.py +++ b/api/py/test/sample/joins/unit_test/user/sample_transactions.py @@ -1,21 +1,20 @@ # from airbnb.data_sources_2 import HiveEventSource -from ai.chronon.api import ttypes from ai.chronon.join import Join, JoinPart from ai.chronon.query import Query, select +from ai.chronon.source import EventSource from group_bys.unit_test.user.sample_nested_group_by import v1 as nested_v1 -source = ttypes.Source( - events=ttypes.EventSource( - table="item_snapshot", - query=Query( - selects=select(user="id_requester"), - wheres=["dim_requester_user_role = 'user'"], - start_partition="2023-03-01", - time_column="UNIX_TIMESTAMP(ts_created_at_utc) * 1000", - ), - ) +source = EventSource( + table="item_snapshot", + query=Query( + selects=select(user="id_requester"), + wheres=["dim_requester_user_role = 'user'"], + start_partition="2023-03-01", + time_column="UNIX_TIMESTAMP(ts_created_at_utc) * 1000", + ), ) + v1 = Join( left=source, right_parts=[JoinPart(group_by=nested_v1)], diff --git a/api/py/test/sample/sources/kaggle/outbrain.py b/api/py/test/sample/sources/kaggle/outbrain.py index dc370f0021..db0c426650 100644 --- a/api/py/test/sample/sources/kaggle/outbrain.py +++ b/api/py/test/sample/sources/kaggle/outbrain.py @@ -14,7 +14,7 @@ # See 
the License for the specific language governing permissions and # limitations under the License. -from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.source import EventSource from ai.chronon.query import Query, select from ai.chronon.utils import get_staging_query_output_table_name from staging_queries.kaggle.outbrain import base_table @@ -29,10 +29,10 @@ def outbrain_left_events(*columns): """ Defines a source based off of the output table of the `base_table` StagingQuery. """ - return Source(events=EventSource( + return EventSource( table=get_staging_query_output_table_name(base_table), query=Query( selects=select(*columns), time_column="ts", ), - )) + ) diff --git a/api/py/test/sample/sources/test_sources.py b/api/py/test/sample/sources/test_sources.py index 4198c9a938..2e5c848811 100644 --- a/api/py/test/sample/sources/test_sources.py +++ b/api/py/test/sample/sources/test_sources.py @@ -18,13 +18,13 @@ select, ) from ai.chronon.utils import get_staging_query_output_table_name -from ai.chronon.api import ttypes +from ai.chronon.source import EventSource, EntitySource from staging_queries.sample_team import sample_staging_query def basic_event_source(table): - return ttypes.Source(events=ttypes.EventSource( + return EventSource( table=table, query=Query( selects=select( @@ -34,11 +34,11 @@ def basic_event_source(table): start_partition="2021-04-09", time_column="ts", ), - )) + ) # Sample Event Source used in tests. 
-event_source = ttypes.Source(events=ttypes.EventSource( +event_source = EventSource( table="sample_namespace.sample_table_group_by", query=Query( selects=select( @@ -48,15 +48,15 @@ def basic_event_source(table): start_partition="2021-04-09", time_column="ts", ), -)) +) # Sample Entity Source -entity_source = ttypes.Source(entities=ttypes.EntitySource( - snapshotTable="sample_table.sample_entity_snapshot", +entity_source = EntitySource( + snapshot_table="sample_table.sample_entity_snapshot", # hr partition is not necessary - just to demo that we support various # partitioning schemes - mutationTable="sample_table.sample_entity_mutations/hr=00:00", - mutationTopic="sample_topic", + mutation_table="sample_table.sample_entity_mutations/hr=00:00", + mutation_topic="sample_topic", query=Query( start_partition='2021-03-01', selects=select( @@ -65,10 +65,10 @@ def basic_event_source(table): ), time_column="ts" ), -)) +) -batch_entity_source = ttypes.Source(entities=ttypes.EntitySource( - snapshotTable="sample_table.sample_entity_snapshot", +batch_entity_source = EntitySource( + snapshot_table="sample_table.sample_entity_snapshot", query=Query( start_partition='2021-03-01', selects=select( @@ -77,11 +77,11 @@ def basic_event_source(table): ), time_column="ts" ), -)) +) # Sample Entity Source derived from a staging query. -staging_entities=ttypes.Source(entities=ttypes.EntitySource( - snapshotTable="sample_namespace.{}".format(get_staging_query_output_table_name(sample_staging_query.v1)), +staging_entities=EntitySource( + snapshot_table="sample_namespace.{}".format(get_staging_query_output_table_name(sample_staging_query.v1)), query=Query( start_partition='2021-03-01', selects=select(**{ @@ -91,11 +91,11 @@ def basic_event_source(table): 'place_id': 'place_id' }) ) -)) +) # A Source that was deprecated but still relevant (requires stitching). 
-events_until_20210409 = ttypes.Source(events=ttypes.EventSource( +events_until_20210409 = EventSource( table="sample_namespace.sample_table_group_by", query=Query( start_partition='2021-03-01', @@ -106,10 +106,10 @@ def basic_event_source(table): }), time_column="UNIX_TIMESTAMP(ts) * 1000" ), -)) +) # The new source -events_after_20210409 = ttypes.Source(events=ttypes.EventSource( +events_after_20210409 = EventSource( table="sample_namespace.another_sample_table_group_by", query=Query( start_partition='2021-03-01', @@ -119,4 +119,4 @@ def basic_event_source(table): }), time_column="__timestamp" ), -)) +) diff --git a/api/py/test/test_group_by.py b/api/py/test/test_group_by.py index 3464a86a86..d6af9a1ddd 100644 --- a/api/py/test/test_group_by.py +++ b/api/py/test/test_group_by.py @@ -18,7 +18,8 @@ from ai.chronon import group_by, query from ai.chronon.group_by import GroupBy, Derivation, TimeUnit, Window, Aggregation, Accuracy from ai.chronon.api import ttypes -from ai.chronon.api.ttypes import EventSource, EntitySource, Operation +from ai.chronon.api.ttypes import Operation +from ai.chronon.source import EventSource, EntitySource @pytest.fixture @@ -45,7 +46,7 @@ def event_source(table, topic=None): """ Sample left join """ - return ttypes.EventSource( + return EventSource( table=table, topic=topic, query=ttypes.Query( @@ -64,9 +65,9 @@ def entity_source(snapshotTable, mutationTable): """ Sample source """ - return ttypes.EntitySource( - snapshotTable=snapshotTable, - mutationTable=mutationTable, + return EntitySource( + snapshot_table=snapshotTable, + mutation_table=mutationTable, query=ttypes.Query( startPartition="2020-04-09", selects={ @@ -201,15 +202,15 @@ def test_generic_collector(): def test_select_sanitization(): gb = group_by.GroupBy( sources=[ - ttypes.EventSource( # No selects are spcified + EventSource( # No selects are spcified table="event_table1", query=query.Query( selects=None, time_column="ts" ) ), - ttypes.EntitySource( # Some selects are 
specified - snapshotTable="entity_table1", + EntitySource( # Some selects are specified + snapshot_table="entity_table1", query=query.Query( selects={ "key1": "key1_sql", @@ -236,8 +237,8 @@ def test_snapshot_with_hour_aggregation(): with pytest.raises(AssertionError): group_by.GroupBy( sources=[ - ttypes.EntitySource( # Some selects are specified - snapshotTable="entity_table1", + EntitySource( # Some selects are specified + snapshot_table="entity_table1", query=query.Query( selects={ "key1": "key1_sql", @@ -260,7 +261,7 @@ def test_snapshot_with_hour_aggregation(): def test_additional_metadata(): gb = group_by.GroupBy( sources=[ - ttypes.EventSource( + EventSource( table="event_table1", query=query.Query( selects=None, @@ -279,7 +280,7 @@ def test_additional_metadata(): def test_group_by_with_description(): gb = group_by.GroupBy( sources=[ - ttypes.EventSource( + EventSource( table="event_table1", query=query.Query( selects=None, diff --git a/api/py/test/test_source.py b/api/py/test/test_source.py new file mode 100644 index 0000000000..62d2adc2dd --- /dev/null +++ b/api/py/test/test_source.py @@ -0,0 +1,32 @@ +import inspect +import re +import pytest + +from ai.chronon.api import ttypes +from ai.chronon import source + + +def _camel_to_snake(s: str) -> str: + return re.sub(r'(?