Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 55 additions & 58 deletions rust/otap-dataflow/crates/pdata/src/decode/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,19 @@ impl Consumer {
&mut self,
records: &mut BatchArrowRecords,
) -> Result<ExportMetricsServiceRequest> {
match get_main_payload_type(records)? {
ArrowPayloadType::UnivariateMetrics => {
let record_messages = self.consume_bar(records)?;
let mut otap_batch =
OtapArrowRecords::Metrics(from_record_messages(record_messages)?);
self.proto_buffer.clear();
self.metrics_proto_encoder
.encode(&mut otap_batch, &mut self.proto_buffer)?;
check_payload_type_present(records, ArrowPayloadType::UnivariateMetrics)?;

ExportMetricsServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| {
Error::UnexpectedRecordBatchState {
reason: format!("error decoding proto serialization: {e:?}"),
}
})
let record_messages = self.consume_bar(records)?;
let mut otap_batch = OtapArrowRecords::Metrics(from_record_messages(record_messages)?);
self.proto_buffer.clear();
self.metrics_proto_encoder
.encode(&mut otap_batch, &mut self.proto_buffer)?;

ExportMetricsServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| {
Error::UnexpectedRecordBatchState {
reason: format!("error decoding proto serialization: {e:?}"),
}
main_record_type => Err(Error::UnsupportedPayloadType {
actual: main_record_type.into(),
}),
}
})
}

/// Consumes all the arrow payloads in the passed OTAP `BatchArrowRecords` and decodes them
Expand All @@ -140,64 +134,67 @@ impl Consumer {
&mut self,
records: &mut BatchArrowRecords,
) -> Result<ExportLogsServiceRequest> {
match get_main_payload_type(records)? {
ArrowPayloadType::Logs => {
let record_messages = self.consume_bar(records)?;
let mut otap_batch = OtapArrowRecords::Logs(from_record_messages(record_messages)?);
self.proto_buffer.clear();
self.logs_proto_encoder
.encode(&mut otap_batch, &mut self.proto_buffer)?;
check_payload_type_present(records, ArrowPayloadType::Logs)?;
Copy link
Copy Markdown
Contributor

@JakeDern JakeDern Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need a separate ahead-of-time check for the root payload type. Most things are checked by `from_record_messages`, which will fail if any of the records are invalid for the signal or have a misaligned schema.

It currently does not fail if there's no root record batch for the signal (perhaps something we should consider adding), so I think we just need to check, after we create the OTAP batch, that `otap_batch.get(ArrowPayloadType::Logs).is_some()`.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Similar feedback goes for the other signals as well)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused at this point. @JakeDern, are you suggesting that we modify `from_record_messages()` to check for the root payload itself?


let record_messages = self.consume_bar(records)?;
let mut otap_batch = OtapArrowRecords::Logs(from_record_messages(record_messages)?);
self.proto_buffer.clear();
self.logs_proto_encoder
.encode(&mut otap_batch, &mut self.proto_buffer)?;

ExportLogsServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| {
Error::UnexpectedRecordBatchState {
reason: format!("error decoding proto serialization: {e:?}"),
}
})
ExportLogsServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| {
Error::UnexpectedRecordBatchState {
reason: format!("error decoding proto serialization: {e:?}"),
}
main_record_type => Err(Error::UnsupportedPayloadType {
actual: main_record_type.into(),
}),
}
})
}

/// Consumes record batches in [BatchArrowRecords] to [ExportTraceServiceRequest].
pub fn consume_traces_batches(
&mut self,
records: &mut BatchArrowRecords,
) -> Result<ExportTraceServiceRequest> {
match get_main_payload_type(records)? {
ArrowPayloadType::Spans => {
let record_messages = self.consume_bar(records)?;
let mut otap_batch =
OtapArrowRecords::Traces(from_record_messages(record_messages)?);
self.proto_buffer.clear();
self.traces_proto_encoder
.encode(&mut otap_batch, &mut self.proto_buffer)?;
check_payload_type_present(records, ArrowPayloadType::Spans)?;

let record_messages = self.consume_bar(records)?;
let mut otap_batch = OtapArrowRecords::Traces(from_record_messages(record_messages)?);
self.proto_buffer.clear();
self.traces_proto_encoder
.encode(&mut otap_batch, &mut self.proto_buffer)?;

ExportTraceServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| {
Error::UnexpectedRecordBatchState {
reason: format!("error decoding proto serialization {e:?}"),
}
})
ExportTraceServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| {
Error::UnexpectedRecordBatchState {
reason: format!("error decoding proto serialization {e:?}"),
}
main_record_type => Err(Error::UnsupportedPayloadType {
actual: main_record_type.into(),
}),
}
})
}
}

/// Get the main logs, metrics, or traces from a received BatchArrowRecords message.
fn get_main_payload_type(records: &BatchArrowRecords) -> Result<ArrowPayloadType> {
/// Check that the expected root payload type exists somewhere in the batch.
///
/// The root payload type (Logs, UnivariateMetrics, Spans) may appear at any
/// position in the arrow payloads, not necessarily at position 0.
fn check_payload_type_present(
records: &BatchArrowRecords,
expected: ArrowPayloadType,
) -> Result<()> {
if records.arrow_payloads.is_empty() {
return Err(Error::EmptyBatch);
}

// Per the specification, the main record type is the first payload
let main_record_type = records.arrow_payloads[0].r#type;
ArrowPayloadType::try_from(main_record_type).map_err(|_| Error::UnsupportedPayloadType {
actual: main_record_type,
})
let found = records.arrow_payloads.iter().any(|p| {
ArrowPayloadType::try_from(p.r#type)
.map(|t| t == expected)
.unwrap_or(false)
});

if found {
Ok(())
} else {
Err(Error::RecordBatchNotFound {
payload_type: expected,
})
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - this can be simplified a bit to be more idiomatic Rust:

    if records.arrow_payloads.iter().any(|payload| {
        ArrowPayloadType::try_from(payload.r#type) == Ok(expected)
    }) {
        Ok(())
    } else {
        Err(Error::RecordBatchNotFound {
            payload_type: expected,
        })
    }

}

#[cfg(test)]
Expand Down Expand Up @@ -320,4 +317,4 @@ mod tests {
*reader.get_mut() = Cursor::new(std::mem::take(writer.get_mut()));
assert_eq!(batch2, reader.next().unwrap().unwrap());
}
}
}
Loading