diff --git a/CHANGELOG.md b/CHANGELOG.md index 1790d1b5d94..0dfefe39583 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ **Features**: +- Use separate intern tables per Perfetto field and infer main thread from pid. ([#5659](https://github.com/getsentry/relay/pull/5659)) - Set `sentry.segment.id` and `sentry.segment.name` attributes on OTLP segment spans. ([#5748](https://github.com/getsentry/relay/pull/5748)) - Envelope buffer: Add option to disable flush-to-disk on shutdown. ([#5751](https://github.com/getsentry/relay/pull/5751)) - Allow configuring Objectstore client auth parameters. ([#5720](https://github.com/getsentry/relay/pull/5720)) diff --git a/Cargo.lock b/Cargo.lock index 8b79660dbab..5996d1c37be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4619,6 +4619,7 @@ dependencies = [ "hashbrown 0.15.4", "insta", "itertools 0.14.0", + "prost 0.14.3", "relay-base-schema", "relay-dynamic-config", "relay-event-schema", diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index 65e499501b5..beebad10227 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -76,6 +76,14 @@ pub enum Feature { /// Serialized as `organizations:continuous-profiling`. #[serde(rename = "organizations:continuous-profiling")] ContinuousProfiling, + /// Enable Perfetto binary trace processing for continuous profiling. + /// + /// When enabled, compound profile chunk items with `content_type: "perfetto"` are + /// expanded from binary Perfetto format into the Sample v2 JSON format. + /// + /// Serialized as `organizations:continuous-profiling-perfetto`. + #[serde(rename = "organizations:continuous-profiling-perfetto")] + ContinuousProfilingPerfetto, /// Enable log ingestion for our log product (this is not internal logging). /// /// Serialized as `organizations:ourlogs-ingestion`. 
@@ -196,4 +204,18 @@ mod tests { r#"["organizations:session-replay"]"# ); } + + #[test] + fn test_continuous_profiling_perfetto_serde() { + // Verify the serialized name matches what Sentry's backend sends. + let serialized = serde_json::to_string(&Feature::ContinuousProfilingPerfetto).unwrap(); + assert_eq!( + serialized, + r#""organizations:continuous-profiling-perfetto""# + ); + + let deserialized: Feature = + serde_json::from_str(r#""organizations:continuous-profiling-perfetto""#).unwrap(); + assert_eq!(deserialized, Feature::ContinuousProfilingPerfetto); + } } diff --git a/relay-profiling/Cargo.toml b/relay-profiling/Cargo.toml index 7b36b1c279b..33e5f95d0fc 100644 --- a/relay-profiling/Cargo.toml +++ b/relay-profiling/Cargo.toml @@ -19,6 +19,7 @@ chrono = { workspace = true } data-encoding = { workspace = true } hashbrown = { workspace = true } itertools = { workspace = true } +prost = { workspace = true } relay-base-schema = { workspace = true } relay-dynamic-config = { workspace = true } relay-event-schema = { workspace = true } diff --git a/relay-profiling/protos/README.md b/relay-profiling/protos/README.md new file mode 100644 index 00000000000..02536946ca4 --- /dev/null +++ b/relay-profiling/protos/README.md @@ -0,0 +1,18 @@ +# Perfetto Proto Definitions + +`perfetto_trace.proto` contains a minimal subset of the +[Perfetto trace proto definitions](https://github.com/google/perfetto/tree/master/protos/perfetto/trace) +needed to decode profiling data. Field numbers match the upstream definitions. + +The generated Rust code is checked in at `../src/perfetto/proto.rs`. 
+ +## Regenerating + +Prerequisites: +- `protoc`: https://github.com/protocolbuffers/protobuf/releases (or `brew install protobuf`) +- `protoc-gen-prost`: `cargo install protoc-gen-prost` + +Then run: +```sh +./relay-profiling/protos/generate.sh +``` diff --git a/relay-profiling/protos/generate.sh b/relay-profiling/protos/generate.sh new file mode 100755 index 00000000000..dbfce8e5f41 --- /dev/null +++ b/relay-profiling/protos/generate.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +# +# Regenerates the checked-in Rust protobuf bindings for Perfetto trace types +# using protoc with the protoc-gen-prost plugin. +# +# Usage: +# ./relay-profiling/protos/generate.sh + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +PROTO_FILE="$SCRIPT_DIR/perfetto_trace.proto" +OUTPUT_FILE="$SCRIPT_DIR/../src/perfetto/proto.rs" + +if ! command -v protoc &>/dev/null; then + echo "error: protoc is not installed." >&2 + echo " Install it from https://github.com/protocolbuffers/protobuf/releases" >&2 + echo " or: brew install protobuf" >&2 + exit 1 +fi +echo "Using protoc: $(command -v protoc) ($(protoc --version))" + +if ! command -v protoc-gen-prost &>/dev/null; then + echo "error: protoc-gen-prost is not installed." >&2 + echo " Install it with: cargo install protoc-gen-prost" >&2 + exit 1 +fi +echo "Using protoc-gen-prost: $(command -v protoc-gen-prost)" + +if [[ ! -f "$PROTO_FILE" ]]; then + echo "error: proto file not found at $PROTO_FILE" >&2 + exit 1 +fi + +TMPDIR="$(mktemp -d)" +trap 'rm -rf "$TMPDIR"' EXIT + +echo "Generating Rust bindings..." +protoc \ + --prost_out="$TMPDIR" \ + --proto_path="$SCRIPT_DIR" \ + "$PROTO_FILE" + +# protoc-gen-prost mirrors the proto package path in the output directory. +GENERATED=$(find "$TMPDIR" -name '*.rs' -type f | head -1) + +if [[ -z "$GENERATED" || ! -f "$GENERATED" ]]; then + echo "error: no generated .rs file found in $TMPDIR" >&2 + exit 1 +fi + +if [[ ! 
-s "$GENERATED" ]]; then + echo "error: generated file is empty" >&2 + exit 1 +fi + +cp "$GENERATED" "$OUTPUT_FILE" +echo "Updated $OUTPUT_FILE" +echo "Done." diff --git a/relay-profiling/protos/perfetto_trace.proto b/relay-profiling/protos/perfetto_trace.proto new file mode 100644 index 00000000000..4a05bc949d0 --- /dev/null +++ b/relay-profiling/protos/perfetto_trace.proto @@ -0,0 +1,120 @@ +// Minimal subset of the Perfetto trace proto definitions needed to decode +// profiling data. Field numbers match the upstream definitions at +// https://github.com/google/perfetto/tree/master/protos/perfetto/trace + +syntax = "proto2"; +package perfetto.protos; + +message Trace { + repeated TracePacket packet = 1; +} + +message TracePacket { + optional uint64 timestamp = 8; + + oneof optional_trusted_packet_sequence_id { + uint32 trusted_packet_sequence_id = 10; + } + + optional InternedData interned_data = 12; + optional uint32 sequence_flags = 13; + + // Only the oneof variants we care about; prost will skip the rest. 
+ oneof data { + ProcessTree process_tree = 2; + ClockSnapshot clock_snapshot = 6; + StreamingProfilePacket streaming_profile_packet = 54; + TrackDescriptor track_descriptor = 60; + PerfSample perf_sample = 66; + } +} + +// --- process tree ------------------------------------------------------------ + +message ProcessTree { + message Thread { + optional int32 tid = 1; + optional string name = 2; + optional int32 tgid = 3; + } + repeated ProcessTree.Thread threads = 2; +} + +// --- clock sync --------------------------------------------------------------- + +message ClockSnapshot { + message Clock { + optional uint32 clock_id = 1; + optional uint64 timestamp = 2; + } + repeated Clock clocks = 1; + optional uint32 primary_trace_clock = 2; +} + +// --- interned data ----------------------------------------------------------- + +message InternedData { + repeated InternedString function_names = 5; + repeated Frame frames = 6; + repeated Callstack callstacks = 7; + repeated InternedString build_ids = 16; + repeated InternedString mapping_paths = 17; + repeated Mapping mappings = 19; +} + +message InternedString { + optional uint64 iid = 1; + optional bytes str = 2; +} + +// --- profiling common -------------------------------------------------------- + +message Frame { + optional uint64 iid = 1; + optional uint64 function_name_id = 2; + optional uint64 mapping_id = 3; + optional uint64 rel_pc = 4; +} + +message Mapping { + optional uint64 iid = 1; + optional uint64 build_id = 2; + optional uint64 start_offset = 3; + optional uint64 start = 4; + optional uint64 end = 5; + optional uint64 load_bias = 6; + repeated uint64 path_string_ids = 7; + optional uint64 exact_offset = 8; +} + +message Callstack { + optional uint64 iid = 1; + repeated uint64 frame_ids = 2; +} + +// --- profiling packets ------------------------------------------------------- + +message PerfSample { + optional uint32 cpu = 1; + optional uint32 pid = 2; + optional uint32 tid = 3; + optional uint64 
callstack_iid = 4; +} + +message StreamingProfilePacket { + repeated uint64 callstack_iid = 1; + repeated int64 timestamp_delta_us = 2; +} + +// --- track descriptors ------------------------------------------------------- + +message TrackDescriptor { + optional uint64 uuid = 1; + optional ThreadDescriptor thread = 4; +} + +message ThreadDescriptor { + optional int32 pid = 1; + optional int32 tid = 2; + optional string thread_name = 5; +} diff --git a/relay-profiling/src/debug_image.rs b/relay-profiling/src/debug_image.rs index 52674cfc18f..cee227ce407 100644 --- a/relay-profiling/src/debug_image.rs +++ b/relay-profiling/src/debug_image.rs @@ -42,6 +42,27 @@ pub struct DebugImage { uuid: Option, } +impl DebugImage { + /// Creates a native (ELF/Symbolic) debug image from Perfetto mapping data. + pub fn native_image( + code_file: String, + debug_id: DebugId, + image_addr: u64, + image_vmaddr: u64, + image_size: u64, + ) -> Self { + Self { + code_file: Some(code_file.into()), + debug_id: Some(debug_id), + image_type: ImageType::Symbolic, + image_addr: Some(Addr(image_addr)), + image_vmaddr: Some(Addr(image_vmaddr)), + image_size, + uuid: None, + } + } +} + pub fn get_proguard_image(uuid: &str) -> Result { Ok(DebugImage { code_file: None, diff --git a/relay-profiling/src/lib.rs b/relay-profiling/src/lib.rs index 2368a787b01..8d89fe266e2 100644 --- a/relay-profiling/src/lib.rs +++ b/relay-profiling/src/lib.rs @@ -39,6 +39,7 @@ //! //! Relay will forward those profiles encoded with `msgpack` after unpacking them if needed and push a message on Kafka. +use std::collections::BTreeMap; use std::error::Error; use std::net::IpAddr; use std::time::Duration; @@ -63,6 +64,7 @@ mod error; mod extract_from_transaction; mod measurements; mod outcomes; +mod perfetto; mod sample; mod transaction_metadata; mod types; @@ -353,6 +355,90 @@ impl ProfileChunk { } } +/// The result of expanding a binary Perfetto trace via [`expand_perfetto`]. 
+/// +/// Carries the serialized Sample v2 JSON payload together with the profile +/// metadata needed downstream (platform, profile type, inbound filtering) so +/// that callers do **not** need to deserialize the payload a second time. +#[derive(Debug)] +pub struct ExpandedPerfettoChunk { + /// Serialized Sample v2 JSON payload, ready for ingestion. + pub payload: Vec, + /// Platform string extracted from the metadata (e.g. `"android"`). + pub platform: String, + /// Release string from the metadata, used for inbound filtering. + release: Option, +} + +impl ExpandedPerfettoChunk { + /// Returns the [`ProfileType`] derived from the platform. + pub fn profile_type(&self) -> ProfileType { + ProfileType::from_platform(&self.platform) + } + + /// Applies inbound filters to the profile chunk. + pub fn filter( + &self, + client_ip: Option, + filter_settings: &ProjectFiltersConfig, + global_config: &GlobalConfig, + ) -> Result<(), ProfileError> { + relay_filter::should_filter(self, client_ip, filter_settings, global_config.filters()) + .map_err(ProfileError::Filtered) + } +} + +impl Filterable for ExpandedPerfettoChunk { + fn release(&self) -> Option<&str> { + self.release.as_deref() + } +} + +impl Getter for ExpandedPerfettoChunk { + fn get_value(&self, path: &str) -> Option> { + match path.strip_prefix("event.")? { + "release" => self.release.as_deref().map(|r| r.into()), + "platform" => Some(self.platform.as_str().into()), + _ => None, + } + } +} + +/// Expands a binary Perfetto trace into a Sample v2 profile chunk. +/// +/// Decodes the protobuf trace, converts it into the internal Sample v2 format, +/// merges the provided JSON `metadata_json` (containing platform, environment, etc.), +/// and returns an [`ExpandedPerfettoChunk`] with the serialized JSON payload plus +/// the profile metadata needed for downstream processing (platform, profile type, +/// inbound filtering) — avoiding a second JSON deserialization pass in callers. 
+pub fn expand_perfetto( + perfetto_bytes: &[u8], + metadata_json: &[u8], +) -> Result { + let d = &mut Deserializer::from_slice(metadata_json); + let metadata: sample::v2::ProfileMetadata = + serde_path_to_error::deserialize(d).map_err(ProfileError::InvalidJson)?; + + let platform = metadata.platform.clone(); + let release = metadata.release.clone(); + + let (profile_data, debug_images) = perfetto::convert(perfetto_bytes)?; + let mut chunk = sample::v2::ProfileChunk { + measurements: BTreeMap::new(), + metadata, + profile: profile_data, + }; + chunk.metadata.debug_meta.images.extend(debug_images); + chunk.normalize()?; + + let payload = serde_json::to_vec(&chunk).map_err(|_| ProfileError::CannotSerializePayload)?; + Ok(ExpandedPerfettoChunk { + payload, + platform, + release, + }) +} + #[cfg(test)] mod tests { use super::*; @@ -417,4 +503,74 @@ mod tests { .is_ok() ); } + + #[test] + fn test_expand_perfetto() { + let perfetto_bytes = include_bytes!("../tests/fixtures/android/perfetto/android.pftrace"); + + let metadata_json = serde_json::json!({ + "version": "2", + "chunk_id": "0432a0a4c25f4697bf9f0a2fcbe6a814", + "profiler_id": "4d229f1d3807421ba62a5f8bc295d836", + "platform": "android", + "content_type": "perfetto", + "client_sdk": {"name": "sentry-android", "version": "1.0"}, + }); + let metadata_bytes = serde_json::to_vec(&metadata_json).unwrap(); + + let result = expand_perfetto(perfetto_bytes, &metadata_bytes); + assert!(result.is_ok(), "expand_perfetto failed: {result:?}"); + + let expanded = result.unwrap(); + assert_eq!(expanded.platform, "android"); + assert_eq!(expanded.profile_type(), ProfileType::Ui); + + let output: sample::v2::ProfileChunk = serde_json::from_slice(&expanded.payload).unwrap(); + assert_eq!(output.metadata.platform, expanded.platform); + assert!(!output.profile.samples.is_empty()); + assert!(!output.profile.frames.is_empty()); + assert!( + !output.metadata.debug_meta.images.is_empty(), + "expected debug images from native mappings in 
the fixture" + ); + } + + #[test] + fn test_expand_perfetto_invalid_metadata() { + let result = expand_perfetto(b"", b"not json"); + assert!(result.is_err()); + } + + #[test] + fn test_expand_perfetto_empty_trace() { + // Valid metadata but no profiling samples in the binary → should fail. + let metadata_bytes = serde_json::to_vec(&serde_json::json!({ + "version": "2", + "chunk_id": "0432a0a4c25f4697bf9f0a2fcbe6a814", + "profiler_id": "4d229f1d3807421ba62a5f8bc295d836", + "platform": "android", + "content_type": "perfetto", + "client_sdk": {"name": "sentry-android", "version": "1.0"}, + })) + .unwrap(); + let result = expand_perfetto(b"", &metadata_bytes); + assert!(result.is_err()); + } + + #[test] + fn test_expand_perfetto_missing_required_field() { + // metadata is missing the required `chunk_id` field → deserialization error. + let metadata_bytes = serde_json::to_vec(&serde_json::json!({ + "version": "2", + "profiler_id": "4d229f1d3807421ba62a5f8bc295d836", + "platform": "android", + "client_sdk": {"name": "sentry-android", "version": "1.0"}, + })) + .unwrap(); + let result = expand_perfetto(b"", &metadata_bytes); + assert!( + matches!(result, Err(ProfileError::InvalidJson(_))), + "expected InvalidJson, got {result:?}" + ); + } } diff --git a/relay-profiling/src/perfetto/mod.rs b/relay-profiling/src/perfetto/mod.rs new file mode 100644 index 00000000000..1ee35064ffd --- /dev/null +++ b/relay-profiling/src/perfetto/mod.rs @@ -0,0 +1,1836 @@ +//! Perfetto trace format conversion to Sample v2. +//! +//! Handles both `PerfSample` (CPU profiling via `perf_event_open`) and +//! `StreamingProfilePacket` (in-process stack sampling) packet types. 
+ +use std::collections::BTreeMap; + +use data_encoding::HEXLOWER; +use hashbrown::{HashMap, HashSet}; +use prost::Message; + +use relay_event_schema::protocol::{Addr, DebugId}; +use relay_protocol::FiniteF64; + +use crate::debug_image::DebugImage; +use crate::error::ProfileError; +use crate::sample::v2::{ProfileData, Sample}; +use crate::sample::{Frame, ThreadMetadata}; + +mod proto; + +use proto::trace_packet::Data; + +/// Maximum number of raw samples we collect from a Perfetto trace before +/// bailing out. At 100 Hz across multiple threads, a 66-second chunk +/// produces at most ~6 600 samples per thread; 100 000 provides generous +/// headroom while bounding memory usage against adversarial input. +const MAX_SAMPLES: usize = 100_000; + +/// See . +const SEQ_INCREMENTAL_STATE_CLEARED: u32 = 1; + +/// Perfetto builtin clock IDs. +/// See . +const CLOCK_REALTIME: u32 = 1; +const CLOCK_BOOTTIME: u32 = 6; + +fn has_incremental_state_cleared(packet: &proto::TracePacket) -> bool { + packet + .sequence_flags + .is_some_and(|f| f & SEQ_INCREMENTAL_STATE_CLEARED != 0) +} + +fn trusted_packet_sequence_id(packet: &proto::TracePacket) -> u32 { + match packet.optional_trusted_packet_sequence_id { + Some(proto::trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(id)) => { + id + } + None => 0, + } +} + +fn extract_clock_offset(cs: &proto::ClockSnapshot) -> Option { + let mut boottime_ns: Option = None; + let mut realtime_ns: Option = None; + + for clock in &cs.clocks { + match clock.clock_id { + Some(CLOCK_BOOTTIME) => boottime_ns = clock.timestamp, + Some(CLOCK_REALTIME) => realtime_ns = clock.timestamp, + _ => {} + } + } + + match (realtime_ns, boottime_ns) { + (Some(rt), Some(bt)) => Some(rt as i128 - bt as i128), + _ => None, + } +} + +/// Per-sequence interned data tables, mirroring Perfetto's incremental state. +/// +/// Perfetto traces use interned IDs to avoid repeating large strings and +/// structures in every packet. 
Each trusted packet sequence maintains its +/// own set of intern tables that can be cleared on state resets. +/// +/// Per the Perfetto spec, each `InternedData` field constructs its **own** +/// interning index — IDs are scoped per field, not shared across string types. +/// See . +#[derive(Debug, Default)] +struct InternTables { + function_names: HashMap, + mapping_paths: HashMap, + /// Build IDs stored as hex-encoded strings (normalized from raw bytes). + build_ids: HashMap, + frames: HashMap, + callstacks: HashMap, + mappings: HashMap, +} + +impl InternTables { + fn clear(&mut self) { + self.function_names.clear(); + self.mapping_paths.clear(); + self.build_ids.clear(); + self.frames.clear(); + self.callstacks.clear(); + self.mappings.clear(); + } + + fn merge(&mut self, data: &proto::InternedData) { + for s in &data.function_names { + if let Some(iid) = s.iid { + let value = s + .r#str + .as_deref() + .and_then(|b| std::str::from_utf8(b).ok()) + .unwrap_or("") + .to_owned(); + self.function_names.insert(iid, value); + } + } + for s in &data.mapping_paths { + if let Some(iid) = s.iid { + let value = s + .r#str + .as_deref() + .and_then(|b| std::str::from_utf8(b).ok()) + .unwrap_or("") + .to_owned(); + self.mapping_paths.insert(iid, value); + } + } + // Build IDs are raw bytes in Perfetto traces; normalize to hex for later lookup. + for s in &data.build_ids { + if let Some(iid) = s.iid { + let value = match s.r#str.as_deref() { + Some(bytes) if !bytes.is_empty() => HEXLOWER.encode(bytes), + _ => String::new(), + }; + self.build_ids.insert(iid, value); + } + } + for f in &data.frames { + if let Some(iid) = f.iid { + self.frames.insert(iid, *f); + } + } + for c in &data.callstacks { + if let Some(iid) = c.iid { + self.callstacks.insert(iid, c.clone()); + } + } + for m in &data.mappings { + if let Some(iid) = m.iid { + self.mappings.insert(iid, m.clone()); + } + } + } +} + +/// Deduplication key for resolved stack frames. 
+/// +/// Two Perfetto frames that resolve to the same function, module, package, +/// and instruction address are considered identical and share a single index +/// in the output frame list. +#[derive(Debug, PartialEq, Eq, Hash)] +struct FrameKey { + function: Option, + module: Option, + package: Option, + instruction_addr: Option, +} + +/// Converts a Perfetto binary trace into Sample v2 [`ProfileData`] and debug images. +pub fn convert(perfetto_bytes: &[u8]) -> Result<(ProfileData, Vec), ProfileError> { + let trace = + proto::Trace::decode(perfetto_bytes).map_err(|_| ProfileError::InvalidSampledProfile)?; + + let mut tables_by_seq: HashMap = HashMap::new(); + let mut thread_meta: BTreeMap = BTreeMap::new(); + let mut clock_offset_ns: Option = None; + let mut observed_pid: Option = None; + // Maps trusted_packet_sequence_id → tid for StreamingProfilePacket, + // resolved via the TrackDescriptor → ThreadDescriptor chain. + let mut seq_id_to_tid: HashMap = HashMap::new(); + + // Samples are resolved eagerly during packet iteration (single-pass) so + // that incremental state resets don't cause earlier samples to be resolved + // against a post-reset intern table. We collect (ts_ns, tid, stack_id) + // tuples and apply clock offset + sorting after the loop. 
+ let mut frame_index: HashMap = HashMap::new(); + let mut frames: Vec = Vec::new(); + let mut stack_index: HashMap, usize> = HashMap::new(); + let mut stacks: Vec> = Vec::new(); + // (timestamp_ns, tid, stack_id) + let mut resolved_samples: Vec<(u64, u32, usize)> = Vec::new(); + let mut sample_count: usize = 0; + let mut debug_images: Vec = Vec::new(); + let mut seen_images: HashSet<(String, u64)> = HashSet::new(); + + for packet in &trace.packet { + let seq_id = trusted_packet_sequence_id(packet); + + if has_incremental_state_cleared(packet) { + tables_by_seq.entry(seq_id).or_default().clear(); + } + + if let Some(ref interned) = packet.interned_data { + tables_by_seq.entry(seq_id).or_default().merge(interned); + } + + match &packet.data { + Some(Data::ClockSnapshot(cs)) => { + if clock_offset_ns.is_none() { + clock_offset_ns = extract_clock_offset(cs); + } + } + Some(Data::ProcessTree(pt)) => { + for thread in &pt.threads { + if let Some(tid) = thread.tid { + let tid_str = tid.to_string(); + thread_meta + .entry(tid_str) + .or_insert_with(|| ThreadMetadata { + name: thread.name.clone(), + priority: None, + }); + } + } + } + Some(Data::TrackDescriptor(td)) => { + if let Some(ref thread) = td.thread + && let Some(tid) = thread.tid + { + let tid_str = tid.to_string(); + thread_meta + .entry(tid_str) + .or_insert_with(|| ThreadMetadata { + name: thread.thread_name.clone(), + priority: None, + }); + // Associate this packet sequence with the thread so that + // StreamingProfilePacket samples can resolve their tid. 
+ if seq_id != 0 { + seq_id_to_tid.entry(seq_id).or_insert(tid as u32); + } + } + } + Some(Data::PerfSample(ps)) => { + if let Some(callstack_iid) = ps.callstack_iid { + let ts = packet.timestamp.unwrap_or(0); + let tid = ps.tid.unwrap_or(0); + if observed_pid.is_none() { + observed_pid = ps.pid; + } + sample_count += 1; + if let Some(stack_id) = resolve_callstack( + callstack_iid, + seq_id, + &tables_by_seq, + &mut frame_index, + &mut frames, + &mut stack_index, + &mut stacks, + &mut debug_images, + &mut seen_images, + ) { + resolved_samples.push((ts, tid, stack_id)); + } + } + } + Some(Data::StreamingProfilePacket(spp)) => { + let tid = seq_id_to_tid.get(&seq_id).copied().unwrap_or(0); + let mut ts = packet.timestamp.unwrap_or(0); + for (i, &cs_iid) in spp.callstack_iid.iter().enumerate() { + if let Some(&delta) = spp.timestamp_delta_us.get(i) { + // `delta` is i64 (can be negative for out-of-order samples). + // Casting to u64 wraps negative values, which is correct because + // `wrapping_add` of a wrapped negative value subtracts as expected. + ts = ts.wrapping_add((delta * 1000) as u64); + } + sample_count += 1; + if let Some(stack_id) = resolve_callstack( + cs_iid, + seq_id, + &tables_by_seq, + &mut frame_index, + &mut frames, + &mut stack_index, + &mut stacks, + &mut debug_images, + &mut seen_images, + ) { + resolved_samples.push((ts, tid, stack_id)); + } + } + } + None => {} + } + + if sample_count > MAX_SAMPLES { + return Err(ProfileError::ExceedSizeLimit); + } + } + + if resolved_samples.is_empty() { + return Err(ProfileError::NotEnoughSamples); + } + + // On Android/Linux the main thread's tid equals the process pid. + // If the trace didn't include a ProcessTree or TrackDescriptor with a name + // for that thread, label it "main" so the UI can identify it. 
+ if let Some(pid) = observed_pid { + let main_tid = pid.to_string(); + thread_meta + .entry(main_tid) + .or_insert_with(|| ThreadMetadata { + name: Some("main".to_owned()), + priority: None, + }); + } + + let clock_offset_ns = clock_offset_ns.ok_or(ProfileError::InvalidSampledProfile)?; + + resolved_samples.sort_by_key(|s| s.0); + + let mut samples: Vec = Vec::new(); + for &(ts_ns, tid, stack_id) in &resolved_samples { + // Compute absolute timestamp in integer nanoseconds first, then convert + // to f64 seconds once to avoid precision loss from adding large floats. + let abs_ns = ts_ns as i128 + clock_offset_ns; + let ts_secs = abs_ns as f64 / 1_000_000_000.0; + let ts_secs = (ts_secs * 1000.0).round() / 1000.0; + + if let Some(ts) = FiniteF64::new(ts_secs) { + samples.push(Sample { + timestamp: ts, + stack_id, + thread_id: tid.to_string(), + }); + } + } + + if samples.is_empty() { + return Err(ProfileError::NotEnoughSamples); + } + + Ok(( + ProfileData { + samples, + stacks, + frames, + thread_metadata: thread_meta, + }, + debug_images, + )) +} + +/// Resolves a callstack iid against the current intern tables, deduplicating +/// frames and stacks, and collecting debug images for native mappings. +/// +/// Returns `Some(stack_id)` if the callstack was resolved, or `None` if the +/// callstack iid was not found in the tables. 
+#[allow(clippy::too_many_arguments)] +fn resolve_callstack( + cs_iid: u64, + seq_id: u32, + tables_by_seq: &HashMap, + frame_index: &mut HashMap, + frames: &mut Vec, + stack_index: &mut HashMap, usize>, + stacks: &mut Vec>, + debug_images: &mut Vec, + seen_images: &mut HashSet<(String, u64)>, +) -> Option { + let empty_tables = InternTables::default(); + let tables = tables_by_seq.get(&seq_id).unwrap_or(&empty_tables); + + let callstack = tables.callstacks.get(&cs_iid)?; + + let mut resolved_frame_indices: Vec = Vec::with_capacity(callstack.frame_ids.len()); + + for &frame_iid in &callstack.frame_ids { + let Some(pf) = tables.frames.get(&frame_iid) else { + continue; + }; + + let function_name = pf + .function_name_id + .and_then(|id| tables.function_names.get(&id)) + .cloned(); + + if let Some(mid) = pf.mapping_id { + collect_debug_image(mid, tables, debug_images, seen_images); + } + + let (key, frame) = build_frame(function_name, pf, tables); + + let idx = if let Some(&existing) = frame_index.get(&key) { + existing + } else { + let idx = frames.len(); + frame_index.insert(key, idx); + frames.push(frame); + idx + }; + + resolved_frame_indices.push(idx); + } + + // Perfetto stacks are root-first, Sample v2 is leaf-first. + resolved_frame_indices.reverse(); + + let stack_id = if let Some(&existing) = stack_index.get(&resolved_frame_indices) { + existing + } else { + let id = stacks.len(); + stack_index.insert(resolved_frame_indices.clone(), id); + stacks.push(resolved_frame_indices); + id + }; + + Some(stack_id) +} + +/// Collects a debug image from a native mapping if not already seen. 
+fn collect_debug_image( + mapping_id: u64, + tables: &InternTables, + debug_images: &mut Vec, + seen_images: &mut HashSet<(String, u64)>, +) { + let Some(mapping) = tables.mappings.get(&mapping_id) else { + return; + }; + + let code_file = { + let parts: Vec<&str> = mapping + .path_string_ids + .iter() + .filter_map(|id| tables.mapping_paths.get(id).map(|s| s.as_str())) + .collect(); + if parts.is_empty() { + return; + } + parts.join("/") + }; + + if is_java_mapping(&code_file) { + return; + } + + let image_addr = mapping.start.unwrap_or(0); + + if !seen_images.insert((code_file.clone(), image_addr)) { + return; + } + + let debug_id = mapping + .build_id + .and_then(|bid| tables.build_ids.get(&bid)) + .and_then(|hex_str| build_id_to_debug_id(hex_str)); + + let Some(debug_id) = debug_id else { + return; + }; + + let image_size = mapping.end.unwrap_or(0).saturating_sub(image_addr); + let image_vmaddr = mapping.load_bias.unwrap_or(0); + + debug_images.push(DebugImage::native_image( + code_file, + debug_id, + image_addr, + image_vmaddr, + image_size, + )); +} + +/// Resolves a Perfetto frame into a [`FrameKey`] and a Sample v2 [`Frame`]. +/// +/// Java frames (identified by mapping path) have their fully-qualified name +/// split into module and function. Native frames compute an absolute +/// instruction address from `rel_pc` and the mapping start address. 
+fn build_frame( + function_name: Option, + pf: &proto::Frame, + tables: &InternTables, +) -> (FrameKey, Frame) { + let mapping = pf.mapping_id.and_then(|mid| tables.mappings.get(&mid)); + + let mapping_path = mapping.and_then(|m| { + let parts: Vec<&str> = m + .path_string_ids + .iter() + .filter_map(|id| tables.mapping_paths.get(id).map(|s| s.as_str())) + .collect(); + if parts.is_empty() { + None + } else { + Some(parts.join("/")) + } + }); + + let is_java = mapping_path.as_deref().is_some_and(is_java_mapping); + + if is_java { + // For Java frames, split "com.example.MyClass.myMethod" into + // module="com.example.MyClass" and function="myMethod". + let (module, function) = match &function_name { + Some(name) => match name.rsplit_once('.') { + Some((class, method)) => (Some(class.to_owned()), Some(method.to_owned())), + None => (None, Some(name.clone())), + }, + None => (None, None), + }; + + let key = FrameKey { + function: function.clone(), + module: module.clone(), + package: mapping_path.clone(), + instruction_addr: None, + }; + + let frame = Frame { + function, + module, + package: mapping_path, + platform: Some("java".to_owned()), + ..Default::default() + }; + + (key, frame) + } else { + let instruction_addr = match (pf.rel_pc, mapping) { + (Some(rel_pc), Some(m)) => Some(rel_pc.wrapping_add(m.start.unwrap_or(0))), + (Some(rel_pc), None) => Some(rel_pc), + (None, _) => None, + }; + + let key = FrameKey { + function: function_name.clone(), + module: None, + package: mapping_path.clone(), + instruction_addr, + }; + + let frame = Frame { + function: function_name, + package: mapping_path, + instruction_addr: instruction_addr.map(Addr), + platform: Some("native".to_owned()), + ..Default::default() + }; + + (key, frame) + } +} + +/// Returns `true` if the mapping path indicates a JVM/ART runtime mapping. 
+fn is_java_mapping(path: &str) -> bool { + const JVM_EXTENSIONS: &[&str] = &[".oat", ".odex", ".vdex", ".jar", ".dex"]; + + if path.contains("dalvik-jit-code-cache") { + return true; + } + JVM_EXTENSIONS.iter().any(|ext| path.ends_with(ext)) +} + +/// Converts a hex-encoded ELF build ID string into a Sentry [`DebugId`]. +/// +/// The first 16 bytes of the build ID are interpreted as a little-endian UUID +/// (byte-swapping the time_low, time_mid, and time_hi_and_version fields). +/// If the build ID is shorter than 16 bytes it is zero-padded on the right. +fn build_id_to_debug_id(hex_str: &str) -> Option { + let bytes = HEXLOWER.decode(hex_str.as_bytes()).ok()?; + if bytes.is_empty() { + return None; + } + + let mut buf = [0u8; 16]; + let len = bytes.len().min(16); + buf[..len].copy_from_slice(&bytes[..len]); + + // Swap from little-endian ELF byte order to UUID mixed-endian format. + // time_low (bytes 0..4): reverse + buf[..4].reverse(); + // time_mid (bytes 4..6): reverse + buf[4..6].reverse(); + // time_hi_and_version (bytes 6..8): reverse + buf[6..8].reverse(); + + let uuid = uuid::Uuid::from_bytes(buf); + uuid.to_string().parse().ok() +} + +#[cfg(test)] +mod tests { + use super::*; + + const TEST_BOOTTIME_NS: u64 = 1_000_000_000; + const TEST_REALTIME_NS: u64 = 1_700_000_001_000_000_000; + + fn make_clock_snapshot_packet() -> proto::TracePacket { + proto::TracePacket { + timestamp: None, + interned_data: None, + sequence_flags: None, + optional_trusted_packet_sequence_id: None, + data: Some(Data::ClockSnapshot(proto::ClockSnapshot { + clocks: vec![ + proto::clock_snapshot::Clock { + clock_id: Some(CLOCK_BOOTTIME), + timestamp: Some(TEST_BOOTTIME_NS), + }, + proto::clock_snapshot::Clock { + clock_id: Some(CLOCK_REALTIME), + timestamp: Some(TEST_REALTIME_NS), + }, + ], + primary_trace_clock: Some(CLOCK_BOOTTIME), + })), + } + } + + fn make_interned_string(iid: u64, value: &[u8]) -> proto::InternedString { + proto::InternedString { + iid: Some(iid), + r#str: 
Some(value.to_vec()),
+ }
+ }
+
+ fn make_frame(iid: u64, function_name_id: u64) -> proto::Frame {
+ proto::Frame {
+ iid: Some(iid),
+ function_name_id: Some(function_name_id),
+ mapping_id: None,
+ rel_pc: None,
+ }
+ }
+
+ fn make_perf_sample_packet(
+ timestamp: u64,
+ seq_id: u32,
+ tid: u32,
+ callstack_iid: u64,
+ ) -> proto::TracePacket {
+ proto::TracePacket {
+ timestamp: Some(timestamp),
+ interned_data: None,
+ sequence_flags: None,
+ optional_trusted_packet_sequence_id: Some(
+ proto::trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(
+ seq_id,
+ ),
+ ),
+ data: Some(Data::PerfSample(proto::PerfSample {
+ cpu: None,
+ pid: None,
+ tid: Some(tid),
+ callstack_iid: Some(callstack_iid),
+ })),
+ }
+ }
+
+ fn make_perf_sample_packet_with_pid(
+ timestamp: u64,
+ seq_id: u32,
+ pid: u32,
+ tid: u32,
+ callstack_iid: u64,
+ ) -> proto::TracePacket {
+ proto::TracePacket {
+ timestamp: Some(timestamp),
+ interned_data: None,
+ sequence_flags: None,
+ optional_trusted_packet_sequence_id: Some(
+ proto::trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(
+ seq_id,
+ ),
+ ),
+ data: Some(Data::PerfSample(proto::PerfSample {
+ cpu: None,
+ pid: Some(pid),
+ tid: Some(tid),
+ callstack_iid: Some(callstack_iid),
+ })),
+ }
+ }
+
+ fn make_interned_data_packet(
+ seq_id: u32,
+ clear_state: bool,
+ interned_data: proto::InternedData,
+ ) -> proto::TracePacket {
+ proto::TracePacket {
+ timestamp: None,
+ interned_data: Some(interned_data),
+ sequence_flags: if clear_state {
+ Some(SEQ_INCREMENTAL_STATE_CLEARED)
+ } else {
+ None
+ },
+ optional_trusted_packet_sequence_id: Some(
+ proto::trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(
+ seq_id,
+ ),
+ ),
+ data: None,
+ }
+ }
+
+ fn build_minimal_trace() -> Vec<u8> {
+ let trace = proto::Trace {
+ packet: vec![
+ make_clock_snapshot_packet(),
+ make_interned_data_packet(
+ 1,
+ true,
+ proto::InternedData {
+ function_names: vec![
+ make_interned_string(1,
b"main"), + make_interned_string(2, b"foo"), + ], + frames: vec![ + proto::Frame { + iid: Some(1), + function_name_id: Some(1), + mapping_id: None, + rel_pc: Some(0x1000), + }, + proto::Frame { + iid: Some(2), + function_name_id: Some(2), + mapping_id: None, + rel_pc: Some(0x2000), + }, + ], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1, 2], // root-first: main -> foo + }], + ..Default::default() + }, + ), + // Thread descriptor. + proto::TracePacket { + timestamp: None, + interned_data: None, + sequence_flags: None, + optional_trusted_packet_sequence_id: Some( + proto::trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(1), + ), + data: Some(Data::TrackDescriptor(proto::TrackDescriptor { + uuid: None, + thread: Some(proto::ThreadDescriptor { + pid: Some(100), + tid: Some(42), + thread_name: Some("main-thread".to_owned()), + }), + })), + }, + make_perf_sample_packet(1_000_000_000, 1, 42, 1), + make_perf_sample_packet(1_010_000_000, 1, 42, 1), + ], + }; + trace.encode_to_vec() + } + + #[test] + fn test_convert_minimal_trace() { + let bytes = build_minimal_trace(); + let result = convert(&bytes); + assert!(result.is_ok(), "conversion failed: {result:?}"); + + let (data, _images) = result.unwrap(); + + assert_eq!(data.samples.len(), 2); + assert_eq!(data.samples[0].thread_id, "42"); + assert_eq!(data.frames.len(), 2); + + assert_eq!(data.stacks.len(), 1); + let stack = &data.stacks[0]; + assert_eq!(stack.len(), 2); + + // Leaf-first order: foo, then main. 
+ assert_eq!(data.frames[stack[0]].function.as_deref(), Some("foo")); + assert_eq!(data.frames[stack[1]].function.as_deref(), Some("main")); + + assert!(data.thread_metadata.contains_key("42")); + assert_eq!( + data.thread_metadata["42"].name.as_deref(), + Some("main-thread") + ); + } + + #[test] + fn test_convert_empty_trace() { + let trace = proto::Trace { packet: vec![] }; + let bytes = trace.encode_to_vec(); + let result = convert(&bytes); + assert!(matches!(result, Err(ProfileError::NotEnoughSamples))); + } + + #[test] + fn test_convert_invalid_protobuf() { + let result = convert(b"not a valid protobuf"); + assert!(matches!(result, Err(ProfileError::InvalidSampledProfile))); + } + + #[test] + fn test_convert_missing_clock_snapshot() { + let trace = proto::Trace { + packet: vec![ + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"func")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 1, 1), + make_perf_sample_packet(1_010_000_000, 1, 1, 1), + ], + }; + let bytes = trace.encode_to_vec(); + let result = convert(&bytes); + assert!(matches!(result, Err(ProfileError::InvalidSampledProfile))); + } + + #[test] + fn test_streaming_profile_packet() { + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"func_a")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(10), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + proto::TracePacket { + timestamp: Some(2_000_000_000), + interned_data: None, + sequence_flags: None, + optional_trusted_packet_sequence_id: Some( + proto::trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(1), + ), + data: 
Some(Data::StreamingProfilePacket( + proto::StreamingProfilePacket { + callstack_iid: vec![10, 10], + timestamp_delta_us: vec![5_000, 10_000], // +5ms, +10ms + }, + )), + }, + ], + }; + let bytes = trace.encode_to_vec(); + let result = convert(&bytes); + assert!(result.is_ok(), "conversion failed: {result:?}"); + + let (data, _images) = result.unwrap(); + assert_eq!(data.samples.len(), 2); + // Timestamps are rebased using ClockSnapshot: offset = REALTIME - BOOTTIME. + let duration = data.samples[1].timestamp.to_f64() - data.samples[0].timestamp.to_f64(); + assert!( + (duration - 0.01).abs() < 0.001, + "expected ~10ms delta between samples, got {duration}" + ); + // First sample: base timestamp 2.0s + first delta 5ms = 2.005s boottime, + // then rebased with clock offset. + let expected_offset = (TEST_REALTIME_NS as f64 - TEST_BOOTTIME_NS as f64) / 1e9; + let expected_ts = 2.005 + expected_offset; + assert!( + (data.samples[0].timestamp.to_f64() - expected_ts).abs() < 0.001, + "expected first sample at ~{expected_ts}, got {}", + data.samples[0].timestamp.to_f64() + ); + } + + #[test] + fn test_mapping_resolution() { + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"my_func")], + frames: vec![proto::Frame { + iid: Some(1), + function_name_id: Some(1), + mapping_id: Some(1), + rel_pc: Some(0x100), + }], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + mappings: vec![proto::Mapping { + iid: Some(1), + build_id: None, + start: Some(0x7000), + end: Some(0x8000), + load_bias: None, + path_string_ids: vec![10], + ..Default::default() + }], + mapping_paths: vec![make_interned_string(10, b"libfoo.so")], + ..Default::default() + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 1, 1), + make_perf_sample_packet(1_010_000_000, 1, 1, 1), + ], + }; + let bytes = trace.encode_to_vec(); + let (data, 
images) = convert(&bytes).unwrap(); + + assert_eq!(data.frames.len(), 1); + let frame = &data.frames[0]; + assert_eq!(frame.platform.as_deref(), Some("native")); + assert_eq!(frame.function.as_deref(), Some("my_func")); + assert_eq!(frame.instruction_addr, Some(Addr(0x7100))); // rel_pc + start + assert_eq!(frame.package.as_deref(), Some("libfoo.so")); + assert!(frame.module.is_none()); + // No build_id on the mapping, so no debug images. + assert!(images.is_empty()); + } + + #[test] + fn test_separate_interning_namespaces() { + // Perfetto uses separate ID namespaces per InternedData field. + // function_names iid=1 and mapping_paths iid=1 must NOT collide. + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"my_func")], + mapping_paths: vec![make_interned_string(1, b"libfoo.so")], + frames: vec![proto::Frame { + iid: Some(1), + function_name_id: Some(1), + mapping_id: Some(1), + rel_pc: Some(0x100), + }], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + mappings: vec![proto::Mapping { + iid: Some(1), + start: Some(0x7000), + path_string_ids: vec![1], + ..Default::default() + }], + ..Default::default() + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 1, 1), + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + assert_eq!(data.frames.len(), 1); + let frame = &data.frames[0]; + // Both use iid=1 but must resolve independently. 
+ assert_eq!(frame.function.as_deref(), Some("my_func")); + assert_eq!(frame.package.as_deref(), Some("libfoo.so")); + } + + #[test] + fn test_incremental_state_reset() { + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"old_func")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + // State reset replaces everything. + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"new_func")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 1, 1), + make_perf_sample_packet(1_010_000_000, 1, 1, 1), + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + // After reset, "old_func" should be gone; only "new_func" remains. + assert_eq!(data.frames.len(), 1); + assert_eq!(data.frames[0].function.as_deref(), Some("new_func")); + } + + #[test] + fn test_incremental_state_reset_with_samples_before_and_after() { + // Samples collected before an incremental state reset must resolve + // against the pre-reset intern tables, not the post-reset ones. + // This catches the two-pass bug where deferred resolution would use + // the final (post-reset) table state for all samples. + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + // Pre-reset: iid 1 = "old_func". 
+ make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"old_func")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + // Sample BEFORE reset — should resolve to "old_func". + make_perf_sample_packet(1_000_000_000, 1, 1, 1), + // State reset: iid 1 now = "new_func". + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"new_func")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + // Sample AFTER reset — should resolve to "new_func". + make_perf_sample_packet(1_010_000_000, 1, 1, 1), + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + assert_eq!(data.samples.len(), 2); + // Both functions must be present — the pre-reset sample must NOT + // silently resolve to "new_func". + assert_eq!(data.frames.len(), 2); + let frame_names: Vec<_> = data + .frames + .iter() + .map(|f| f.function.as_deref().unwrap_or("")) + .collect(); + assert!( + frame_names.contains(&"old_func"), + "expected old_func from pre-reset sample, got: {frame_names:?}" + ); + assert!( + frame_names.contains(&"new_func"), + "expected new_func from post-reset sample, got: {frame_names:?}" + ); + } + + #[test] + fn test_convert_android_pftrace() { + let bytes = include_bytes!("../../tests/fixtures/android/perfetto/android.pftrace"); + + let result = convert(bytes.as_slice()); + assert!(result.is_ok(), "conversion failed: {result:?}"); + + let (data, images) = result.unwrap(); + assert!(!data.samples.is_empty(), "expected samples"); + assert!(!data.frames.is_empty(), "expected frames"); + assert!(!data.stacks.is_empty(), "expected stacks"); + + // All samples must reference valid stacks. 
+ for sample in &data.samples { + assert!( + sample.stack_id < data.stacks.len(), + "sample references out-of-bounds stack_id {}", + sample.stack_id + ); + } + + // All stacks must reference valid frames. + for stack in &data.stacks { + for &frame_idx in stack { + assert!( + frame_idx < data.frames.len(), + "stack references out-of-bounds frame index {frame_idx}", + ); + } + } + + let java_count = data + .frames + .iter() + .filter(|f| f.platform.as_deref() == Some("java")) + .count(); + let native_count = data + .frames + .iter() + .filter(|f| f.platform.as_deref() == Some("native")) + .count(); + assert!(java_count > 0, "expected java frames"); + assert!(native_count > 0, "expected native frames"); + + assert!( + !images.is_empty(), + "expected debug images from native mappings" + ); + + // The fixture contains samples from multiple threads. + let thread_ids: std::collections::BTreeSet<&str> = + data.samples.iter().map(|s| s.thread_id.as_str()).collect(); + assert!( + thread_ids.len() > 1, + "expected samples from multiple threads, got: {thread_ids:?}" + ); + + // The fixture has no ProcessTree/TrackDescriptor, but the main thread + // (tid == pid) should still be labeled "main" via pid-based inference. + assert!( + !data.thread_metadata.is_empty(), + "expected main thread metadata from pid inference" + ); + // The lowest tid in PerfSample traces is typically the main thread (tid == pid). 
+ let main_tid = thread_ids.iter().next().unwrap(); + assert_eq!( + data.thread_metadata + .get(*main_tid) + .and_then(|m| m.name.as_deref()), + Some("main"), + "expected main thread to be labeled via pid inference" + ); + } + + #[test] + fn test_frame_deduplication() { + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"shared")], + frames: vec![proto::Frame { + iid: Some(1), + function_name_id: Some(1), + mapping_id: None, + rel_pc: Some(0x100), + }], + // Two different callstacks referencing the same frame. + callstacks: vec![ + proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }, + proto::Callstack { + iid: Some(2), + frame_ids: vec![1], + }, + ], + ..Default::default() + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 1, 1), + make_perf_sample_packet(1_010_000_000, 1, 1, 2), + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + // Same frame referenced from two callstacks should be deduplicated. + assert_eq!(data.frames.len(), 1); + assert_eq!(data.stacks.len(), 1); // Same single-frame stack, also deduped. + assert_eq!(data.samples.len(), 2); + } + + #[test] + fn test_is_java_mapping() { + // JVM mappings. + assert!(is_java_mapping("system/framework/arm64/boot-framework.oat")); + assert!(is_java_mapping("data/app/.../oat/arm64/base.odex")); + assert!(is_java_mapping("base.vdex")); + assert!(is_java_mapping("system/framework/framework.jar")); + assert!(is_java_mapping("classes.dex")); + assert!(is_java_mapping("[anon_shmem:dalvik-jit-code-cache]")); + + // Native mappings. 
+ assert!(!is_java_mapping("libc.so")); + assert!(!is_java_mapping("libhwui.so")); + assert!(!is_java_mapping("apex/com.android.art/lib64/libart.so")); + assert!(!is_java_mapping("app_process64")); + } + + #[test] + fn test_java_frame_splitting() { + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"android.view.View.draw")], + frames: vec![proto::Frame { + iid: Some(1), + function_name_id: Some(1), + mapping_id: Some(1), + rel_pc: Some(0x100), + }], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + mappings: vec![proto::Mapping { + iid: Some(1), + start: Some(0x1000), + path_string_ids: vec![10], + ..Default::default() + }], + mapping_paths: vec![make_interned_string(10, b"boot-framework.oat")], + ..Default::default() + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 1, 1), + make_perf_sample_packet(1_010_000_000, 1, 1, 1), + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + assert_eq!(data.frames.len(), 1); + let frame = &data.frames[0]; + assert_eq!(frame.platform.as_deref(), Some("java")); + assert_eq!(frame.module.as_deref(), Some("android.view.View")); + assert_eq!(frame.function.as_deref(), Some("draw")); + assert_eq!(frame.package.as_deref(), Some("boot-framework.oat")); + assert!(frame.instruction_addr.is_none()); + } + + #[test] + fn test_native_frame() { + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"__epoll_pwait")], + frames: vec![proto::Frame { + iid: Some(1), + function_name_id: Some(1), + mapping_id: Some(1), + rel_pc: Some(0x100), + }], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + mappings: vec![proto::Mapping { + iid: Some(1), + start: 
Some(0x7000), + path_string_ids: vec![10], + ..Default::default() + }], + mapping_paths: vec![make_interned_string(10, b"libc.so")], + ..Default::default() + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 1, 1), + make_perf_sample_packet(1_010_000_000, 1, 1, 1), + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + assert_eq!(data.frames.len(), 1); + let frame = &data.frames[0]; + assert_eq!(frame.platform.as_deref(), Some("native")); + assert_eq!(frame.function.as_deref(), Some("__epoll_pwait")); + assert_eq!(frame.package.as_deref(), Some("libc.so")); + assert_eq!(frame.instruction_addr, Some(Addr(0x7100))); // rel_pc + start + assert!(frame.module.is_none()); + } + + #[test] + fn test_build_id_to_debug_id() { + // 20-byte ELF build ID (common for GNU build IDs). + let debug_id = build_id_to_debug_id("b03e4a7f5e884c8da04b05fa32cc4cbd69faff51").unwrap(); + // First 16 bytes: b0 3e 4a 7f 5e 88 4c 8d a0 4b 05 fa 32 cc 4c bd + // After LE→UUID swap: + // time_low (0..4) reversed: 7f4a3eb0 + // time_mid (4..6) reversed: 885e + // time_hi (6..8) reversed: 8d4c + // rest (8..16) unchanged: a04b05fa32cc4cbd + assert_eq!(debug_id.to_string(), "7f4a3eb0-885e-8d4c-a04b-05fa32cc4cbd"); + } + + #[test] + fn test_build_id_to_debug_id_short() { + // Build ID shorter than 16 bytes → zero-padded. + let debug_id = build_id_to_debug_id("aabbccdd").unwrap(); + // Bytes: aa bb cc dd 00 00 00 00 00 00 00 00 00 00 00 00 + // After swap: ddccbbaa-0000-0000-0000-000000000000 + assert_eq!(debug_id.to_string(), "ddccbbaa-0000-0000-0000-000000000000"); + } + + #[test] + fn test_build_id_to_debug_id_empty() { + assert!(build_id_to_debug_id("").is_none()); + } + + #[test] + fn test_mapping_with_build_id() { + // Raw 20-byte ELF build ID (as it appears in Perfetto traces). 
+ let build_id_raw: &[u8] = &[ + 0xb0, 0x3e, 0x4a, 0x7f, 0x5e, 0x88, 0x4c, 0x8d, 0xa0, 0x4b, 0x05, 0xfa, 0x32, 0xcc, + 0x4c, 0xbd, 0x69, 0xfa, 0xff, 0x51, + ]; + + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"native_func")], + frames: vec![proto::Frame { + iid: Some(1), + function_name_id: Some(1), + mapping_id: Some(1), + rel_pc: Some(0x200), + }], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + mappings: vec![proto::Mapping { + iid: Some(1), + build_id: Some(20), + start: Some(0x7000_0000), + end: Some(0x7001_0000), + load_bias: Some(0x1000), + path_string_ids: vec![10], + start_offset: None, + exact_offset: None, + }], + mapping_paths: vec![make_interned_string(10, b"libexample.so")], + build_ids: vec![make_interned_string(20, build_id_raw)], + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 1, 1), + make_perf_sample_packet(1_010_000_000, 1, 1, 1), + ], + }; + let bytes = trace.encode_to_vec(); + let (data, images) = convert(&bytes).unwrap(); + + assert_eq!(data.frames.len(), 1); + assert_eq!(images.len(), 1); + + let img_json = serde_json::to_value(&images[0]).unwrap(); + assert_eq!(img_json["code_file"], "libexample.so"); + assert_eq!(img_json["debug_id"], "7f4a3eb0-885e-8d4c-a04b-05fa32cc4cbd"); + assert_eq!(img_json["image_addr"], "0x70000000"); + assert_eq!(img_json["image_vmaddr"], "0x1000"); + assert_eq!(img_json["image_size"], 0x10000); + assert_eq!(img_json["type"], "symbolic"); + } + + #[test] + fn test_process_tree_thread_names() { + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + // ProcessTree with thread names. 
+ proto::TracePacket { + timestamp: None, + interned_data: None, + sequence_flags: None, + optional_trusted_packet_sequence_id: None, + data: Some(proto::trace_packet::Data::ProcessTree(proto::ProcessTree { + threads: vec![ + proto::process_tree::Thread { + tid: Some(42), + name: Some("main".to_owned()), + tgid: Some(42), + }, + proto::process_tree::Thread { + tid: Some(43), + name: Some("RenderThread".to_owned()), + tgid: Some(42), + }, + ], + })), + }, + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"doWork")], + frames: vec![proto::Frame { + iid: Some(1), + function_name_id: Some(1), + mapping_id: None, + rel_pc: None, + }], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 42, 1), + make_perf_sample_packet(1_010_000_000, 1, 43, 1), + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + assert_eq!(data.thread_metadata.len(), 2); + assert_eq!( + data.thread_metadata + .get("42") + .and_then(|m| m.name.as_deref()), + Some("main"), + ); + assert_eq!( + data.thread_metadata + .get("43") + .and_then(|m| m.name.as_deref()), + Some("RenderThread"), + ); + } + + #[test] + fn test_main_thread_inferred_from_pid() { + // When no ProcessTree/TrackDescriptor provides a thread name, the main + // thread (tid == pid) should be labeled "main" automatically. 
+ let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"doWork")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + make_perf_sample_packet_with_pid(1_000_000_000, 1, 100, 100, 1), // main thread + make_perf_sample_packet_with_pid(1_010_000_000, 1, 100, 101, 1), // worker thread + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + // Main thread (tid == pid == 100) should be labeled "main". + assert_eq!( + data.thread_metadata + .get("100") + .and_then(|m| m.name.as_deref()), + Some("main"), + ); + // Worker thread (tid 101) should have no metadata since no name source exists. + assert!(!data.thread_metadata.contains_key("101")); + } + + #[test] + fn test_main_thread_not_overwritten_by_pid_inference() { + // If a ProcessTree already provides a name for the main thread, + // pid-based inference must NOT overwrite it. 
+ let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + proto::TracePacket { + timestamp: None, + interned_data: None, + sequence_flags: None, + optional_trusted_packet_sequence_id: None, + data: Some(proto::trace_packet::Data::ProcessTree(proto::ProcessTree { + threads: vec![proto::process_tree::Thread { + tid: Some(100), + name: Some("ui-thread".to_owned()), + tgid: Some(100), + }], + })), + }, + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"doWork")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + make_perf_sample_packet_with_pid(1_000_000_000, 1, 100, 100, 1), + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + // The ProcessTree name "ui-thread" must be preserved, not replaced with "main". + assert_eq!( + data.thread_metadata + .get("100") + .and_then(|m| m.name.as_deref()), + Some("ui-thread"), + ); + } + + #[test] + fn test_main_thread_no_pid_for_streaming_packets() { + // StreamingProfilePacket doesn't carry a pid, so no main thread inference + // should occur. thread_metadata should be empty. 
+ let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"func")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + proto::TracePacket { + timestamp: Some(2_000_000_000), + interned_data: None, + sequence_flags: None, + optional_trusted_packet_sequence_id: Some( + proto::trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(1), + ), + data: Some(Data::StreamingProfilePacket( + proto::StreamingProfilePacket { + callstack_iid: vec![1], + timestamp_delta_us: vec![0], + }, + )), + }, + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + assert!( + data.thread_metadata.is_empty(), + "expected no thread metadata for streaming packets without ProcessTree" + ); + } + + #[test] + fn test_exceeds_max_samples() { + let mut packets = vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"func")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + ]; + for i in 0..=MAX_SAMPLES as u64 { + packets.push(make_perf_sample_packet(1_000_000_000 + i * 1_000, 1, 1, 1)); + } + let trace = proto::Trace { packet: packets }; + let bytes = trace.encode_to_vec(); + let result = convert(&bytes); + assert!( + matches!(result, Err(ProfileError::ExceedSizeLimit)), + "expected ExceedSizeLimit, got {result:?}" + ); + } + + #[test] + fn test_negative_timestamp_delta() { + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"func_a")], + frames: vec![make_frame(1, 1)], + 
callstacks: vec![proto::Callstack { + iid: Some(10), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + proto::TracePacket { + timestamp: Some(3_000_000_000), + interned_data: None, + sequence_flags: None, + optional_trusted_packet_sequence_id: Some( + proto::trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(1), + ), + data: Some(Data::StreamingProfilePacket( + proto::StreamingProfilePacket { + callstack_iid: vec![10, 10, 10], + timestamp_delta_us: vec![1_000, 20_000, -5_000], // +1ms, +20ms, -5ms + }, + )), + }, + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + assert_eq!(data.samples.len(), 3); + // After sorting: sample at 3.001s, then 3.001+0.015=3.016s, then 3.001+0.020=3.021s + let t0 = data.samples[0].timestamp.to_f64(); + let t1 = data.samples[1].timestamp.to_f64(); + let t2 = data.samples[2].timestamp.to_f64(); + assert!( + t0 < t1 && t1 < t2, + "expected sorted timestamps: {t0}, {t1}, {t2}" + ); + // The gap between t1 and t2 should be ~5ms (the -5ms sample comes before the +20ms one). + let gap = t2 - t1; + assert!((gap - 0.005).abs() < 0.001, "expected ~5ms gap, got {gap}"); + } + + #[test] + fn test_multi_sequence_traces() { + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + // Sequence 1: has "alpha" function. + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"alpha")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + // Sequence 2: reuses iid=1 but for "beta" function. 
+ make_interned_data_packet( + 2, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"beta")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(1), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 1, 1), // seq 1 -> alpha + make_perf_sample_packet(1_010_000_000, 2, 2, 1), // seq 2 -> beta + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + assert_eq!(data.samples.len(), 2); + // Should have two distinct frames from the two sequences. + assert_eq!(data.frames.len(), 2); + let frame_names: Vec<_> = data + .frames + .iter() + .map(|f| f.function.as_deref().unwrap_or("")) + .collect(); + assert!(frame_names.contains(&"alpha"), "expected alpha frame"); + assert!(frame_names.contains(&"beta"), "expected beta frame"); + } + + #[test] + fn test_streaming_profile_resolves_tid_from_track_descriptor() { + // When a TrackDescriptor with a ThreadDescriptor is present for the same + // trusted_packet_sequence_id, StreamingProfilePacket samples should + // resolve the thread ID from that descriptor instead of defaulting to 0. + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"func_a")], + frames: vec![make_frame(1, 1)], + callstacks: vec![proto::Callstack { + iid: Some(10), + frame_ids: vec![1], + }], + ..Default::default() + }, + ), + // TrackDescriptor associating seq_id=1 with tid=42. 
+ proto::TracePacket { + timestamp: None, + interned_data: None, + sequence_flags: None, + optional_trusted_packet_sequence_id: Some( + proto::trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(1), + ), + data: Some(Data::TrackDescriptor(proto::TrackDescriptor { + uuid: None, + thread: Some(proto::ThreadDescriptor { + pid: Some(100), + tid: Some(42), + thread_name: Some("worker".to_owned()), + }), + })), + }, + // StreamingProfilePacket on seq_id=1 should get tid=42. + proto::TracePacket { + timestamp: Some(2_000_000_000), + interned_data: None, + sequence_flags: None, + optional_trusted_packet_sequence_id: Some( + proto::trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(1), + ), + data: Some(Data::StreamingProfilePacket( + proto::StreamingProfilePacket { + callstack_iid: vec![10], + timestamp_delta_us: vec![0], + }, + )), + }, + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = convert(&bytes).unwrap(); + + assert_eq!(data.samples.len(), 1); + assert_eq!( + data.samples[0].thread_id, "42", + "StreamingProfilePacket should resolve tid from TrackDescriptor" + ); + assert!(data.thread_metadata.contains_key("42")); + assert_eq!(data.thread_metadata["42"].name.as_deref(), Some("worker")); + } + + #[test] + fn test_empty_callstack() { + let trace = proto::Trace { + packet: vec![ + make_clock_snapshot_packet(), + make_interned_data_packet( + 1, + true, + proto::InternedData { + function_names: vec![make_interned_string(1, b"func")], + frames: vec![make_frame(1, 1)], + callstacks: vec![ + proto::Callstack { + iid: Some(1), + frame_ids: vec![], // empty callstack + }, + proto::Callstack { + iid: Some(2), + frame_ids: vec![1], // valid callstack + }, + ], + ..Default::default() + }, + ), + make_perf_sample_packet(1_000_000_000, 1, 1, 1), // empty callstack + make_perf_sample_packet(1_010_000_000, 1, 1, 2), // valid callstack + ], + }; + let bytes = trace.encode_to_vec(); + let (data, _images) = 
convert(&bytes).unwrap(); + + // Both samples are emitted, but the empty one produces a deduplicated empty stack. + assert_eq!(data.samples.len(), 2); + // The valid callstack should produce one frame. + assert_eq!(data.frames.len(), 1); + assert_eq!(data.frames[0].function.as_deref(), Some("func")); + } +} diff --git a/relay-profiling/src/perfetto/proto.rs b/relay-profiling/src/perfetto/proto.rs new file mode 100644 index 00000000000..1f3c651e1b0 --- /dev/null +++ b/relay-profiling/src/perfetto/proto.rs @@ -0,0 +1,184 @@ +// @generated +// This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Trace { + #[prost(message, repeated, tag = "1")] + pub packet: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TracePacket { + #[prost(uint64, optional, tag = "8")] + pub timestamp: ::core::option::Option, + #[prost(message, optional, tag = "12")] + pub interned_data: ::core::option::Option, + #[prost(uint32, optional, tag = "13")] + pub sequence_flags: ::core::option::Option, + #[prost(oneof = "trace_packet::OptionalTrustedPacketSequenceId", tags = "10")] + pub optional_trusted_packet_sequence_id: + ::core::option::Option, + /// Only the oneof variants we care about; prost will skip the rest. + #[prost(oneof = "trace_packet::Data", tags = "2, 6, 54, 60, 66")] + pub data: ::core::option::Option, +} +/// Nested message and enum types in `TracePacket`. +pub mod trace_packet { + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum OptionalTrustedPacketSequenceId { + #[prost(uint32, tag = "10")] + TrustedPacketSequenceId(u32), + } + /// Only the oneof variants we care about; prost will skip the rest. 
+ #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Data { + #[prost(message, tag = "2")] + ProcessTree(super::ProcessTree), + #[prost(message, tag = "6")] + ClockSnapshot(super::ClockSnapshot), + #[prost(message, tag = "54")] + StreamingProfilePacket(super::StreamingProfilePacket), + #[prost(message, tag = "60")] + TrackDescriptor(super::TrackDescriptor), + #[prost(message, tag = "66")] + PerfSample(super::PerfSample), + } +} +// --- process tree ------------------------------------------------------------ + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ProcessTree { + #[prost(message, repeated, tag = "2")] + pub threads: ::prost::alloc::vec::Vec, +} +/// Nested message and enum types in `ProcessTree`. +pub mod process_tree { + #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] + pub struct Thread { + #[prost(int32, optional, tag = "1")] + pub tid: ::core::option::Option, + #[prost(string, optional, tag = "2")] + pub name: ::core::option::Option<::prost::alloc::string::String>, + #[prost(int32, optional, tag = "3")] + pub tgid: ::core::option::Option, + } +} +// --- clock sync --------------------------------------------------------------- + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ClockSnapshot { + #[prost(message, repeated, tag = "1")] + pub clocks: ::prost::alloc::vec::Vec, + #[prost(uint32, optional, tag = "2")] + pub primary_trace_clock: ::core::option::Option, +} +/// Nested message and enum types in `ClockSnapshot`. 
+pub mod clock_snapshot { + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] + pub struct Clock { + #[prost(uint32, optional, tag = "1")] + pub clock_id: ::core::option::Option, + #[prost(uint64, optional, tag = "2")] + pub timestamp: ::core::option::Option, + } +} +// --- interned data ----------------------------------------------------------- + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InternedData { + #[prost(message, repeated, tag = "5")] + pub function_names: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "6")] + pub frames: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "7")] + pub callstacks: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "16")] + pub build_ids: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "17")] + pub mapping_paths: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "19")] + pub mappings: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct InternedString { + #[prost(uint64, optional, tag = "1")] + pub iid: ::core::option::Option, + #[prost(bytes = "vec", optional, tag = "2")] + pub str: ::core::option::Option<::prost::alloc::vec::Vec>, +} +// --- profiling common -------------------------------------------------------- + +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct Frame { + #[prost(uint64, optional, tag = "1")] + pub iid: ::core::option::Option, + #[prost(uint64, optional, tag = "2")] + pub function_name_id: ::core::option::Option, + #[prost(uint64, optional, tag = "3")] + pub mapping_id: ::core::option::Option, + #[prost(uint64, optional, tag = "4")] + pub rel_pc: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct Mapping { + #[prost(uint64, optional, tag = "1")] + pub iid: ::core::option::Option, + #[prost(uint64, optional, tag = "2")] + pub build_id: ::core::option::Option, + #[prost(uint64, optional, tag = "3")] + pub 
start_offset: ::core::option::Option, + #[prost(uint64, optional, tag = "4")] + pub start: ::core::option::Option, + #[prost(uint64, optional, tag = "5")] + pub end: ::core::option::Option, + #[prost(uint64, optional, tag = "6")] + pub load_bias: ::core::option::Option, + #[prost(uint64, repeated, packed = "false", tag = "7")] + pub path_string_ids: ::prost::alloc::vec::Vec, + #[prost(uint64, optional, tag = "8")] + pub exact_offset: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct Callstack { + #[prost(uint64, optional, tag = "1")] + pub iid: ::core::option::Option, + #[prost(uint64, repeated, packed = "false", tag = "2")] + pub frame_ids: ::prost::alloc::vec::Vec, +} +// --- profiling packets ------------------------------------------------------- + +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct PerfSample { + #[prost(uint32, optional, tag = "1")] + pub cpu: ::core::option::Option, + #[prost(uint32, optional, tag = "2")] + pub pid: ::core::option::Option, + #[prost(uint32, optional, tag = "3")] + pub tid: ::core::option::Option, + #[prost(uint64, optional, tag = "4")] + pub callstack_iid: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct StreamingProfilePacket { + #[prost(uint64, repeated, packed = "false", tag = "1")] + pub callstack_iid: ::prost::alloc::vec::Vec, + #[prost(int64, repeated, packed = "false", tag = "2")] + pub timestamp_delta_us: ::prost::alloc::vec::Vec, +} +// --- track descriptors ------------------------------------------------------- + +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct TrackDescriptor { + #[prost(uint64, optional, tag = "1")] + pub uuid: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub thread: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ThreadDescriptor { + #[prost(int32, optional, tag = "1")] + pub pid: 
::core::option::Option, + #[prost(int32, optional, tag = "2")] + pub tid: ::core::option::Option, + #[prost(string, optional, tag = "5")] + pub thread_name: ::core::option::Option<::prost::alloc::string::String>, +} +// @@protoc_insertion_point(module) diff --git a/relay-profiling/src/sample/mod.rs b/relay-profiling/src/sample/mod.rs index d3d1b0ba335..dd36828aebe 100644 --- a/relay-profiling/src/sample/mod.rs +++ b/relay-profiling/src/sample/mod.rs @@ -70,6 +70,13 @@ pub struct Frame { #[serde(skip_serializing_if = "Option::is_none")] pub module: Option, + /// The 'package' the frame was contained in. + /// + /// For native frames this is the dynamic library path (e.g. `libc.so`). + /// For Java frames this is the container (e.g. `boot-framework.oat`). + #[serde(skip_serializing_if = "Option::is_none")] + pub package: Option, + /// Which platform this frame is from. /// /// This can override the platform for a single frame. Otherwise, the platform of the event is diff --git a/relay-profiling/src/sample/v2.rs b/relay-profiling/src/sample/v2.rs index a5b0d2119b4..efd4c9a5734 100644 --- a/relay-profiling/src/sample/v2.rs +++ b/relay-profiling/src/sample/v2.rs @@ -36,6 +36,8 @@ pub struct ProfileMetadata { #[serde(skip_serializing_if = "Option::is_none")] pub environment: Option, pub platform: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub content_type: Option, pub release: Option, pub client_sdk: ClientSdk, diff --git a/relay-profiling/tests/fixtures/android/perfetto/android.pftrace b/relay-profiling/tests/fixtures/android/perfetto/android.pftrace new file mode 100644 index 00000000000..e04c92c1728 Binary files /dev/null and b/relay-profiling/tests/fixtures/android/perfetto/android.pftrace differ diff --git a/relay-server/src/envelope/content_type.rs b/relay-server/src/envelope/content_type.rs index fe8b09859ca..5f42170d0e0 100644 --- a/relay-server/src/envelope/content_type.rs +++ b/relay-server/src/envelope/content_type.rs @@ -191,10 
+191,12 @@ relay_common::impl_str_de!(ContentType, "a content type string"); #[cfg(test)] mod tests { + use similar_asserts::assert_eq; + use super::*; #[test] - fn attachment_ref_roundtrip() { + fn test_attachment_ref_roundtrip() { let canonical_name = "application/vnd.sentry.attachment-ref+json"; let ct = ContentType::from_str(canonical_name).unwrap(); assert_eq!(ct, ContentType::AttachmentRef); diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs index aa9ce5aa20d..92ee1a3b2d6 100644 --- a/relay-server/src/processing/profile_chunks/mod.rs +++ b/relay-server/src/processing/profile_chunks/mod.rs @@ -136,10 +136,14 @@ impl Forward for ProfileChunkOutput { let retention_days = ctx.event_retention().standard; for item in profile_chunks.split(|pc| pc.profile_chunks) { + let (kafka_payload, raw_profile, raw_profile_content_type) = split_item_payload(&item); + s.send_to_store(item.map(|item, _| StoreProfileChunk { retention_days, - payload: item.payload(), + payload: kafka_payload, quantities: item.quantities(), + raw_profile, + raw_profile_content_type, })); } @@ -147,6 +151,47 @@ impl Forward for ProfileChunkOutput { } } +/// Splits a profile chunk item payload into its constituent parts. +/// +/// For compound items (those with a `meta_length` header), the payload is +/// `[expanded JSON][raw binary]`. Returns `(kafka_payload, raw_profile, content_type)`. +/// +/// For plain items, returns `(full_payload, None, None)`. 
+#[cfg_attr(not(feature = "processing"), allow(dead_code))] +fn split_item_payload(item: &Item) -> (bytes::Bytes, Option, Option) { + let payload = item.payload(); + + let Some(meta_length) = item.meta_length() else { + return (payload, None, None); + }; + + let meta_length = meta_length as usize; + let Some((meta, body)) = payload.split_at_checked(meta_length) else { + return (payload, None, None); + }; + + if body.is_empty() { + return (payload.slice_ref(meta), None, None); + } + + // Extract content_type from the expanded JSON metadata using a minimal + // deserializer that only reads this single field, skipping the bulk of the + // payload (frames, stacks, samples, etc.). + #[derive(serde::Deserialize)] + struct ContentTypeProbe { + content_type: Option, + } + let content_type = serde_json::from_slice::(meta) + .ok() + .and_then(|v| v.content_type); + + ( + payload.slice_ref(meta), + Some(payload.slice_ref(body)), + content_type, + ) +} + /// Serialized profile chunks extracted from an envelope. 
#[derive(Debug)] pub struct SerializedProfileChunks { @@ -184,3 +229,116 @@ impl Counted for SerializedProfileChunks { impl CountRateLimited for Managed { type Error = Error; } + +#[cfg(test)] +mod tests { + use similar_asserts::assert_eq; + + use crate::envelope::ContentType; + + use super::*; + + fn make_chunk_item(meta: &[u8]) -> Item { + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::Json, bytes::Bytes::copy_from_slice(meta)); + item + } + + fn make_compound_item(meta: &[u8], body: &[u8]) -> Item { + let meta_length = meta.len(); + let mut payload = bytes::BytesMut::with_capacity(meta_length + body.len()); + payload.extend_from_slice(meta); + payload.extend_from_slice(body); + + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::OctetStream, payload.freeze()); + item.set_meta_length(meta_length as u32); + item + } + + #[test] + fn test_split_plain_chunk() { + let item = make_chunk_item(b"{}"); + let (payload, raw, ct) = split_item_payload(&item); + assert_eq!(payload.as_ref(), b"{}"); + assert!(raw.is_none()); + assert!(ct.is_none()); + } + + #[test] + fn test_split_compound_chunk() { + let meta = br#"{"content_type":"perfetto"}"#; + let body = b"binary-data"; + let item = make_compound_item(meta, body); + + let (payload, raw, ct) = split_item_payload(&item); + assert_eq!(payload.as_ref(), meta.as_ref()); + assert_eq!(raw.as_deref(), Some(b"binary-data".as_ref())); + assert_eq!(ct.as_deref(), Some("perfetto")); + } + + #[test] + fn test_split_compound_no_content_type() { + let meta = b"{}"; + let body = b"binary-data"; + let item = make_compound_item(meta, body); + + let (payload, raw, ct) = split_item_payload(&item); + assert_eq!(payload.as_ref(), b"{}"); + assert_eq!(raw.as_deref(), Some(b"binary-data".as_ref())); + assert!(ct.is_none()); + } + + #[test] + fn test_split_compound_empty_body() { + let meta = br#"{"content_type":"perfetto"}"#; + let item = make_compound_item(meta, b""); + + let 
(payload, raw, ct) = split_item_payload(&item); + assert_eq!(payload.as_ref(), meta.as_ref()); + assert!(raw.is_none()); + assert!(ct.is_none()); + } + + #[test] + fn test_split_compound_meta_length_exceeds_payload() { + // meta_length is set to more bytes than the payload actually contains. + // split_at_checked returns None, so we fall back to the full payload with no split. + let body = b"binary-data"; + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::OctetStream, bytes::Bytes::from(body.as_ref())); + item.set_meta_length(body.len() as u32 + 100); + + let (payload, raw, ct) = split_item_payload(&item); + assert_eq!(payload.as_ref(), body.as_ref()); + assert!(raw.is_none()); + assert!(ct.is_none()); + } + + #[test] + fn test_split_compound_invalid_json_meta() { + // meta portion is not valid JSON; content_type should be None. + let meta = b"not valid json {{{{"; + let body = b"binary-data"; + let item = make_compound_item(meta, body); + + let (payload, raw, ct) = split_item_payload(&item); + assert_eq!(payload.as_ref(), meta.as_ref()); + assert_eq!(raw.as_deref(), Some(b"binary-data".as_ref())); + assert!(ct.is_none()); + } + + #[test] + fn test_split_compound_zero_meta_length() { + // meta_length = 0: meta slice is empty, entire payload is treated as body. 
+ let body = b"binary-data"; + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::OctetStream, bytes::Bytes::from(body.as_ref())); + item.set_meta_length(0); + + let (payload, raw, ct) = split_item_payload(&item); + assert_eq!(payload.as_ref(), b""); + assert_eq!(raw.as_deref(), Some(b"binary-data".as_ref())); + assert!(ct.is_none()); + } +} diff --git a/relay-server/src/processing/profile_chunks/process.rs b/relay-server/src/processing/profile_chunks/process.rs index 293f77aa707..a99896b1074 100644 --- a/relay-server/src/processing/profile_chunks/process.rs +++ b/relay-server/src/processing/profile_chunks/process.rs @@ -1,10 +1,13 @@ +use std::net::IpAddr; + +use relay_dynamic_config::Feature; use relay_profiling::ProfileType; use relay_quotas::DataCategory; use crate::envelope::{ContentType, Item, ItemType}; use crate::processing::Context; use crate::processing::Managed; -use crate::processing::profile_chunks::{Result, SerializedProfileChunks}; +use crate::processing::profile_chunks::{Error, Result, SerializedProfileChunks}; use crate::statsd::RelayCounters; use crate::utils; @@ -22,6 +25,18 @@ pub fn process(profile_chunks: &mut Managed, ctx: Conte profile_chunks.retain( |pc| &mut pc.profile_chunks, |item, records| -> Result<()> { + if let Some(meta_length) = item.meta_length() { + return process_compound_item( + item, + meta_length, + sdk, + client_ip, + filter_settings, + ctx, + records, + ); + } + let pc = relay_profiling::ProfileChunk::new(item.payload())?; // Validate the item inferred profile type with the one from the payload, @@ -65,13 +80,284 @@ pub fn process(profile_chunks: &mut Managed, ctx: Conte } *item = { - let mut item = Item::new(ItemType::ProfileChunk); - item.set_platform(pc.platform().to_owned()); - item.set_payload(ContentType::Json, expanded); - item + let mut new_item = Item::new(ItemType::ProfileChunk); + new_item.set_platform(pc.platform().to_owned()); + new_item.set_payload(ContentType::Json, expanded); + 
new_item }; Ok(()) }, ); } + +/// Processes a compound profile chunk item (JSON metadata + binary blob). +/// +/// The item payload is `[JSON metadata bytes][binary blob bytes]`, split at `meta_length`. +/// After expansion, the item is rebuilt with `[expanded JSON][raw binary]` and an updated +/// `meta_length`, so that `forward_store` can still extract the raw profile. +fn process_compound_item( + item: &mut Item, + meta_length: u32, + sdk: &str, + client_ip: Option, + filter_settings: &relay_filter::ProjectFiltersConfig, + ctx: Context<'_>, + records: &mut crate::managed::RecordKeeper, +) -> Result<()> { + let payload = item.payload(); + let meta_length = meta_length as usize; + + let Some((meta_json, raw_profile)) = payload.split_at_checked(meta_length) else { + return Err(relay_profiling::ProfileError::InvalidSampledProfile.into()); + }; + + #[derive(serde::Deserialize)] + struct ContentTypeProbe { + content_type: Option, + } + match serde_json::from_slice::(meta_json) + .ok() + .and_then(|v| v.content_type) + .as_deref() + { + Some("perfetto") => {} + _ => return Err(relay_profiling::ProfileError::PlatformNotSupported.into()), + } + + if ctx.should_filter(Feature::ContinuousProfilingPerfetto) { + return Err(Error::FilterFeatureFlag); + } + + let expanded = relay_profiling::expand_perfetto(raw_profile, meta_json)?; + + if expanded.payload.len() > ctx.config.max_profile_size() { + return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); + } + + if item + .profile_type() + .is_some_and(|pt| pt != expanded.profile_type()) + { + return Err(relay_profiling::ProfileError::InvalidProfileType.into()); + } + + if item.profile_type().is_none() { + relay_statsd::metric!( + counter(RelayCounters::ProfileChunksWithoutPlatform) += 1, + sdk = sdk + ); + match expanded.profile_type() { + ProfileType::Ui => records.modify_by(DataCategory::ProfileChunkUi, 1), + ProfileType::Backend => records.modify_by(DataCategory::ProfileChunk, 1), + } + } + + 
expanded.filter(client_ip, filter_settings, ctx.global_config)?; + + // Rebuild the compound payload: [expanded JSON][raw binary]. + // This preserves the raw profile for downstream extraction in forward_store. + let platform = expanded.platform; + let expanded_payload = bytes::Bytes::from(expanded.payload); + let mut compound = bytes::BytesMut::with_capacity(expanded_payload.len() + raw_profile.len()); + compound.extend_from_slice(&expanded_payload); + compound.extend_from_slice(raw_profile); + + *item = { + let mut new_item = Item::new(ItemType::ProfileChunk); + new_item.set_platform(platform); + new_item.set_payload(ContentType::Json, compound.freeze()); + new_item.set_meta_length(expanded_payload.len() as u32); + new_item + }; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use similar_asserts::assert_eq; + + use relay_dynamic_config::{Feature, FeatureSet, ProjectConfig}; + + use super::*; + use crate::Envelope; + use crate::envelope::ContentType; + use crate::extractors::RequestMeta; + use crate::managed::Managed; + use crate::processing::Context; + use crate::processing::profile_chunks::SerializedProfileChunks; + use crate::services::projects::project::ProjectInfo; + + const PERFETTO_FIXTURE: &[u8] = include_bytes!( + "../../../../relay-profiling/tests/fixtures/android/perfetto/android.pftrace" + ); + + fn perfetto_meta() -> Vec { + serde_json::json!({ + "version": "2", + "chunk_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "profiler_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + "platform": "android", + "content_type": "perfetto", + "client_sdk": {"name": "sentry-android", "version": "1.0"}, + }) + .to_string() + .into_bytes() + } + + fn make_compound_item(meta: &[u8], body: &[u8]) -> Item { + let meta_length = meta.len() as u32; + let mut payload = bytes::BytesMut::new(); + payload.extend_from_slice(meta); + payload.extend_from_slice(body); + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::OctetStream, payload.freeze()); + 
item.set_meta_length(meta_length); + item + } + + fn make_chunks( + items: Vec, + ) -> ( + Managed, + crate::managed::ManagedTestHandle, + ) { + let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" + .parse() + .unwrap(); + let envelope = Envelope::from_request(None, RequestMeta::new(dsn)); + let headers = envelope.headers().clone(); + Managed::for_test(SerializedProfileChunks { + headers, + profile_chunks: items, + }) + .build() + } + + /// Runs `process_compound_item` for every item in `managed`, retaining only the + /// items that process successfully; `managed` is mutated in place, not consumed. + fn run(managed: &mut Managed, ctx: Context<'_>) { + let sdk = ""; + let client_ip = None; + let filter_settings = Default::default(); + managed.retain( + |pc| &mut pc.profile_chunks, + |item, records| -> Result<()> { + let meta_length = item.meta_length().unwrap_or(0); + process_compound_item( + item, + meta_length, + sdk, + client_ip, + &filter_settings, + ctx, + records, + ) + }, + ); + } + + #[test] + fn test_process_compound_unknown_content_type() { + // content_type is not "perfetto" → item is dropped immediately. + let meta = serde_json::json!({ + "version": "2", + "chunk_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "profiler_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + "platform": "android", + "content_type": "unknown", + "client_sdk": {"name": "sentry-android", "version": "1.0"}, + }) + .to_string() + .into_bytes(); + let item = make_compound_item(&meta, PERFETTO_FIXTURE); + let (mut managed, _handle) = make_chunks(vec![item]); + + run(&mut managed, Context::for_test()); + + let chunks = managed.accept(|c| c); + assert!(chunks.profile_chunks.is_empty(), "item should be dropped"); + } + + #[test] + fn test_process_compound_feature_flag_disabled() { + // The ContinuousProfilingPerfetto feature is absent → item is dropped. + // Default Context::for_test() uses relay mode = Managed with an empty feature set. 
+ let meta = perfetto_meta(); + let item = make_compound_item(&meta, PERFETTO_FIXTURE); + let (mut managed, _handle) = make_chunks(vec![item]); + + run(&mut managed, Context::for_test()); + + let chunks = managed.accept(|c| c); + assert!( + chunks.profile_chunks.is_empty(), + "item should be dropped when feature flag is absent" + ); + } + + #[test] + fn test_process_compound_meta_length_out_of_bounds() { + // meta_length header is larger than the actual payload → InvalidSampledProfile. + let body = b"some bytes"; + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::OctetStream, bytes::Bytes::from(body.as_ref())); + item.set_meta_length(body.len() as u32 + 100); + let (mut managed, _handle) = make_chunks(vec![item]); + + run(&mut managed, Context::for_test()); + + let chunks = managed.accept(|c| c); + assert!( + chunks.profile_chunks.is_empty(), + "item should be dropped on out-of-bounds meta_length" + ); + } + + #[test] + fn test_process_compound_success() { + // Happy path: valid Perfetto trace + feature enabled → compound payload rebuilt. + let meta = perfetto_meta(); + let item = make_compound_item(&meta, PERFETTO_FIXTURE); + let (mut managed, _handle) = make_chunks(vec![item]); + + let ctx = Context { + project_info: &ProjectInfo { + config: ProjectConfig { + features: FeatureSet::from_iter([ + Feature::ContinuousProfiling, + Feature::ContinuousProfilingPerfetto, + ]), + ..Default::default() + }, + ..Default::default() + }, + ..Context::for_test() + }; + + run(&mut managed, ctx); + + let mut chunks = managed.accept(|c| c); + assert_eq!(chunks.profile_chunks.len(), 1, "item should be retained"); + + let item = chunks.profile_chunks.remove(0); + + // The rebuilt item must carry a meta_length pointing to the expanded JSON. + let meta_length = item + .meta_length() + .expect("rebuilt item must have meta_length"); + assert!(meta_length > 0); + + // The first meta_length bytes must be valid JSON (the expanded Sample v2 profile). 
+ let payload = item.payload(); + let (json_part, raw_part) = payload.split_at(meta_length as usize); + assert!( + serde_json::from_slice::(json_part).is_ok(), + "first meta_length bytes must be valid JSON" + ); + + // The raw binary is the original Perfetto trace preserved verbatim. + assert_eq!(raw_part, PERFETTO_FIXTURE); + } +} diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 606f2695cbf..b543e40241b 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -158,6 +158,13 @@ pub struct StoreProfileChunk { /// /// Quantities are different for backend and ui profile chunks. pub quantities: Quantities, + /// Raw binary profile blob (e.g. Perfetto trace). + /// + /// Sent alongside the expanded JSON payload because the expansion only extracts a + /// minimum of information; the raw profile is preserved for further processing downstream. + pub raw_profile: Option, + /// Content type of `raw_profile` (e.g. `"perfetto"`). + pub raw_profile_content_type: Option, } impl Counted for StoreProfileChunk { @@ -815,6 +822,8 @@ impl StoreService { scoping.project_id.to_string(), )]), payload: message.payload, + raw_profile: message.raw_profile, + raw_profile_content_type: message.raw_profile_content_type, }; self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message)) @@ -1657,6 +1666,10 @@ struct ProfileChunkKafkaMessage { #[serde(skip)] headers: BTreeMap, payload: Bytes, + #[serde(skip_serializing_if = "Option::is_none")] + raw_profile: Option, + #[serde(skip_serializing_if = "Option::is_none")] + raw_profile_content_type: Option, } /// An enum over all possible ingest messages.