Skip to content

Write daily IRs for incremental aggregation#998

Open
kambstreat wants to merge 66 commits intoairbnb:mainfrom
kambstreat:kchakka/incremental
Open

Write daily IRs for incremental aggregation#998
kambstreat wants to merge 66 commits intoairbnb:mainfrom
kambstreat:kchakka/incremental

Conversation

@kambstreat
Copy link
Copy Markdown

@kambstreat kambstreat commented Jun 3, 2025

Summary

This PR address the following CHIP : #984

Why / Goal

The goal of the PR is to reduce the time taken to compute aggregations. Added daily level partial aggregates for batch feature computation, which avoids reading past day's event level data.

With this change, we see, 4x improvement for computing features. (except first time run )

Test Plan

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested

Checklist

  • Documentation update

Reviewers

s"${aggregationPart.inputColumn}_$opSuffix${aggregationPart.window.suffix}${bucketSuffix}"

def incOutputColumnName =
s"${aggregationPart.inputColumn}_$opSuffix${bucketSuffix}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we still keep the aggregationPart.window.suffix? Otherwise, how do we reconstruct the final output column?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pengyu-hou not sure I get it. I can not use the window.suffix right as the intermediate incremental is daily aggregation.

.toArray
.zip(columnAggregators.map(_.irType))

val incSchema = aggregationParts
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking that we should use a full incremental in the code and we can keep inc as the suffix for the table so the table names are not getting too long. What do you think?

Suggested change
val incSchema = aggregationParts
val incrementalSchema = aggregationParts

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. good to use full name.

kambstreat and others added 30 commits November 9, 2025 16:59
Co-authored-by: Pengyu Hou <3771747+pengyu-hou@users.noreply.github.com>
Signed-off-by: chaitanya <1847554+kambstreat@users.noreply.github.com>
….scala

  GroupBy.scala — bug fixes

  - Fix aggregationParts returning empty when no holes exist: Previously derived from the hole-filling loop, so when the incremental table was already up-to-date, convertIncrementalDfToHops received an empty list
  and produced wrong hops. Now derived directly from groupByConf unconditionally.
  - Fix maxWindow.get NPE: Replaced with getOrElse(throw ...) that gives a clear error message when incremental mode is used on a GroupBy with no windowed aggregations.
  - Remove unused imports: Operation and com.google.common.collect.Table.
  - Fix typo: "aggregatiosn" → "aggregations" in scaladoc.

  GroupByIncrementalTest.scala — test improvements

  - Enable UNIQUE_COUNT and BOUNDED_UNIQUE_COUNT: Uncommented both operations in testIncrementalBasicAggregations. Comparison converts array IRs to their sizes (since element ordering/MD5 hashing differs between
  Chronon and SQL) and validates against count(distinct ...) from SQL.
  - Rewrite testIncrementalStatisticalAggregations: Replaced schema-only checks with a full incremental-vs-non-incremental snapshotEvents comparison for SKEW, KURTOSIS, APPROX_PERCENTILE, APPROX_UNIQUE_COUNT, and
  APPROX_HISTOGRAM_K.
  - Remove redundant column-existence assertions: Dropped assertTrue("IR must contain X") checks that were superseded by the value comparison.
  - Remove unused BinaryType import.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants