From 71f2ee68e5571307af3789ca09bcb3e7af90da95 Mon Sep 17 00:00:00 2001 From: Gyan Ranjan Panda Date: Sat, 28 Mar 2026 01:10:00 +0530 Subject: [PATCH] Fix duplicate attribute keys in transform_attributes (issue #1650) When renaming attribute key 'x' to 'y', any existing row with key 'y' sharing a parent_id with a row having key 'x' would produce a duplicate. This commit fixes that by: - Adding find_rename_collisions_to_delete_ranges() which uses IdBitmap to efficiently detect these collisions in O(N) time - Generating KeyTransformRange::Delete entries that are merged into the existing transform pipeline in transform_keys() and transform_dictionary_keys() - Fixing an early-return in transform_dictionary_keys() that skipped row-level collision deletes when dictionary values had no deletions - Adding read_parent_ids_as_u32() helper for parent_id column access - Adding test_rename_removes_duplicate_keys integration test Only runs collision detection when parent_ids are plain-encoded (not transport-optimized) to avoid incorrect results from quasi-delta encoded values. Closes #1650 --- .../processors/attributes_processor/mod.rs | 65 ++++++ .../crates/pdata/src/otap/transform.rs | 213 ++++++++++++++++-- 2 files changed, 265 insertions(+), 13 deletions(-) diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/attributes_processor/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/attributes_processor/mod.rs index e48c3d43e3..ec9e0c4846 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/attributes_processor/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/attributes_processor/mod.rs @@ -716,6 +716,71 @@ mod tests { }) .validate(|_| async move {}); } + #[test] + fn test_rename_removes_duplicate_keys() { + // Prepare input with key "a" and "b" + let input = build_logs_with_attrs( + vec![], + vec![], + vec![ + KeyValue::new("a", AnyValue::new_string("value_a")), + KeyValue::new("b", AnyValue::new_string("value_b")), + ], + ); + + let cfg = json!({ + "actions": [ + {"action": "rename", "source_key": "a", "destination_key": "b"} + ] + }); + + let telemetry_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(telemetry_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0); + + let node = test_node("attributes-processor-test-dup"); + let rt: TestRuntime = TestRuntime::new(); + let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN); + node_config.config = cfg; + let proc = + create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config()) + .expect("create processor"); + let phase = rt.set_processor(proc); + + phase + .run_test(|mut ctx| async move { + let mut bytes = BytesMut::new(); + input.encode(&mut bytes).expect("encode"); + let bytes = bytes.freeze(); + let pdata_in = + OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into()); + ctx.process(Message::PData(pdata_in)) + .await + .expect("process"); + + let out = ctx.drain_pdata().await; + let first = out.into_iter().next().expect("one output").payload(); + + let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp"); + let bytes = match otlp_bytes { + OtlpProtoBytes::ExportLogsRequest(b) => b, + _ => panic!("unexpected otlp variant"), + }; + let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode"); + + let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes; + + // Expect no "a" and exactly one "b" + assert!(!log_attrs.iter().any(|kv| kv.key == "a")); + let b_count = log_attrs.iter().filter(|kv| kv.key == "b").count(); + assert_eq!( + b_count, 1, + "There should be exactly one key 'b' (no duplicates)" + ); + }) + .validate(|_| async move {}); + } #[test] fn test_delete_applies_to_signal_only_by_default() { diff --git a/rust/otap-dataflow/crates/pdata/src/otap/transform.rs b/rust/otap-dataflow/crates/pdata/src/otap/transform.rs index aa9e925f0d..1fea324a51 100644 --- a/rust/otap-dataflow/crates/pdata/src/otap/transform.rs +++ b/rust/otap-dataflow/crates/pdata/src/otap/transform.rs @@ -1285,9 +1285,10 @@ pub fn transform_attributes_impl( // At the same time, set flag to check if the batch already has the transport optimized // encoding. This is used later to determine if we need to lazily materialize the because ID // column because the transformation would break some sequences of encoded IDs. - let insert_or_upsert_needed = (transform.insert.is_some() || has_upsert) + let has_renames = transform.rename.as_ref().is_some_and(|r| !r.map.is_empty()); + let early_materialize_needed = (transform.insert.is_some() || has_upsert) && schema.column_with_name(consts::PARENT_ID).is_some(); - let (attrs_record_batch_cow, is_transport_optimized) = if insert_or_upsert_needed { + let (attrs_record_batch_cow, is_transport_optimized) = if early_materialize_needed { let rb = materialize_parent_id_for_attributes_auto(attrs_record_batch)?; (Cow::Owned(rb), false) } else { @@ -1306,6 +1307,37 @@ pub fn transform_attributes_impl( let attrs_record_batch = attrs_record_batch_cow.as_ref(); let schema = attrs_record_batch.schema(); + // Compute collision delete ranges if renames exist. + // When renaming key "a" -> "b", any existing row with key "b" sharing a parent_id + // with a row having key "a" would become a duplicate. We proactively identify these + // collisions and generate Delete ranges that are merged into the transform pipeline. + // Only when parent_ids are plain-encoded (not transport-optimized) — quasi-delta + // encoded values don't represent actual parent IDs. + let collision_delete_ranges = if has_renames && !is_transport_optimized { + let rename = transform + .rename + .as_ref() + .expect("has_renames guard ensures this is Some"); + let parent_ids_vec = attrs_record_batch + .column_by_name(consts::PARENT_ID) + .map(read_parent_ids_as_u32) + .transpose()?; + if let Some(parent_ids) = parent_ids_vec.as_ref() { + let key_col = get_required_array(attrs_record_batch, consts::ATTRIBUTE_KEY)?; + let key_accessor = StringArrayAccessor::try_new(key_col)?; + find_rename_collisions_to_delete_ranges( + attrs_record_batch.num_rows(), + &key_accessor, + parent_ids, + rename, + ) + } else { + Vec::new() + } + } else { + Vec::new() + }; + let (mut rb, mut stats) = match key_column.data_type() { DataType::Utf8 => { let keys_arr = key_column @@ -1313,7 +1345,8 @@ pub fn transform_attributes_impl( .downcast_ref() .expect("can downcast Utf8 Column to string array"); - let keys_transform_result = transform_keys(keys_arr, transform)?; + let keys_transform_result = + transform_keys(keys_arr, transform, &collision_delete_ranges)?; let stats = TransformStats { renamed_entries: keys_transform_result.replaced_rows as u64, deleted_entries: keys_transform_result.deleted_rows as u64, @@ -1380,6 +1413,7 @@ pub fn transform_attributes_impl( .downcast_ref::>() .expect("can downcast dictionary column to dictionary array"), transform, + &collision_delete_ranges, is_transport_optimized, compute_stats, )?; @@ -1404,6 +1438,7 @@ pub fn transform_attributes_impl( .downcast_ref::>() .expect("can downcast dictionary column to dictionary array"), transform, + &collision_delete_ranges, is_transport_optimized, compute_stats, )?; @@ -1571,6 +1606,7 @@ struct KeysTransformResult { fn transform_keys( array: &StringArray, transform: &AttributesTransform, + collision_delete_ranges: &[KeyTransformRange], ) -> Result { let len = array.len(); let values = array.values(); @@ -1617,7 +1653,15 @@ fn transform_keys( // we're going to pass over both the values and the offsets, taking any ranges that weren't // that are unmodified, while either transforming or omitting ranges that were either replaced // or deleted. To get the sorted list of how to handle each range, we merge the plans' ranges - let transform_ranges = merge_transform_ranges(replacement_plan.as_ref(), delete_plan.as_ref()); + let mut transform_ranges = + merge_transform_ranges(replacement_plan.as_ref(), delete_plan.as_ref()).into_owned(); + + // Merge collision delete ranges into the transform plan + if !collision_delete_ranges.is_empty() { + transform_ranges.extend_from_slice(collision_delete_ranges); + transform_ranges.sort_by_key(|r| r.start()); + } + let transform_ranges: Cow<'_, [KeyTransformRange]> = Cow::Owned(transform_ranges); // create buffer to contain the new values let mut new_values = MutableBuffer::with_capacity(calculate_new_keys_buffer_len( @@ -1727,11 +1771,21 @@ fn transform_keys( KeyTransformRangeType::Delete => { // for deleted ranges we don't need to append any offsets to the buffer, so we // just decrement by how many total bytes were deleted from this range. - let deleted_val_len = delete_plan - .as_ref() - .expect("delete plan should be initialized") - .target_keys[transform_range.idx] - .len(); + let deleted_val_len = if let Some(ref dp) = delete_plan { + if transform_range.idx < dp.target_keys.len() { + dp.target_keys[transform_range.idx].len() + } else { + // collision-delete range: compute byte length from offsets + let s = offsets[transform_range.start()] as usize; + let e = offsets[transform_range.end()] as usize; + (e - s) / transform_range.range.len() + } + } else { + // collision-delete range without a delete plan + let s = offsets[transform_range.start()] as usize; + let e = offsets[transform_range.end()] as usize; + (e - s) / transform_range.range.len() + }; curr_total_offset_adjustment -= (deleted_val_len * transform_range.range.len()) as i32; } @@ -1820,6 +1874,127 @@ struct DictionaryKeysTransformResult { renamed_rows: usize, } +/// Identify rows that would become duplicates after a rename and return them as +/// `KeyTransformRange::Delete` entries. When renaming key `x` to `y`, any existing +/// row with key `y` whose `parent_id` also has a row with key `x` would become a +/// duplicate. Uses [`IdBitmap`] for efficient parent-id set intersection. +/// +/// Returns a list of `KeyTransformRange` with `range_type = Delete` that can +/// be merged into the existing transformation plans, removing the row cleanly. +fn find_rename_collisions_to_delete_ranges( + num_rows: usize, + key_accessor: &StringArrayAccessor<'_>, + parent_ids: &[u32], + rename: &RenameTransform, +) -> Vec { + let mut collision_delete_rows = vec![false; num_rows]; + let mut has_collisions = false; + + for (old_key, new_key) in &rename.map { + // Build IdBitmap of parent_ids that have the source key (old_key). + let mut source_parents = IdBitmap::new(); + for (i, &pid) in parent_ids.iter().enumerate() { + if let Some(k) = key_accessor.str_at(i) { + if k == old_key.as_str() { + source_parents.insert(pid); + } + } + } + + if source_parents.is_empty() { + continue; + } + + // Mark target-key rows whose parent_id overlaps with source_parents. + for (i, &pid) in parent_ids.iter().enumerate() { + if let Some(k) = key_accessor.str_at(i) { + if k == new_key.as_str() && source_parents.contains(pid) { + collision_delete_rows[i] = true; + has_collisions = true; + } + } + } + } + + if !has_collisions { + return Vec::new(); + } + + // Convert per-row booleans into contiguous KeyTransformRange::Delete entries + let mut ranges = Vec::new(); + let mut start = None; + for (i, &should_delete) in collision_delete_rows.iter().enumerate() { + if should_delete { + if start.is_none() { + start = Some(i); + } + } else if let Some(s) = start.take() { + ranges.push(KeyTransformRange { + range: s..i, + idx: 0, + range_type: KeyTransformRangeType::Delete, + }); + } + } + if let Some(s) = start { + ranges.push(KeyTransformRange { + range: s..num_rows, + idx: 0, + range_type: KeyTransformRangeType::Delete, + }); + } + ranges +} + +/// Read the parent_id column into a flat `Vec`, handling UInt16, UInt32, +/// and dictionary-encoded variants. +fn read_parent_ids_as_u32(col: &ArrayRef) -> Result> { + let n = col.len(); + let mut result = Vec::with_capacity(n); + match col.data_type() { + DataType::UInt16 => { + let arr = col.as_any().downcast_ref::().ok_or_else(|| { + Error::UnexpectedRecordBatchState { + reason: "expected UInt16 for parent_id".into(), + } + })?; + for i in 0..n { + result.push(arr.value(i) as u32); + } + } + DataType::UInt32 => { + let arr = col.as_any().downcast_ref::().ok_or_else(|| { + Error::UnexpectedRecordBatchState { + reason: "expected UInt32 for parent_id".into(), + } + })?; + for i in 0..n { + result.push(arr.value(i)); + } + } + DataType::Dictionary(key_type, _) => { + let arr = + cast(col, &DataType::UInt32).map_err(|e| Error::UnexpectedRecordBatchState { + reason: format!("cannot cast dict({key_type:?}) parent_id to u32: {e}"), + })?; + let u32_arr = arr.as_any().downcast_ref::().ok_or_else(|| { + Error::UnexpectedRecordBatchState { + reason: "cast to UInt32 failed".into(), + } + })?; + for i in 0..n { + result.push(u32_arr.value(i)); + } + } + other => { + return Err(Error::UnexpectedRecordBatchState { + reason: format!("unsupported parent_id data type: {other:?}"), + }); + } + } + Ok(result) +} + /// Transforms the keys for the dictionary array. /// /// # Arguments @@ -1835,6 +2010,7 @@ struct DictionaryKeysTransformResult { fn transform_dictionary_keys( dict_arr: &DictionaryArray, transform: &AttributesTransform, + collision_delete_ranges: &[KeyTransformRange], is_transport_encoded: bool, compute_stats: bool, ) -> Result> @@ -1849,7 +2025,7 @@ where actual: dict_values.data_type().clone(), } })?; - let dict_values_transform_result = transform_keys(dict_values, transform)?; + let dict_values_transform_result = transform_keys(dict_values, transform, &[])?; // Convert the ranges of transformed dictionary values into the ranges of transformed dict keys. // These ranges are used to determine two things: @@ -1872,6 +2048,13 @@ where Vec::new() }; + // Merge collision delete ranges (row-level) into the dict key transform ranges + let mut dict_key_transform_ranges = dict_key_transform_ranges; + if !collision_delete_ranges.is_empty() { + dict_key_transform_ranges.extend_from_slice(collision_delete_ranges); + dict_key_transform_ranges.sort_by_key(|r| r.start()); + } + // If we're tracking statistics on how many rows were transformed, it's less expensive to // compute these statistics from the ranges of transformed dictionary keys. However, if these // ranges haven't been computed, it's more performant to compute the statistics from the @@ -1889,7 +2072,7 @@ where (rename_count, 0) }; - if dict_values_transform_result.keep_ranges.is_none() { + if dict_values_transform_result.keep_ranges.is_none() && collision_delete_ranges.is_empty() { // here there were no rows deleted from the values array, which means we can reuse // the dictionary keys without any transformations let new_dict_keys = dict_keys.clone(); @@ -1912,8 +2095,12 @@ where }); } - // safety: we've checked above that this is not None - let dict_values_keep_ranges = dict_values_transform_result.keep_ranges.expect("not none"); + let dict_values_keep_ranges = dict_values_transform_result.keep_ranges.unwrap_or_else(|| { + vec![Range { + start: 0, + end: dict_values.len(), + }] + }); // create quick lookup for each dictionary key of whether it was kept and if so which // contiguous range of kept dictionary values the key points to. This will allow us to build