From 4f6d153bd03c65253d9cc30c86677199ac7cf009 Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 27 May 2026 16:28:45 -0400 Subject: [PATCH] benchmakrs arrow-flight roundtrip as well as encode/decode paths indivdually --- arrow-flight/Cargo.toml | 20 ++++ arrow-flight/benchmarks/common/mod.rs | 146 +++++++++++++++++++++++ arrow-flight/benchmarks/flight_decode.rs | 66 ++++++++++ arrow-flight/benchmarks/flight_encode.rs | 63 ++++++++++ arrow-flight/benchmarks/roundtrip.rs | 62 ++++++++++ 5 files changed, 357 insertions(+) create mode 100644 arrow-flight/benchmarks/common/mod.rs create mode 100644 arrow-flight/benchmarks/flight_decode.rs create mode 100644 arrow-flight/benchmarks/flight_encode.rs create mode 100644 arrow-flight/benchmarks/roundtrip.rs diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index 8f95e1995a67..d42b05fdd6ab 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -76,6 +76,7 @@ cli = ["arrow-array/chrono-tz", "arrow-cast/prettyprint", "tonic/tls-webpki-root [dev-dependencies] arrow-cast = { workspace = true, features = ["prettyprint"] } assert_cmd = "2.0.8" +criterion = { workspace = true, default-features = false, features = ["async_tokio"] } http = "1.1.0" http-body = "1.0.0" hyper-util = "0.1" @@ -105,3 +106,22 @@ required-features = ["flight-sql", "tls-ring"] name = "flight_sql_client_cli" path = "tests/flight_sql_client_cli.rs" required-features = ["cli", "flight-sql", "tls-ring"] + +[profile.bench] +debug = true + +[[bench]] +name = "flight_encode" +path = "benchmarks/flight_encode.rs" +harness = false + +[[bench]] +name = "flight_decode" +path = "benchmarks/flight_decode.rs" +harness = false + +[[bench]] +name = "roundtrip" +path = "benchmarks/roundtrip.rs" +harness = false + diff --git a/arrow-flight/benchmarks/common/mod.rs b/arrow-flight/benchmarks/common/mod.rs new file mode 100644 index 000000000000..3ff8cfa5936b --- /dev/null +++ b/arrow-flight/benchmarks/common/mod.rs @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::{Arc, RwLock}; + +use arrow_array::{ + Array, ArrayRef, DictionaryArray, Int32Array, Int64Array, ListArray, RecordBatch, StringArray, + types::Int32Type, +}; +use arrow_buffer::OffsetBuffer; +use arrow_flight::{ + Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, + HandshakeRequest, HandshakeResponse, PollInfo, PutResult, SchemaResult, Ticket, + flight_service_server::{FlightService, FlightServiceServer}, +}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::Bytes; +use futures::{StreamExt, TryStreamExt, stream::BoxStream}; +use tonic::{ + Request, Response, Status, Streaming, + transport::{Channel, Server}, +}; + +pub type Builder = fn(usize) -> ArrayRef; + +pub const TYPES: &[(&str, Builder)] = &[ + ("fixed", fixed), + ("nested", nested), + ("variable", variable), + ("dict", dict), +]; + +fn fixed(n: usize) -> ArrayRef { + Arc::new(Int64Array::from_iter_values(0..n as i64)) +} + +fn variable(n: usize) -> ArrayRef { + Arc::new(StringArray::from_iter_values( + (0..n).map(|i| format!("variable_string_{i}{}", "_".repeat(i % 16))), + )) +} + +fn nested(n: usize) -> ArrayRef { + let values = Int32Array::from_iter_values(0..(n * 4) as i32); + let offsets = OffsetBuffer::::from_lengths(std::iter::repeat_n(4usize, n)); + let field = Arc::new(Field::new_list_field(DataType::Int32, false)); + Arc::new(ListArray::new(field, offsets, Arc::new(values), None)) +} + +fn dict(n: usize) -> ArrayRef { + let keys = Int32Array::from_iter_values((0..n).map(|i| (i % 32) as i32)); + let values = StringArray::from_iter_values((0..32).map(|i| format!("dictionary_value_{i:03}"))); + Arc::new(DictionaryArray::::try_new(keys, Arc::new(values)).unwrap()) +} + +pub fn build_batch(name: &str, rows: usize, cols: usize, build: Builder) -> RecordBatch { + let arrays: Vec = (0..cols).map(|_| build(rows)).collect(); + let fields: Vec = arrays + .iter() + .enumerate() + .map(|(i, a)| Field::new(format!("column_{i}_{name}"), a.data_type().clone(), false)) + .collect(); + RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).unwrap() +} + +#[derive(Clone, Default)] +pub struct BenchServer { + frames: Arc>>, +} + +impl BenchServer { + #[allow(dead_code)] + pub fn set_frames(&self, frames: Vec) { + *self.frames.write().unwrap() = frames; + } +} + +fn unimpl() -> Result { + Err(Status::unimplemented("")) +} + +#[rustfmt::skip] +#[tonic::async_trait] +impl FlightService for BenchServer { + type HandshakeStream = BoxStream<'static, Result>; + type ListFlightsStream = BoxStream<'static, Result>; + type DoGetStream = BoxStream<'static, Result>; + type DoPutStream = BoxStream<'static, Result>; + type DoActionStream = BoxStream<'static, Result>; + type ListActionsStream = BoxStream<'static, Result>; + type DoExchangeStream = BoxStream<'static, Result>; + + async fn do_get(&self, _: Request) -> Result, Status> { + let frames = self.frames.read().unwrap().clone(); + Ok(Response::new(futures::stream::iter(frames.into_iter().map(Ok)).boxed())) + } + + async fn do_put(&self, req: Request>) -> Result, Status> { + let _: Vec = req.into_inner().try_collect().await?; + let ack = PutResult { app_metadata: Bytes::new() }; + Ok(Response::new(futures::stream::iter([Ok(ack)]).boxed())) + } + + async fn do_exchange(&self, req: Request>) -> Result, Status> { + Ok(Response::new(req.into_inner().boxed())) + } + + async fn handshake(&self, _: Request>) -> Result, Status> { unimpl() } + async fn list_flights(&self, _: Request) -> Result, Status> { unimpl() } + async fn get_flight_info(&self, _: Request) -> Result, Status> { unimpl() } + async fn poll_flight_info(&self, _: Request) -> Result, Status> { unimpl() } + async fn get_schema(&self, _: Request) -> Result, Status> { unimpl() } + async fn do_action(&self, _: Request) -> Result, Status> { unimpl() } + async fn list_actions(&self, _: Request) -> Result, Status> { unimpl() } +} + +pub async fn start_server() -> (Channel, BenchServer) { + let server = BenchServer::default(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn( + Server::builder() + .add_service(FlightServiceServer::new(server.clone())) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), + ); + let channel = Channel::from_shared(format!("http://{addr}")) + .unwrap() + .connect() + .await + .unwrap(); + (channel, server) +} diff --git a/arrow-flight/benchmarks/flight_decode.rs b/arrow-flight/benchmarks/flight_decode.rs new file mode 100644 index 000000000000..d5459d8edcc4 --- /dev/null +++ b/arrow-flight/benchmarks/flight_decode.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::RecordBatch; +use arrow_flight::{FlightClient, FlightData, Ticket, encode::FlightDataEncoderBuilder}; +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use futures::TryStreamExt; +use tonic::transport::Channel; + +mod common; +use common::{TYPES, build_batch, start_server}; + +const ROWS: [usize; 2] = [8 * 1024, 64 * 1024]; +const COLS: [usize; 2] = [1, 8]; + +async fn encode_to_frames(batch: RecordBatch) -> Vec { + FlightDataEncoderBuilder::new() + .build(futures::stream::iter([Ok(batch)])) + .try_collect() + .await + .unwrap() +} + +async fn recv(channel: Channel) { + let mut client = FlightClient::new(channel); + let resp = client.do_get(Ticket::new("bench")).await.unwrap(); + let _: Vec = resp.try_collect().await.unwrap(); +} + +fn bench(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let (channel, server) = rt.block_on(start_server()); + let mut g = c.benchmark_group("flight_decode"); + + for &(name, build) in TYPES { + for &rows in &ROWS { + for &cols in &COLS { + let batch = build_batch(name, rows, cols, build); + let bytes = batch.get_array_memory_size() as u64; + server.set_frames(rt.block_on(encode_to_frames(batch))); + let id = BenchmarkId::new(name, format!("{rows}x{cols}")); + g.throughput(Throughput::Bytes(bytes)); + g.bench_function(id, |b| { + b.to_async(&rt).iter(|| recv(channel.clone())); + }); + } + } + } +} + +criterion_group!(benches, bench); +criterion_main!(benches); diff --git a/arrow-flight/benchmarks/flight_encode.rs b/arrow-flight/benchmarks/flight_encode.rs new file mode 100644 index 000000000000..9ac15cd22b5b --- /dev/null +++ b/arrow-flight/benchmarks/flight_encode.rs @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::RecordBatch; +use arrow_flight::{FlightClient, encode::FlightDataEncoderBuilder}; +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use futures::TryStreamExt; +use tonic::transport::Channel; + +mod common; +use common::{TYPES, build_batch, start_server}; + +const ROWS: [usize; 2] = [8 * 1024, 64 * 1024]; +const COLS: [usize; 2] = [1, 8]; + +async fn send(channel: Channel, batch: RecordBatch) { + let mut client = FlightClient::new(channel); + let frames = FlightDataEncoderBuilder::new().build(futures::stream::iter([Ok(batch)])); + let _: Vec<_> = client + .do_put(frames) + .await + .unwrap() + .try_collect() + .await + .unwrap(); +} + +fn bench(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let (channel, _) = rt.block_on(start_server()); + let mut g = c.benchmark_group("flight_encode"); + + for &(name, build) in TYPES { + for &rows in &ROWS { + for &cols in &COLS { + let batch = build_batch(name, rows, cols, build); + let id = BenchmarkId::new(name, format!("{rows}x{cols}")); + g.throughput(Throughput::Bytes(batch.get_array_memory_size() as u64)); + g.bench_with_input(id, &batch, |b, batch| { + b.to_async(&rt) + .iter(|| send(channel.clone(), batch.clone())); + }); + } + } + } +} + +criterion_group!(benches, bench); +criterion_main!(benches); diff --git a/arrow-flight/benchmarks/roundtrip.rs b/arrow-flight/benchmarks/roundtrip.rs new file mode 100644 index 000000000000..a6c0556226ad --- /dev/null +++ b/arrow-flight/benchmarks/roundtrip.rs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::RecordBatch; +use arrow_flight::{FlightClient, encode::FlightDataEncoderBuilder}; +use criterion::{BatchSize, BenchmarkId, Criterion, criterion_group, criterion_main}; +use futures::TryStreamExt; +use tonic::transport::Channel; + +mod common; +use common::{TYPES, build_batch, start_server}; + +const ROWS: [usize; 2] = [8 * 1024, 64 * 1024]; +const COLS: [usize; 2] = [8, 16]; + +async fn roundtrip(channel: Channel, batch: RecordBatch) { + let mut client = FlightClient::new(channel); + let frames = FlightDataEncoderBuilder::new().build(futures::stream::iter([Ok(batch)])); + let resp = client.do_exchange(frames).await.unwrap(); + let _: Vec = resp.try_collect().await.unwrap(); +} + +fn bench(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let (channel, _) = rt.block_on(start_server()); + let mut g = c.benchmark_group("flight_roundtrip"); + + for &(name, build) in TYPES { + for &rows in &ROWS { + for &cols in &COLS { + //let batch = build_batch(name, rows, cols, build); + let id = BenchmarkId::new(name, format!("{rows}x{cols}")); + //g.throughput(Throughput::Bytes(batch.get_array_memory_size() as u64)); + g.bench_function(id, |b| { + let batch = build_batch(name, rows, cols, build); + b.to_async(&rt).iter_batched( + || (channel.clone(), batch.clone()), + |(ch, b)| roundtrip(ch, b), + BatchSize::SmallInput, + ); + }); + } + } + } +} + +criterion_group!(benches, bench); +criterion_main!(benches);