From d51322f9e4b084df9910b69b4e1903725af685b4 Mon Sep 17 00:00:00 2001 From: Siddhartha Mathiharan Date: Mon, 30 Mar 2026 23:27:51 -0700 Subject: [PATCH] Rust decoder shouldn't assume root payload is at position 0 --- .../crates/pdata/src/decode/decoder.rs | 113 +++++++++--------- 1 file changed, 55 insertions(+), 58 deletions(-) diff --git a/rust/otap-dataflow/crates/pdata/src/decode/decoder.rs b/rust/otap-dataflow/crates/pdata/src/decode/decoder.rs index 06d697cf36..d6a8508ed9 100644 --- a/rust/otap-dataflow/crates/pdata/src/decode/decoder.rs +++ b/rust/otap-dataflow/crates/pdata/src/decode/decoder.rs @@ -112,25 +112,19 @@ impl Consumer { &mut self, records: &mut BatchArrowRecords, ) -> Result { - match get_main_payload_type(records)? { - ArrowPayloadType::UnivariateMetrics => { - let record_messages = self.consume_bar(records)?; - let mut otap_batch = - OtapArrowRecords::Metrics(from_record_messages(record_messages)?); - self.proto_buffer.clear(); - self.metrics_proto_encoder - .encode(&mut otap_batch, &mut self.proto_buffer)?; + check_payload_type_present(records, ArrowPayloadType::UnivariateMetrics)?; - ExportMetricsServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| { - Error::UnexpectedRecordBatchState { - reason: format!("error decoding proto serialization: {e:?}"), - } - }) + let record_messages = self.consume_bar(records)?; + let mut otap_batch = OtapArrowRecords::Metrics(from_record_messages(record_messages)?); + self.proto_buffer.clear(); + self.metrics_proto_encoder + .encode(&mut otap_batch, &mut self.proto_buffer)?; + + ExportMetricsServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| { + Error::UnexpectedRecordBatchState { + reason: format!("error decoding proto serialization: {e:?}"), } - main_record_type => Err(Error::UnsupportedPayloadType { - actual: main_record_type.into(), - }), - } + }) } /// Consumes all the arrow payloads in the passed OTAP `BatchArrayRecords` and decodes them @@ -140,24 +134,19 @@ impl Consumer { &mut 
self, records: &mut BatchArrowRecords, ) -> Result { - match get_main_payload_type(records)? { - ArrowPayloadType::Logs => { - let record_messages = self.consume_bar(records)?; - let mut otap_batch = OtapArrowRecords::Logs(from_record_messages(record_messages)?); - self.proto_buffer.clear(); - self.logs_proto_encoder - .encode(&mut otap_batch, &mut self.proto_buffer)?; + check_payload_type_present(records, ArrowPayloadType::Logs)?; + + let record_messages = self.consume_bar(records)?; + let mut otap_batch = OtapArrowRecords::Logs(from_record_messages(record_messages)?); + self.proto_buffer.clear(); + self.logs_proto_encoder + .encode(&mut otap_batch, &mut self.proto_buffer)?; - ExportLogsServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| { - Error::UnexpectedRecordBatchState { - reason: format!("error decoding proto serialization: {e:?}"), - } - }) + ExportLogsServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| { + Error::UnexpectedRecordBatchState { + reason: format!("error decoding proto serialization: {e:?}"), } - main_record_type => Err(Error::UnsupportedPayloadType { - actual: main_record_type.into(), - }), - } + }) } /// Consumes record batches in [BatchArrowRecords] to [ExportTraceServiceRequest]. @@ -165,39 +154,47 @@ impl Consumer { &mut self, records: &mut BatchArrowRecords, ) -> Result { - match get_main_payload_type(records)? 
{ - ArrowPayloadType::Spans => { - let record_messages = self.consume_bar(records)?; - let mut otap_batch = - OtapArrowRecords::Traces(from_record_messages(record_messages)?); - self.proto_buffer.clear(); - self.traces_proto_encoder - .encode(&mut otap_batch, &mut self.proto_buffer)?; + check_payload_type_present(records, ArrowPayloadType::Spans)?; + + let record_messages = self.consume_bar(records)?; + let mut otap_batch = OtapArrowRecords::Traces(from_record_messages(record_messages)?); + self.proto_buffer.clear(); + self.traces_proto_encoder + .encode(&mut otap_batch, &mut self.proto_buffer)?; - ExportTraceServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| { - Error::UnexpectedRecordBatchState { - reason: format!("error decoding proto serialization {e:?}"), - } - }) + ExportTraceServiceRequest::decode(self.proto_buffer.as_ref()).map_err(|e| { + Error::UnexpectedRecordBatchState { + reason: format!("error decoding proto serialization {e:?}"), } - main_record_type => Err(Error::UnsupportedPayloadType { - actual: main_record_type.into(), - }), - } + }) } } -/// Get the main logs, metrics, or traces from a received BatchArrowRecords message. -fn get_main_payload_type(records: &BatchArrowRecords) -> Result { +/// Check that the expected root payload type exists somewhere in the batch. +/// +/// The root payload type (Logs, UnivariateMetrics, Spans) may appear at any +/// position in the arrow payloads, not necessarily at position 0. 
+fn check_payload_type_present( + records: &BatchArrowRecords, + expected: ArrowPayloadType, +) -> Result<()> { if records.arrow_payloads.is_empty() { return Err(Error::EmptyBatch); } - // Per the specification, the main record type is the first payload - let main_record_type = records.arrow_payloads[0].r#type; - ArrowPayloadType::try_from(main_record_type).map_err(|_| Error::UnsupportedPayloadType { - actual: main_record_type, - }) + let found = records.arrow_payloads.iter().any(|p| { + ArrowPayloadType::try_from(p.r#type) + .map(|t| t == expected) + .unwrap_or(false) + }); + + if found { + Ok(()) + } else { + Err(Error::RecordBatchNotFound { + payload_type: expected, + }) + } } #[cfg(test)] @@ -320,4 +317,4 @@ mod tests { *reader.get_mut() = Cursor::new(std::mem::take(writer.get_mut())); assert_eq!(batch2, reader.next().unwrap().unwrap()); } -} +} \ No newline at end of file