Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,71 @@ mod tests {
})
.validate(|_| async move {});
}
#[test]
fn test_rename_removes_duplicate_keys() {
    // Renaming "a" -> "b" when a row already carries key "b" must not leave two
    // "b" entries behind: the pre-existing "b" attribute is expected to be removed
    // so that exactly one "b" survives the transform.

    // Prepare input with key "a" and "b"
    let input = build_logs_with_attrs(
        vec![],
        vec![],
        vec![
            KeyValue::new("a", AnyValue::new_string("value_a")),
            KeyValue::new("b", AnyValue::new_string("value_b")),
        ],
    );

    // Single rename action: a -> b, which collides with the existing "b" attribute.
    let cfg = json!({
        "actions": [
            {"action": "rename", "source_key": "a", "destination_key": "b"}
        ]
    });

    // Minimal controller/pipeline scaffolding needed to construct the processor.
    let telemetry_registry_handle = TelemetryRegistryHandle::new();
    let controller_ctx = ControllerContext::new(telemetry_registry_handle);
    let pipeline_ctx =
        controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0);

    let node = test_node("attributes-processor-test-dup");
    let rt: TestRuntime<OtapPdata> = TestRuntime::new();
    let mut node_config = NodeUserConfig::new_processor_config(ATTRIBUTES_PROCESSOR_URN);
    node_config.config = cfg;
    let proc =
        create_attributes_processor(pipeline_ctx, node, Arc::new(node_config), rt.config())
            .expect("create processor");
    let phase = rt.set_processor(proc);

    phase
        .run_test(|mut ctx| async move {
            // Feed the logs request through the processor as OTLP-encoded bytes.
            let mut bytes = BytesMut::new();
            input.encode(&mut bytes).expect("encode");
            let bytes = bytes.freeze();
            let pdata_in =
                OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
            ctx.process(Message::PData(pdata_in))
                .await
                .expect("process");

            // Drain the single expected output and decode it back into an
            // ExportLogsServiceRequest so the attributes can be inspected.
            let out = ctx.drain_pdata().await;
            let first = out.into_iter().next().expect("one output").payload();

            let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp");
            let bytes = match otlp_bytes {
                OtlpProtoBytes::ExportLogsRequest(b) => b,
                _ => panic!("unexpected otlp variant"),
            };
            let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode");

            let log_attrs = &decoded.resource_logs[0].scope_logs[0].log_records[0].attributes;

            // Expect no "a" and exactly one "b"
            assert!(!log_attrs.iter().any(|kv| kv.key == "a"));
            let b_count = log_attrs.iter().filter(|kv| kv.key == "b").count();
            assert_eq!(
                b_count, 1,
                "There should be exactly one key 'b' (no duplicates)"
            );
        })
        .validate(|_| async move {});
}

