Switch average implementation to utilize running average to prevent overflow #1066
rgruener wants to merge 3 commits into airbnb:main
Conversation
Force-pushed from fdbde1e to 89e4735
aggregator/src/main/scala/ai/chronon/aggregator/base/SimpleAggregators.scala
  StructType(
    "AvgIr",
-   Array(StructField("sum", DoubleType), StructField("count", IntType))
+   Array(StructField("running_average", DoubleType), StructField("count", IntType))
Unfortunately, this might break pipelines that are already in prod. The best way to deal with this is to add a new aggregation and leave this one as is.
Is the concern that changing the implementation would introduce skew? As a minimal fix, changing count to a Long would be helpful.
Though I do think adding another implementation is warranted to unblock certain use cases.
There is Avro-encoded data sitting in the kvStore in the old format; the new aggregation logic will probably fail to parse it.
Echoing Nikhil's comment.
I see. After a bit of further investigation, I will add this as RunningAverage, since I believe we will hit overflow issues (especially with count being an Int).
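For context, the shape such an aggregator could take looks roughly like the following. This is a minimal standalone sketch, not Chronon's actual code: the IR carries a (mean, count) pair with count as a Long, and the mean is updated incrementally so no unbounded running sum is ever materialized.

```scala
// Hypothetical sketch of a running-average aggregator (names and structure
// assumed for illustration; not the PR's actual implementation).
object RunningAverageSketch {
  // IR: the current mean and the number of records folded in so far.
  final case class Ir(mean: Double, count: Long)

  def prepare(x: Double): Ir = Ir(x, 1L)

  // Incremental update: mean' = mean + (x - mean) / (count + 1).
  // The sum of all inputs is never computed, so it cannot overflow.
  def update(ir: Ir, x: Double): Ir = {
    val n = ir.count + 1
    Ir(ir.mean + (x - ir.mean) / n, n)
  }

  // Merge two partial IRs via a weighted combination of the two means.
  def merge(a: Ir, b: Ir): Ir = {
    val n = a.count + b.count
    Ir((a.count * a.mean + b.count * b.mean) / n, n)
  }

  def result(ir: Ir): Double = ir.mean

  def main(args: Array[String]): Unit = {
    val xs = Seq(1.0, 2.0, 3.0, 4.0)
    val ir = xs.tail.foldLeft(prepare(xs.head))(update)
    println(result(ir)) // 2.5
  }
}
```

Note that the weighted merge above can itself lose precision when the two sides are very different in size, which is where a stability branch (discussed further down this thread) comes in.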
Force-pushed from 89e4735 to daa4ae3, then from daa4ae3 to bab464a
  override def isDeletable: Boolean = true
}

class RunningAverage extends SimpleAggregator[Double, Array[Any], Double] {
Instead of a new operator, would it make sense to add an argument (prevent_overflow or running_average) to the Average operator that defaults to false?
Yeah, I like that! It should default to None, which gets interpreted as false, to keep the semantic hashes as they were.
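A minimal sketch of that None-as-false convention (argument and field names are assumed for illustration, not taken from the PR): configs that never set the flag carry no new value, so their semantic hash is unchanged, while new configs can opt in explicitly.

```scala
// Hypothetical sketch of the suggested optional flag on Average.
object AverageArgsSketch {
  // Field name assumed; an absent flag is represented as None.
  final case class AverageArgs(preventOverflow: Option[Boolean] = None)

  // None is read as false, so pre-existing configs behave (and hash)
  // exactly as they did before the flag was introduced.
  def useRunningAverage(args: AverageArgs): Boolean =
    args.preventOverflow.getOrElse(false)

  def main(args: Array[String]): Unit = {
    println(useRunningAverage(AverageArgs()))           // false
    println(useRunningAverage(AverageArgs(Some(true)))) // true
  }
}
```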
Force-pushed from bab464a to 0e78450
Will update the docs assuming the change looks OK.
 * Uses a more stable online algorithm which should be suitable for large numbers of records, similar to:
 * http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
 */
private def computeRunningAverage(ir: Array[Any], right: Double, rightWeight: Double): Array[Any] = {
There is apparently already a getCombinedMean below in the moments code that does this.
val scaling = rightWeight / newCount
if (scaling < STABILITY_CONSTANT) {
  left + (right - left) * scaling
} else {
  (leftWeight * left + rightWeight * right) / newCount
}
I tried to do this in place (replacing the logic in Average directly), and it actually makes the tests fail: the average operation stops being commutative due to slight errors in the double multiply and divide. I also tried (lw * la + rw * ra) / (lw + rw), without luck.
We ended up merging the following change instead: zipline-ai/chronon#1292
nikhil-zlai left a comment:
Approving to negate my request for changes. Given the loss of commutativity with the running-average computation (due to double multiply/divide errors), I don't know what the right thing to do is. :-/
We ended up just changing the denominator to a Long in our fork.
I understand how this isn't strictly commutative (I hit that issue in the tests originally and introduced tolerance to get them to pass). Are there larger implications to that?
Summary
Currently the average implementation uses sum / count. For large aggregations this can cause the sum to overflow. This change switches the implementation to a running average that will not overflow.
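The failure mode is easy to demonstrate: a Double sum saturates to Infinity once it exceeds roughly 1.8e308, while an incrementally maintained mean of the same values stays finite. A minimal illustration (not Chronon code):

```scala
// Demonstrates why sum / count overflows where a running average does not.
object OverflowDemo {
  def main(args: Array[String]): Unit = {
    val big = 1.0e308 // close to Double.MaxValue (~1.8e308)

    // sum / count approach: the intermediate sum overflows to Infinity,
    // and the average is unrecoverable.
    val sum = big + big
    println(sum.isInfinity) // true
    println(sum / 2)        // Infinity

    // Running-average approach: the sum is never materialized, so the
    // mean of two equal values stays finite and exact.
    val mean = big + (big - big) / 2
    println(mean == big) // true
  }
}
```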
Why / Goal
We utilize large (global) aggregations when doing tensor computations. This guarantees that average will work even with these larger aggregations.
Test Plan
Reviewers