Fix duplicate attribute keys in transform_attributes#2423
Fix duplicate attribute keys in transform_attributes#2423gyanranjanpanda wants to merge 1 commit intoopen-telemetry:mainfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2423 +/- ##
==========================================
- Coverage 88.18% 88.16% -0.02%
==========================================
Files 604 604
Lines 214589 214767 +178
==========================================
+ Hits 189232 189354 +122
- Misses 24831 24887 +56
Partials 526 526
🚀 New features to boost your workflow:
|
a210873 to
361e6bd
Compare
|
@albertlockett and @ThomsonTan waiting for your feed backl |
albertlockett
left a comment
There was a problem hiding this comment.
Hey @gyanranjanpanda . I appreciate you taking the time to look at this, but I don't think we can accept this PR as is.
Unfortunately, the benchmarks we have for this code on main are currently broken. But when I apply the fix from #2426 and run the benchmark we see that this change introduces significant performance regression:
transform_attributes_dict_keys/single_replace_no_deletes/keys=32,rows=128,rows_per_key=4
time: [5.1300 µs 5.1348 µs 5.1394 µs]
change: [+1027.4% +1031.5% +1035.2%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/single_replace_single_delete/keys=32,rows=128,rows_per_key=4
time: [5.5027 µs 5.5091 µs 5.5155 µs]
change: [+495.01% +497.37% +499.48%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/no_replace_single_delete/keys=32,rows=128,rows_per_key=4
time: [5.3440 µs 5.3584 µs 5.3746 µs]
change: [+577.41% +580.27% +583.40%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/single_replace_no_deletes/keys=32,rows=1536,rows_per_key=48
time: [34.015 µs 34.050 µs 34.086 µs]
change: [+4000.2% +4016.4% +4031.3%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/single_replace_single_delete/keys=32,rows=1536,rows_per_key=48
time: [34.390 µs 34.472 µs 34.562 µs]
change: [+1421.9% +1433.5% +1443.9%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/no_replace_single_delete/keys=32,rows=1536,rows_per_key=48
time: [34.302 µs 34.340 µs 34.379 µs]
change: [+1562.1% +1568.0% +1573.6%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/single_replace_no_deletes/keys=32,rows=8192,rows_per_key=256
time: [171.62 µs 171.78 µs 171.96 µs]
change: [+6262.2% +6290.6% +6316.2%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/single_replace_single_delete/keys=32,rows=8192,rows_per_key=256
time: [171.79 µs 171.92 µs 172.06 µs]
change: [+1771.2% +1835.7% +1893.0%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/no_replace_single_delete/keys=32,rows=8192,rows_per_key=256
time: [171.20 µs 171.35 µs 171.49 µs]
change: [+1962.8% +1981.5% +1998.1%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/single_replace_no_deletes/keys=128,rows=128,rows_per_key=1
time: [4.9566 µs 4.9693 µs 4.9819 µs]
change: [+587.52% +592.02% +597.47%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/single_replace_single_delete/keys=128,rows=128,rows_per_key=1
time: [5.6185 µs 5.6284 µs 5.6377 µs]
change: [+292.54% +294.19% +296.01%] (p = 0.00 < 0.05)
Performance has regressed.
transform_attributes_dict_keys/no_replace_single_delete/keys=128,rows=128,rows_per_key=1
time: [5.2733 µs 5.2831 µs 5.2938 µs]
change: [+385.50% +387.73% +389.92%] (p = 0.00 < 0.05)
Performance has regressed.
While I expect to see some performance regression because we're doing extra work, I feel that such a serious regression in performance warrants some additional investigation into if/how we can do this in a more efficient way.
Please see my comment here which prescribes an approach that I believe will be more performant than what is currently in this PR: #1650 (comment)
361e6bd to
06392eb
Compare
|
thanks for your wonderful guidance i will make sure i could match your expectation |
|
Hey @gyanranjanpanda I wanted to give you a heads up that I am going to be working on #2014 and there may be some significant changes to the transform_attributes code. I will be touching code in transform_keys as well as transform_attributes_impl. I wanted to give you a heads up in case you want to hold off advancing your work until you can better understand the conflicts |
|
Thanks for the heads up! I’ll keep an eye on your changes to #2014 and try to align my work accordingly. If possible, could you share which parts might be most affected so I can avoid overlap? or should i wait after u finished your work i should continue this work |
It's probably easiest to hold off until I finish to avoid conflicts, but I'll leave it up to you. I think I should have the changes I need to make for #2014 done by early next week, if not sooner. For now, I'll show you the in-progress changes: I was imagining that for #1650 you'd need to make changes to |
|
@gyanranjanpanda the changes I mentioned that could cause conflicts have now been merged (see #2442) |
|
i will fix this code as soon as possible while looking your merged pr |
2d813af to
67e366e
Compare
|
@albertlockett Thanks for the detailed benchmark feedback! I have completely reworked the approach based on your guidance. What changed:
Benchmark results (no regression):
The plan-based approach avoids the expensive |
…metry#1650) When renaming attribute key 'x' to 'y', any existing row with key 'y' sharing a parent_id with a row having key 'x' would produce a duplicate. This commit fixes that by: - Adding find_rename_collisions_to_delete_ranges() which uses IdBitmap to efficiently detect these collisions in O(N) time - Generating KeyTransformRange::Delete entries that are merged into the existing transform pipeline in transform_keys() and transform_dictionary_keys() - Fixing an early-return in transform_dictionary_keys() that skipped row-level collision deletes when dictionary values had no deletions - Adding read_parent_ids_as_u32() helper for parent_id column access - Adding test_rename_removes_duplicate_keys integration test Only runs collision detection when parent_ids are plain-encoded (not transport-optimized) to avoid incorrect results from quasi-delta encoded values. Closes open-telemetry#1650
67e366e to
71f2ee6
Compare
albertlockett
left a comment
There was a problem hiding this comment.
Looks like some good progress, but still some things happening that are not as well optimized as they could be
| .target_keys[transform_range.idx] | ||
| .len(); | ||
| let deleted_val_len = if let Some(ref dp) = delete_plan { | ||
| if transform_range.idx < dp.target_keys.len() { |
There was a problem hiding this comment.
We're setting idx to 0 here:
otel-arrow/rust/otap-dataflow/crates/pdata/src/otap/transform.rs
Lines 1931 to 1944 in 71f2ee6
So won't transform_range.idx always be less than dp.target_keys.len() if there are any deletes?
We should probably have a test that does delete and rename at the same time to cover this kind of issue.
There was a problem hiding this comment.
Maybe the solution here would be to make idx an Option
| let parent_ids_vec = attrs_record_batch | ||
| .column_by_name(consts::PARENT_ID) | ||
| .map(read_parent_ids_as_u32) | ||
| .transpose()?; | ||
| if let Some(parent_ids) = parent_ids_vec.as_ref() { |
There was a problem hiding this comment.
parent_id column should always be present for attributes. If it's not present that would be an error (similar to how you've returned an error below for the attributes key). See the spec has this column as required: https://github.com/open-telemetry/otel-arrow/blob/main/docs/otap-spec.md#542-u16-attributes
Also, we are copying the ID column into a Vec<u32> here, only to pass it into find_rename_collisions_to_delete ranges simply so the IDs can be iterated. This kind of copying is wasteful
IMO we could probably just make find_rename_collisions_to_delete generic over <K: ArrowPrimitiveType> and have it accept parent_ids as a &PrimitiveArray<K>. If the parent_id column is a PrimitiveArray, we pass that directly. If it's a dictionary, we can actually just pass the keys and the logic we're using to look for which rows would be duplicated after rename should still work.
| if let Some(k) = key_accessor.str_at(i) { | ||
| if k == old_key.as_str() { | ||
| source_parents.insert(pid); | ||
| } | ||
| } |
There was a problem hiding this comment.
The way we're checking the strings here could be optimized. Since attribute keys will most often be dictionary encoded, what will happen here is that this will effectively pull the string from the dict and compare it for every parent ID. It would actually be more optimal to do the string comparison one time on the dictionary values.
All this to say that we should probably be using the arrow eq kernel on the string up front to create a mask of which rows contain this key https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html and check if the mask is true at this index.
There was a problem hiding this comment.
For example if you passed keys directly as an ArrayRef, you could so something similar to this:
// compare using optimized kernel
let keys = eq(keys, &StringArray::new_scalar(old_key))?;
// for each range that has equal keys
for (start, end) in BitSliceIterator::new(&keys.values().inner(), 0, keys.len()) {
// push IDs into source_parents
for pid in parent_ids[start..end] {
source_parents.insert(pid)
}
}I say "something similar" because I also think we may want to pass something other than a Vec for parent IDs. (see my other comment)
| if let Some(k) = key_accessor.str_at(i) { | ||
| if k == new_key.as_str() && source_parents.contains(pid) { |
There was a problem hiding this comment.
Similar comment to the one above related to using eq compute kernel to compare the key value instead of calling str_at and == for every row.
| // Convert per-row booleans into contiguous KeyTransformRange::Delete entries | ||
| let mut ranges = Vec::new(); | ||
| let mut start = None; | ||
| for (i, &should_delete) in collision_delete_rows.iter().enumerate() { |
There was a problem hiding this comment.
We're materializing a vector with a flag at each position indicating that the row is deleted, only to throw it away when we iterate it to convert to ranges here. Wouldn't it be more optimal to just compute the vec of ranges directly in the code above?
| for (old_key, new_key) in &rename.map { | ||
| // Build IdBitmap of parent_ids that have the source key (old_key). | ||
| let mut source_parents = IdBitmap::new(); |
There was a problem hiding this comment.
The heap allocation of ID bitmap can be reused for each rename:
| for (old_key, new_key) in &rename.map { | |
| // Build IdBitmap of parent_ids that have the source key (old_key). | |
| let mut source_parents = IdBitmap::new(); | |
| let mut source_parents = IdBitmap::new(); | |
| for (old_key, new_key) in &rename.map { | |
| source_parents.clear(); |
| // Only when parent_ids are plain-encoded (not transport-optimized) — quasi-delta | ||
| // encoded values don't represent actual parent IDs. |
There was a problem hiding this comment.
This is not true. The delta encoded IDs do represent actual parent_ids, and we can't just ignore this logic if we find delta encoded parent IDs.
Probably the most optimal thing to do is actually check if any rows have an attribute with the new key name. If this happens, we'll need to remove the delta delta encoding so we can figure out if a row with the same parent_id also has the old key name, which would indicate a collision after rename.
I realize this complicates your algorithm, because clearly both those key checks are happening inside the same helper find_rename_collisions_to_delete_ranges, so in order to get this to work as explained you may need to refactor a bit.
| let mut transform_ranges = | ||
| merge_transform_ranges(replacement_plan.as_ref(), delete_plan.as_ref()).into_owned(); | ||
|
|
||
| // Merge collision delete ranges into the transform plan | ||
| if !collision_delete_ranges.is_empty() { | ||
| transform_ranges.extend_from_slice(collision_delete_ranges); | ||
| transform_ranges.sort_by_key(|r| r.start()); | ||
| } | ||
| let transform_ranges: Cow<'_, [KeyTransformRange]> = Cow::Owned(transform_ranges); |
There was a problem hiding this comment.
I don't feel that we should convert this to a Vec, then re-sort it, then convert it back to a Cow. The whole point of it being a Cow in the first place was to avoid a heap allocation.
Maybe we could find a way to have the collision_delete_ranges already appended into replacement_plan.ranges, maybe by passing it into plan_key_replacements?
Fix Duplicate Attribute Keys in
transform_attributesChanges Made
This PR resolves issue #1650 by ensuring that dictionary keys are deduplicated when transformations such as
renameare applied, as required by the OpenTelemetry specification ("Exported maps MUST contain only unique keys by default").To accomplish this while maintaining strict performance requirements, we replaced the previous
RowConverterdeduplication strategy with a new high-performance, proactive pre-filter:filter_rename_collisionsintotransform_attributes_implinsideotap-dataflow/crates/pdata/src/otap/transform.rs.parent_ids and target keys. It uses theIdBitmaptype to find any existing target keys whoseparent_idmaps back to an old key that will be renamed.arrow::compute::filter_record_batchbefore the actual transform happens.Testing
AttributesProcessorunit tests (test_rename_removes_duplicate_keys) to explicitly verify that renaming an attribute resulting in a collision automatically discards duplicate keys.AttributesTransformPipelineStageinquery-enginetests with a parallel case ensuring OPL/KQL query pipelines (project-rename) properly drop duplicates when resolving duplicates.otap_df_pdatatransform.rstests to properly expect deduplicated keys using this plan-based method.cargo test --workspace --all-features.Validation Results
All tests pass. OTel semantic rules surrounding unique mapped keys map cleanly through down/upstream processors. The
IdBitmapintersection approach completely resolves the multi-thousand percentRowConverterperformance regressions, dropping collision resolution overhead to essentially zero through efficient bitmap operations.