#[test]
fn test_delete_applies_to_signal_only_by_default() {
Expand Down
213 changes: 200 additions & 13 deletions rust/otap-dataflow/crates/pdata/src/otap/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1285,9 +1285,10 @@ pub fn transform_attributes_impl(
// At the same time, set flag to check if the batch already has the transport optimized
// encoding. This is used later to determine if we need to lazily materialize the ID
// column, because the transformation would break some sequences of encoded IDs.
let insert_or_upsert_needed = (transform.insert.is_some() || has_upsert)
let has_renames = transform.rename.as_ref().is_some_and(|r| !r.map.is_empty());
let early_materialize_needed = (transform.insert.is_some() || has_upsert)
&& schema.column_with_name(consts::PARENT_ID).is_some();
let (attrs_record_batch_cow, is_transport_optimized) = if insert_or_upsert_needed {
let (attrs_record_batch_cow, is_transport_optimized) = if early_materialize_needed {
let rb = materialize_parent_id_for_attributes_auto(attrs_record_batch)?;
(Cow::Owned(rb), false)
} else {
Expand All @@ -1306,14 +1307,46 @@ pub fn transform_attributes_impl(
let attrs_record_batch = attrs_record_batch_cow.as_ref();
let schema = attrs_record_batch.schema();

// Compute collision delete ranges if renames exist.
// When renaming key "a" -> "b", any existing row with key "b" sharing a parent_id
// with a row having key "a" would become a duplicate. We proactively identify these
// collisions and generate Delete ranges that are merged into the transform pipeline.
// Only when parent_ids are plain-encoded (not transport-optimized) — quasi-delta
// encoded values must first be materialized before they can be compared as parent IDs.
Comment on lines +1314 to +1315
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not true. The delta encoded IDs do represent actual parent_ids, and we can't just ignore this logic if we find delta encoded parent IDs.

Probably the most optimal thing to do is actually check if any rows have an attribute with the new key name. If this happens, we'll need to remove the delta delta encoding so we can figure out if a row with the same parent_id also has the old key name, which would indicate a collision after rename.

I realize this complicates your algorithm, because clearly both those key checks are happening inside the same helper find_rename_collisions_to_delete_ranges, so in order to get this to work as explained you may need to refactor a bit.

let collision_delete_ranges = if has_renames && !is_transport_optimized {
let rename = transform
.rename
.as_ref()
.expect("has_renames guard ensures this is Some");
let parent_ids_vec = attrs_record_batch
.column_by_name(consts::PARENT_ID)
.map(read_parent_ids_as_u32)
.transpose()?;
if let Some(parent_ids) = parent_ids_vec.as_ref() {
Comment on lines +1321 to +1325
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parent_id column should always be present for attributes. If it's not present that would be an error (similar to how you've returned an error below for the attributes key). See the spec has this column as required: https://github.com/open-telemetry/otel-arrow/blob/main/docs/otap-spec.md#542-u16-attributes

Also, we are copying the ID column into a Vec<u32> here, only to pass it into find_rename_collisions_to_delete ranges simply so the IDs can be iterated. This kind of copying is wasteful

IMO we could probably just make find_rename_collisions_to_delete generic over <K: ArrowPrimitiveType> and have it accept parent_ids as a &PrimitiveArray<K>. If the parent_id column is a PrimitiveArray, we pass that directly. If it's a dictionary, we can actually just pass the keys and the logic we're using to look for which rows would be duplicated after rename should still work.

let key_col = get_required_array(attrs_record_batch, consts::ATTRIBUTE_KEY)?;
let key_accessor = StringArrayAccessor::try_new(key_col)?;
find_rename_collisions_to_delete_ranges(
attrs_record_batch.num_rows(),
&key_accessor,
parent_ids,
rename,
)
} else {
Vec::new()
}
} else {
Vec::new()
};

let (mut rb, mut stats) = match key_column.data_type() {
DataType::Utf8 => {
let keys_arr = key_column
.as_any()
.downcast_ref()
.expect("can downcast Utf8 Column to string array");

let keys_transform_result = transform_keys(keys_arr, transform)?;
let keys_transform_result =
transform_keys(keys_arr, transform, &collision_delete_ranges)?;
let stats = TransformStats {
renamed_entries: keys_transform_result.replaced_rows as u64,
deleted_entries: keys_transform_result.deleted_rows as u64,
Expand Down Expand Up @@ -1380,6 +1413,7 @@ pub fn transform_attributes_impl(
.downcast_ref::<DictionaryArray<UInt8Type>>()
.expect("can downcast dictionary column to dictionary array"),
transform,
&collision_delete_ranges,
is_transport_optimized,
compute_stats,
)?;
Expand All @@ -1404,6 +1438,7 @@ pub fn transform_attributes_impl(
.downcast_ref::<DictionaryArray<UInt16Type>>()
.expect("can downcast dictionary column to dictionary array"),
transform,
&collision_delete_ranges,
is_transport_optimized,
compute_stats,
)?;
Expand Down Expand Up @@ -1571,6 +1606,7 @@ struct KeysTransformResult {
fn transform_keys(
array: &StringArray,
transform: &AttributesTransform,
collision_delete_ranges: &[KeyTransformRange],
) -> Result<KeysTransformResult> {
let len = array.len();
let values = array.values();
Expand Down Expand Up @@ -1617,7 +1653,15 @@ fn transform_keys(
// we're going to pass over both the values and the offsets, taking any ranges that
// are unmodified as-is, while either transforming or omitting ranges that were either replaced
// or deleted. To get the sorted list of how to handle each range, we merge the plans' ranges
let transform_ranges = merge_transform_ranges(replacement_plan.as_ref(), delete_plan.as_ref());
let mut transform_ranges =
merge_transform_ranges(replacement_plan.as_ref(), delete_plan.as_ref()).into_owned();

// Merge collision delete ranges into the transform plan
if !collision_delete_ranges.is_empty() {
transform_ranges.extend_from_slice(collision_delete_ranges);
transform_ranges.sort_by_key(|r| r.start());
}
let transform_ranges: Cow<'_, [KeyTransformRange]> = Cow::Owned(transform_ranges);
Comment on lines +1656 to +1664
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel that we should convert this to a Vec, then re-sort it, then convert it back to a Cow. The whole point of it being a Cow in the first place was to avoid a heap allocation.

Maybe we could find a way to have the collision_delete_ranges already appended into replacement_plan.ranges, maybe by passing it into plan_key_replacements?


// create buffer to contain the new values
let mut new_values = MutableBuffer::with_capacity(calculate_new_keys_buffer_len(
Expand Down Expand Up @@ -1727,11 +1771,21 @@ fn transform_keys(
KeyTransformRangeType::Delete => {
// for deleted ranges we don't need to append any offsets to the buffer, so we
// just decrement by how many total bytes were deleted from this range.
let deleted_val_len = delete_plan
.as_ref()
.expect("delete plan should be initialized")
.target_keys[transform_range.idx]
.len();
let deleted_val_len = if let Some(ref dp) = delete_plan {
if transform_range.idx < dp.target_keys.len() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're setting idx to 0 here:

} else if let Some(s) = start.take() {
ranges.push(KeyTransformRange {
range: s..i,
idx: 0,
range_type: KeyTransformRangeType::Delete,
});
}
}
if let Some(s) = start {
ranges.push(KeyTransformRange {
range: s..num_rows,
idx: 0,
range_type: KeyTransformRangeType::Delete,
});

So won't transform_range.idx always be less than dp.target_keys.len() if there are any deletes?

We should probably have a test that does delete and rename at the same time to cover this kind of issue.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the solution here would be to make idx an Option

dp.target_keys[transform_range.idx].len()
} else {
// collision-delete range: compute byte length from offsets
let s = offsets[transform_range.start()] as usize;
let e = offsets[transform_range.end()] as usize;
(e - s) / transform_range.range.len()
}
} else {
// collision-delete range without a delete plan
let s = offsets[transform_range.start()] as usize;
let e = offsets[transform_range.end()] as usize;
(e - s) / transform_range.range.len()
};
curr_total_offset_adjustment -=
(deleted_val_len * transform_range.range.len()) as i32;
}
Expand Down Expand Up @@ -1820,6 +1874,127 @@ struct DictionaryKeysTransformResult<K: ArrowDictionaryKeyType> {
renamed_rows: usize,
}

/// Identify rows that would become duplicates after a rename and return them as
/// `KeyTransformRange::Delete` entries. When renaming key `x` to `y`, any existing
/// row with key `y` whose `parent_id` also has a row with key `x` would become a
/// duplicate. Uses [`IdBitmap`] for efficient parent-id set intersection.
///
/// Returns a list of `KeyTransformRange` with `range_type = Delete` that can
/// be merged into the existing transformation plans, removing the row cleanly.
fn find_rename_collisions_to_delete_ranges(
num_rows: usize,
key_accessor: &StringArrayAccessor<'_>,
parent_ids: &[u32],
rename: &RenameTransform,
) -> Vec<KeyTransformRange> {
let mut collision_delete_rows = vec![false; num_rows];
let mut has_collisions = false;

for (old_key, new_key) in &rename.map {
// Build IdBitmap of parent_ids that have the source key (old_key).
let mut source_parents = IdBitmap::new();
Comment on lines +1893 to +1895
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The heap allocation of ID bitmap can be reused for each rename:

Suggested change
for (old_key, new_key) in &rename.map {
// Build IdBitmap of parent_ids that have the source key (old_key).
let mut source_parents = IdBitmap::new();
let mut source_parents = IdBitmap::new();
for (old_key, new_key) in &rename.map {
source_parents.clear();

for (i, &pid) in parent_ids.iter().enumerate() {
if let Some(k) = key_accessor.str_at(i) {
if k == old_key.as_str() {
source_parents.insert(pid);
}
}
Comment on lines +1897 to +1901
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way we're checking the strings here could be optimized. Since attribute keys will most often be dictionary encoded, what will happen here is that this will effectively pull the string from the dict and compare it for every parent ID. It would actually be more optimal to do the string comparison one time on the dictionary values.

All this to say that we should probably be using the arrow eq kernel on the string up front to create a mask of which rows contain this key https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html and check if the mask is true at this index.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example if you passed keys directly as an ArrayRef, you could so something similar to this:

// compare using optimized kernel
let keys = eq(keys, &StringArray::new_scalar(old_key))?;

// for each range that has equal keys
for (start, end) in BitSliceIterator::new(&keys.values().inner(), 0, keys.len()) {
    // push IDs into source_parents
    for pid in parent_ids[start..end] {
        source_parents.insert(pid)
    }
  }

I say "something similar" because I also think we may want to pass something other than a Vec for parent IDs. (see my other comment)

}

if source_parents.is_empty() {
continue;
}

// Mark target-key rows whose parent_id overlaps with source_parents.
for (i, &pid) in parent_ids.iter().enumerate() {
if let Some(k) = key_accessor.str_at(i) {
if k == new_key.as_str() && source_parents.contains(pid) {
Comment on lines +1910 to +1911
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to the one above related to using eq compute kernel to compare the key value instead of calling str_at and == for every row.

collision_delete_rows[i] = true;
has_collisions = true;
}
}
}
}

if !has_collisions {
return Vec::new();
}

// Convert per-row booleans into contiguous KeyTransformRange::Delete entries
let mut ranges = Vec::new();
let mut start = None;
for (i, &should_delete) in collision_delete_rows.iter().enumerate() {
Comment on lines +1923 to +1926
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're materializing a vector with a flag at each position indicating that the row is deleted, only to throw it away when we iterate it to convert to ranges here. Wouldn't it be more optimal to just compute the vec of ranges directly in the code above?

if should_delete {
if start.is_none() {
start = Some(i);
}
} else if let Some(s) = start.take() {
ranges.push(KeyTransformRange {
range: s..i,
idx: 0,
range_type: KeyTransformRangeType::Delete,
});
}
}
if let Some(s) = start {
ranges.push(KeyTransformRange {
range: s..num_rows,
idx: 0,
range_type: KeyTransformRangeType::Delete,
});
}
ranges
}

/// Read the parent_id column into a flat `Vec<u32>`, handling UInt16, UInt32,
/// and dictionary-encoded variants.
fn read_parent_ids_as_u32(col: &ArrayRef) -> Result<Vec<u32>> {
let n = col.len();
let mut result = Vec::with_capacity(n);
match col.data_type() {
DataType::UInt16 => {
let arr = col.as_any().downcast_ref::<UInt16Array>().ok_or_else(|| {
Error::UnexpectedRecordBatchState {
reason: "expected UInt16 for parent_id".into(),
}
})?;
for i in 0..n {
result.push(arr.value(i) as u32);
}
}
DataType::UInt32 => {
let arr = col.as_any().downcast_ref::<UInt32Array>().ok_or_else(|| {
Error::UnexpectedRecordBatchState {
reason: "expected UInt32 for parent_id".into(),
}
})?;
for i in 0..n {
result.push(arr.value(i));
}
}
DataType::Dictionary(key_type, _) => {
let arr =
cast(col, &DataType::UInt32).map_err(|e| Error::UnexpectedRecordBatchState {
reason: format!("cannot cast dict({key_type:?}) parent_id to u32: {e}"),
})?;
let u32_arr = arr.as_any().downcast_ref::<UInt32Array>().ok_or_else(|| {
Error::UnexpectedRecordBatchState {
reason: "cast to UInt32 failed".into(),
}
})?;
for i in 0..n {
result.push(u32_arr.value(i));
}
}
other => {
return Err(Error::UnexpectedRecordBatchState {
reason: format!("unsupported parent_id data type: {other:?}"),
});
}
}
Ok(result)
}

/// Transforms the keys for the dictionary array.
///
/// # Arguments
Expand All @@ -1835,6 +2010,7 @@ struct DictionaryKeysTransformResult<K: ArrowDictionaryKeyType> {
fn transform_dictionary_keys<K>(
dict_arr: &DictionaryArray<K>,
transform: &AttributesTransform,
collision_delete_ranges: &[KeyTransformRange],
is_transport_encoded: bool,
compute_stats: bool,
) -> Result<DictionaryKeysTransformResult<K>>
Expand All @@ -1849,7 +2025,7 @@ where
actual: dict_values.data_type().clone(),
}
})?;
let dict_values_transform_result = transform_keys(dict_values, transform)?;
let dict_values_transform_result = transform_keys(dict_values, transform, &[])?;

// Convert the ranges of transformed dictionary values into the ranges of transformed dict keys.
// These ranges are used to determine two things:
Expand All @@ -1872,6 +2048,13 @@ where
Vec::new()
};

// Merge collision delete ranges (row-level) into the dict key transform ranges
let mut dict_key_transform_ranges = dict_key_transform_ranges;
if !collision_delete_ranges.is_empty() {
dict_key_transform_ranges.extend_from_slice(collision_delete_ranges);
dict_key_transform_ranges.sort_by_key(|r| r.start());
}

// If we're tracking statistics on how many rows were transformed, it's less expensive to
// compute these statistics from the ranges of transformed dictionary keys. However, if these
// ranges haven't been computed, it's more performant to compute the statistics from the
Expand All @@ -1889,7 +2072,7 @@ where
(rename_count, 0)
};

if dict_values_transform_result.keep_ranges.is_none() {
if dict_values_transform_result.keep_ranges.is_none() && collision_delete_ranges.is_empty() {
// here there were no rows deleted from the values array, which means we can reuse
// the dictionary keys without any transformations
let new_dict_keys = dict_keys.clone();
Expand All @@ -1912,8 +2095,12 @@ where
});
}

// safety: we've checked above that this is not None
let dict_values_keep_ranges = dict_values_transform_result.keep_ranges.expect("not none");
let dict_values_keep_ranges = dict_values_transform_result.keep_ranges.unwrap_or_else(|| {
vec![Range {
start: 0,
end: dict_values.len(),
}]
});

// create quick lookup for each dictionary key of whether it was kept and if so which
// contiguous range of kept dictionary values the key points to. This will allow us to build
Expand Down
Loading