From 301e04141470500f785cda026e6a62c19b4d5c8a Mon Sep 17 00:00:00 2001 From: Jeff Jiang Date: Tue, 7 Apr 2026 06:24:38 -0700 Subject: [PATCH 1/5] is_retryable --- tonic-xds/src/client/mod.rs | 1 + tonic-xds/src/client/retry.rs | 181 ++++++++++++++++++++++++++++++++++ 2 files changed, 182 insertions(+) create mode 100644 tonic-xds/src/client/retry.rs diff --git a/tonic-xds/src/client/mod.rs b/tonic-xds/src/client/mod.rs index e532dbb83..3ac8fef2a 100644 --- a/tonic-xds/src/client/mod.rs +++ b/tonic-xds/src/client/mod.rs @@ -2,4 +2,5 @@ pub(crate) mod channel; pub(crate) mod cluster; pub(crate) mod endpoint; pub(crate) mod lb; +pub(crate) mod retry; pub(crate) mod route; diff --git a/tonic-xds/src/client/retry.rs b/tonic-xds/src/client/retry.rs new file mode 100644 index 000000000..3c11c2228 --- /dev/null +++ b/tonic-xds/src/client/retry.rs @@ -0,0 +1,181 @@ +//! gRPC retry utilities. + +use std::io; + +/// Check if an error's source chain contains a retryable connection-level error. +/// +/// These are errors where the request was definitely **not** sent, making it safe to retry. +/// Walks the full error source chain via [`std::error::Error::source`]. +pub(crate) fn is_retryable_connection_error(err: &(dyn std::error::Error + 'static)) -> bool { + let mut current: Option<&(dyn std::error::Error + 'static)> = Some(err); + while let Some(e) = current { + if let Some(io_err) = e.downcast_ref::() { + match io_err.kind() { + io::ErrorKind::ConnectionRefused + | io::ErrorKind::NotConnected + | io::ErrorKind::AddrInUse + | io::ErrorKind::AddrNotAvailable => return true, + _ => {} + } + } + current = e.source(); + } + false +} + +/// Check if a gRPC status code is retryable according to the given policy. 
+pub(crate) fn is_retryable_grpc_status_code( + code: tonic::Code, + retryable_codes: &[tonic::Code], +) -> bool { + retryable_codes.contains(&code) +} + +/// Check if a request should be retried, either because of a retryable connection error +/// or because the gRPC response status code is in the retryable set. +pub(crate) fn is_retryable( + result: &Result<&http::Response<()>, &E>, + retryable_codes: &[tonic::Code], +) -> bool { + match result { + Err(err) => is_retryable_connection_error(*err), + Ok(response) => { + let status = tonic::Status::from_header_map(response.headers()); + match status { + Some(status) => is_retryable_grpc_status_code(status.code(), retryable_codes), + // No grpc-status header means success + None => false, + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + + // --- is_retryable_connection_error tests --- + + #[test] + fn test_connection_refused_is_retryable() { + let err = io::Error::new(io::ErrorKind::ConnectionRefused, "refused"); + assert!(is_retryable_connection_error(&err)); + } + + #[test] + fn test_not_connected_is_retryable() { + let err = io::Error::new(io::ErrorKind::NotConnected, "not connected"); + assert!(is_retryable_connection_error(&err)); + } + + #[test] + fn test_addr_in_use_is_retryable() { + let err = io::Error::new(io::ErrorKind::AddrInUse, "addr in use"); + assert!(is_retryable_connection_error(&err)); + } + + #[test] + fn test_addr_not_available_is_retryable() { + let err = io::Error::new(io::ErrorKind::AddrNotAvailable, "addr not available"); + assert!(is_retryable_connection_error(&err)); + } + + #[test] + fn test_connection_reset_is_not_retryable() { + // Connection reset means the request may have been sent + let err = io::Error::new(io::ErrorKind::ConnectionReset, "reset"); + assert!(!is_retryable_connection_error(&err)); + } + + #[test] + fn test_timeout_is_not_retryable() { + let err = io::Error::new(io::ErrorKind::TimedOut, "timed out"); + 
assert!(!is_retryable_connection_error(&err)); + } + + #[test] + fn test_nested_connection_refused_is_retryable() { + // tonic::Status wraps the inner error and exposes it via source() + let inner = io::Error::new(io::ErrorKind::ConnectionRefused, "refused"); + let mut status = tonic::Status::unavailable("connection refused"); + status.set_source(Arc::new(inner)); + assert!(is_retryable_connection_error(&status)); + } + + #[test] + fn test_non_io_error_is_not_retryable() { + #[derive(Debug)] + struct CustomError; + impl std::fmt::Display for CustomError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "custom") + } + } + impl std::error::Error for CustomError {} + + assert!(!is_retryable_connection_error(&CustomError)); + } + + // --- is_retryable_grpc_status_code tests --- + + #[test] + fn test_unavailable_is_retryable() { + let codes = vec![tonic::Code::Unavailable, tonic::Code::Cancelled]; + assert!(is_retryable_grpc_status_code( + tonic::Code::Unavailable, + &codes + )); + } + + #[test] + fn test_ok_is_not_retryable() { + let codes = vec![tonic::Code::Unavailable, tonic::Code::Cancelled]; + assert!(!is_retryable_grpc_status_code(tonic::Code::Ok, &codes)); + } + + #[test] + fn test_empty_retryable_codes() { + assert!(!is_retryable_grpc_status_code( + tonic::Code::Unavailable, + &[] + )); + } + + // --- is_retryable tests --- + + #[test] + fn test_is_retryable_connection_error_via_result() { + let err = io::Error::new(io::ErrorKind::ConnectionRefused, "refused"); + let result: Result<&http::Response<()>, &io::Error> = Err(&err); + assert!(is_retryable(&result, &[])); + } + + #[test] + fn test_is_retryable_grpc_status_via_result() { + let response = http::Response::builder() + .header("grpc-status", "14") // UNAVAILABLE + .body(()) + .unwrap(); + let result: Result<&http::Response<()>, &io::Error> = Ok(&response); + assert!(is_retryable(&result, &[tonic::Code::Unavailable])); + } + + #[test] + fn 
test_is_not_retryable_ok_response() { + let response = http::Response::builder() + .header("grpc-status", "0") // OK + .body(()) + .unwrap(); + let result: Result<&http::Response<()>, &io::Error> = Ok(&response); + assert!(!is_retryable(&result, &[tonic::Code::Unavailable])); + } + + #[test] + fn test_is_not_retryable_no_grpc_status_header() { + let response = http::Response::builder().body(()).unwrap(); + let result: Result<&http::Response<()>, &io::Error> = Ok(&response); + assert!(!is_retryable(&result, &[tonic::Code::Unavailable])); + } +} From 54bacb1f390536313f2ad52fa6773f63ae9ceae5 Mon Sep 17 00:00:00 2001 From: Jeff Jiang Date: Tue, 7 Apr 2026 09:20:55 -0700 Subject: [PATCH 2/5] layer --- tonic-xds/Cargo.toml | 5 +- tonic-xds/src/client/channel.rs | 60 ++++- tonic-xds/src/client/retry.rs | 460 ++++++++++++++++++++++++++++++-- tonic-xds/src/testutil/grpc.rs | 65 +++++ 4 files changed, 566 insertions(+), 24 deletions(-) diff --git a/tonic-xds/Cargo.toml b/tonic-xds/Cargo.toml index c9607595a..89a974e55 100644 --- a/tonic-xds/Cargo.toml +++ b/tonic-xds/Cargo.toml @@ -23,7 +23,8 @@ exclude = ["proto/test/*"] [dependencies] tonic = "0.14" http = "1" -tower = { version = "0.5", default-features = false, features = ["discover"] } +http-body = "1" +tower = { version = "0.5", default-features = false, features = ["discover", "retry"] } arc-swap = "1" dashmap = "6.1" thiserror = "2.0.17" @@ -41,6 +42,8 @@ tokio = { version = "1", features = ["sync"] } # cryptographic security, only statistical uniformity for traffic distribution. 
fastrand = "2" tokio-stream = "0.1" +backoff = "0.4" +shared_http_body = "0.1" [lints] workspace = true diff --git a/tonic-xds/src/client/channel.rs b/tonic-xds/src/client/channel.rs index a4215ab57..3c8495ea3 100644 --- a/tonic-xds/src/client/channel.rs +++ b/tonic-xds/src/client/channel.rs @@ -14,6 +14,7 @@ use tower::{BoxError, Service, load::Load, util::BoxCloneService}; use { crate::client::cluster::ClusterClientRegistryGrpc, crate::client::lb::ClusterDiscovery, + crate::client::retry::{GrpcRetryPolicy, RetryLayer}, crate::client::route::{Router, XdsRoutingLayer}, tower::ServiceBuilder, }; @@ -164,23 +165,26 @@ impl XdsChannelBuilder { BoxCloneService::new(self.build_tonic_grpc_channel()) } - /// Builds an `XdsChannelGrpc` from the given router and cluster discovery. + /// Builds an `XdsChannelGrpc` from the given router, cluster discovery, and retry policy. #[cfg(test)] pub(crate) fn build_grpc_channel_from_parts( &self, router: Arc, discovery: Arc>>, + retry_policy: GrpcRetryPolicy, ) -> XdsChannelGrpc { let routing_layer = XdsRoutingLayer::new(router); + let retry_layer = RetryLayer::new(retry_policy); let cluster_registry = Arc::new(ClusterClientRegistryGrpc::new()); let lb_service = XdsLbService::new(cluster_registry, discovery); let service = ServiceBuilder::new() .layer(routing_layer) + .layer(retry_layer) + .map_request(|req: Request>| { + req.map(TonicBody::new) + }) .service(lb_service); - BoxCloneService::new(XdsChannelTonicGrpc { - config: self.config.clone(), - inner: service, - }) + BoxCloneService::new(service) } } @@ -192,6 +196,7 @@ mod tests { use crate::client::endpoint::EndpointAddress; use crate::client::endpoint::EndpointChannel; use crate::client::lb::{BoxDiscover, ClusterDiscovery}; + use crate::client::retry::GrpcRetryPolicy; use crate::client::route::RouteDecision; use crate::client::route::RouteInput; use crate::client::route::Router; @@ -333,8 +338,11 @@ mod tests { let xds_manager = 
Arc::new(MockXdsManager::from_test_servers(&servers)); let xds_channel_builder = XdsChannelBuilder::with_config(XdsChannelConfig::default()); - let xds_channel = xds_channel_builder - .build_grpc_channel_from_parts(xds_manager.clone(), xds_manager.clone()); + let xds_channel = xds_channel_builder.build_grpc_channel_from_parts( + xds_manager.clone(), + xds_manager.clone(), + GrpcRetryPolicy::default(), + ); let client = GreeterClient::new(xds_channel); @@ -387,4 +395,42 @@ mod tests { let _ = server.handle.await; } } + + #[tokio::test] + async fn test_retry_once_on_unavailable() { + use crate::client::retry::{GrpcRetryPolicy, GrpcRetryPolicyConfig}; + use crate::testutil::grpc::spawn_fail_first_n_server; + + // Server fails the first request with UNAVAILABLE, succeeds on retry. + let server = spawn_fail_first_n_server("retry-server", 1) + .await + .expect("Failed to spawn server"); + + let servers = vec![server]; + let xds_manager = Arc::new(MockXdsManager::from_test_servers(&servers)); + + let retry_policy = GrpcRetryPolicy::new( + GrpcRetryPolicyConfig::new() + .retry_on(vec![tonic::Code::Unavailable]) + .num_retries(1), + ); + + let xds_channel = XdsChannelBuilder::with_config(XdsChannelConfig::default()) + .build_grpc_channel_from_parts( + xds_manager.clone(), + xds_manager.clone(), + retry_policy, + ); + + let mut client = GreeterClient::new(xds_channel); + + let response = client + .say_hello(HelloRequest { + name: "retry-test".to_string(), + }) + .await + .expect("request should succeed after retry"); + + assert_eq!(response.into_inner().message, "retry-server: retry-test"); + } } diff --git a/tonic-xds/src/client/retry.rs b/tonic-xds/src/client/retry.rs index 3c11c2228..2c130deac 100644 --- a/tonic-xds/src/client/retry.rs +++ b/tonic-xds/src/client/retry.rs @@ -1,6 +1,20 @@ //! gRPC retry utilities. 
+use std::fmt::Debug; use std::io; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use arc_swap::ArcSwap; +use backoff::backoff::Backoff; +use backoff::ExponentialBackoffBuilder; +use http::{Request, Response}; +use http_body::Body; +use shared_http_body::{SharedBody, SharedBodyExt}; +use tower::retry::Retry; +use tower::retry::Policy; +use tower::{Layer, Service}; /// Check if an error's source chain contains a retryable connection-level error. /// @@ -33,16 +47,19 @@ pub(crate) fn is_retryable_grpc_status_code( /// Check if a request should be retried, either because of a retryable connection error /// or because the gRPC response status code is in the retryable set. -pub(crate) fn is_retryable( - result: &Result<&http::Response<()>, &E>, - retryable_codes: &[tonic::Code], +/// TODO: gRPC retriability is based on gRPC status code by default, in practice this may +/// cause non-idempotent methods to be retried. It might be better to allow customizing +/// retryability checks in the future. +pub(crate) fn is_retryable( + result: &Result, tower::BoxError>, + policy: &GrpcRetryPolicyConfig, ) -> bool { match result { - Err(err) => is_retryable_connection_error(*err), + Err(err) => is_retryable_connection_error(err.as_ref()), Ok(response) => { let status = tonic::Status::from_header_map(response.headers()); match status { - Some(status) => is_retryable_grpc_status_code(status.code(), retryable_codes), + Some(status) => is_retryable_grpc_status_code(status.code(), &policy.retry_on), // No grpc-status header means success None => false, } @@ -50,10 +67,296 @@ pub(crate) fn is_retryable( } } +/// Maximum number of retry attempts allowed by the gRPC retry spec. +/// Any `num_retries` value that would result in more than 5 total attempts +/// is capped to `MAX_ATTEMPTS - 1 = 4`. +const MAX_ATTEMPTS: u32 = 5; + +/// Minimum floor for backoff durations. Values below this are clamped up. 
+const MIN_BACKOFF: Duration = Duration::from_millis(1); + +/// Backoff configuration for gRPC retries. +/// +/// Build via [`GrpcRetryBackoffConfig::new`], which requires `base_interval`. +/// `max_interval` and `backoff_multiplier` are optional with sensible defaults. +/// +/// # Guardrails +/// - `base_interval` and `max_interval` must be > 0; values < 1ms are treated as 1ms. +/// - `max_interval` defaults to `10 * base_interval`. +/// - `max_interval` must be >= `base_interval`; if not, it is clamped to `base_interval`. +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct GrpcRetryBackoffConfig { + pub(crate) base_interval: Duration, + pub(crate) max_interval: Duration, + pub(crate) backoff_multiplier: f64, +} + +impl GrpcRetryBackoffConfig { + /// Create a new backoff config with the given `base_interval`. + /// `max_interval` defaults to `10 * base_interval`. + /// `backoff_multiplier` defaults to `2.0`. + pub(crate) fn new(base_interval: Duration) -> Self { + let base_interval = base_interval.max(MIN_BACKOFF); + Self { + max_interval: base_interval * 10, + base_interval, + backoff_multiplier: 2.0, + } + } + + /// Set the maximum backoff interval. + /// Values < 1ms are treated as 1ms. Values < `base_interval` are clamped to `base_interval`. + pub(crate) fn max_interval(mut self, max_interval: Duration) -> Self { + let max_interval = max_interval.max(MIN_BACKOFF); + self.max_interval = max_interval.max(self.base_interval); + self + } + + /// Set the backoff multiplier (default: 2.0). + pub(crate) fn backoff_multiplier(mut self, multiplier: f64) -> Self { + self.backoff_multiplier = multiplier; + self + } +} + +impl Default for GrpcRetryBackoffConfig { + fn default() -> Self { + Self::new(Duration::from_millis(25)) + .max_interval(Duration::from_millis(250)) + } +} + +/// gRPC retry policy configuration. +/// +/// Built via [`GrpcRetryPolicyConfig::new`] with defaults, then customized via builder methods. 
+/// +/// # Defaults +/// - `num_retries`: 1 (2 total attempts) +/// - `retry_on`: empty (no status codes retried) +/// - `retry_backoff`: base_interval=25ms, max_interval=250ms, multiplier=2.0 +/// +/// # Guardrails +/// - `num_retries` must be >= 1. Values of 0 are clamped to 1. +/// - `num_retries` is capped so total attempts (num_retries + 1) never exceed 5. +#[derive(Debug, Clone)] +pub(crate) struct GrpcRetryPolicyConfig { + pub(crate) retry_on: Vec, + pub(crate) num_retries: u32, + pub(crate) retry_backoff: GrpcRetryBackoffConfig, +} + +impl GrpcRetryPolicyConfig { + /// Create a new retry policy with defaults. + pub(crate) fn new() -> Self { + Self::default() + } + + /// Set the list of retryable gRPC status codes. + pub(crate) fn retry_on(mut self, codes: Vec) -> Self { + self.retry_on = codes; + self + } + + /// Set the number of retries (total attempts = num_retries + 1). + /// Values of 0 are clamped to 1. Values that would exceed 5 total attempts are capped. + pub(crate) fn num_retries(mut self, num_retries: u32) -> Self { + self.num_retries = num_retries.max(1).min(MAX_ATTEMPTS - 1); + self + } + + /// Set the backoff configuration. + pub(crate) fn retry_backoff(mut self, backoff: GrpcRetryBackoffConfig) -> Self { + self.retry_backoff = backoff; + self + } +} + +impl Default for GrpcRetryPolicyConfig { + fn default() -> Self { + Self { + retry_on: Vec::new(), + num_retries: 1, + retry_backoff: GrpcRetryBackoffConfig::default(), + } + } +} + +/// gRPC header for tracking retry attempts per the gRPC spec. +const GRPC_PREVIOUS_RPC_ATTEMPTS: &str = "grpc-previous-rpc-attempts"; + +/// Create a [`backoff::ExponentialBackoff`] from a [`GrpcRetryBackoffConfig`]. 
+fn make_backoff(config: &GrpcRetryBackoffConfig) -> backoff::ExponentialBackoff { + ExponentialBackoffBuilder::default() + .with_initial_interval(config.base_interval) + .with_max_interval(config.max_interval) + .with_multiplier(config.backoff_multiplier) + .with_randomization_factor(0.0) + .with_max_elapsed_time(None) + .build() +} + +/// gRPC retry policy with support for lock-free hot-swapping of configuration. +/// +/// Wraps a [`GrpcRetryPolicyConfig`] behind an [`ArcSwap`] so that configuration +/// can be atomically updated (e.g. from xDS) without blocking in-flight requests. +/// +/// Implements [`tower::retry::Policy`]. Tower's `Retry` service clones the policy +/// for each request, so `backoff` and `attempts` track per-request retry state +/// while the shared config is read from `ArcSwap` on each retry decision. +#[derive(Clone, Debug)] +pub(crate) struct GrpcRetryPolicy { + config: Arc>, + /// Backoff state for the current request, created from config on first retry. + backoff: Option, + /// Number of retry attempts made so far for the current request. + attempts: u32, +} + +impl GrpcRetryPolicy { + /// Create a new retry policy with the given configuration. + pub(crate) fn new(config: GrpcRetryPolicyConfig) -> Self { + Self { + config: Arc::new(ArcSwap::from(Arc::new(config))), + backoff: None, + attempts: 0, + } + } + + /// Atomically swap the configuration with a new one. + pub(crate) fn update_config(&self, config: GrpcRetryPolicyConfig) { + self.config.store(Arc::new(config)); + } + + /// Load the current configuration. + pub(crate) fn load_config(&self) -> Arc { + self.config.load_full() + } + + /// Get or create the backoff, and advance it to the next delay. 
+ fn backoff_next(&mut self, backoff_config: &GrpcRetryBackoffConfig) -> Duration { + let backoff = self + .backoff + .get_or_insert_with(|| make_backoff(backoff_config)); + backoff + .next_backoff() + .unwrap_or(backoff_config.max_interval) + } +} + +impl Default for GrpcRetryPolicy { + fn default() -> Self { + Self::new(GrpcRetryPolicyConfig::default()) + } +} + +impl Policy, Response, tower::BoxError> for GrpcRetryPolicy +where + Req: Clone, +{ + type Future = tokio::time::Sleep; + + fn retry( + &mut self, + req: &mut Request, + result: &mut Result, tower::BoxError>, + ) -> Option { + let config = self.load_config(); + + if self.attempts >= config.num_retries { + return None; + } + + if !is_retryable(result, &config) { + return None; + } + + let delay = self.backoff_next(&config.retry_backoff); + self.attempts += 1; + + // Per gRPC spec: set grpc-previous-rpc-attempts header + req.headers_mut().insert( + GRPC_PREVIOUS_RPC_ATTEMPTS, + http::HeaderValue::from(self.attempts), + ); + + Some(tokio::time::sleep(delay)) + } + + fn clone_request(&mut self, req: &Request) -> Option> { + Some(req.clone()) + } +} + +/// Tower [`Layer`] that wraps a service with retry support. +/// +/// Converts the request body into a [`SharedBody`] (cloneable) and constructs +/// a fresh [`tower::retry::Retry`] service per request so that each request +/// gets its own retry state. +/// +/// This layer is generic over the retry policy — it is not tied to gRPC. +/// The gRPC-specific behavior lives in the [`Policy`] implementation +/// (e.g. [`GrpcRetryPolicy`]). +#[derive(Clone)] +pub(crate) struct RetryLayer
<P>
{ + policy: P, +} + +impl
<P>
RetryLayer
<P>
{ + /// Create a new retry layer with the given policy. + pub(crate) fn new(policy: P) -> Self { + Self { policy } + } +} + +impl Layer for RetryLayer
<P>
{ + type Service = RetryService; + + fn layer(&self, service: S) -> Self::Service { + RetryService { + inner: service, + policy: self.policy.clone(), + } + } +} + +/// Service that converts request bodies to [`SharedBody`] and retries via +/// [`tower::retry::Retry`] with the given policy. +#[derive(Clone)] +pub(crate) struct RetryService { + inner: S, + policy: P, +} + +impl Service> for RetryService +where + P: Policy>, Response, S::Error> + Clone + Send + 'static, + P::Future: Send, + S: Service>, Response = Response> + Clone + Send + 'static, + S::Error: Debug + Send + 'static, + S::Response: Send + 'static, + S::Future: Send + 'static, + B: Body + Unpin + Send + 'static, + B::Data: Clone + Send + Sync, + B::Error: Clone + Send + Sync, + Res: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = std::pin::Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + let mut retry_svc = Retry::new(self.policy.clone(), self.inner.clone()); + let shared_request = request.map(|b| b.into_shared()); + Box::pin(retry_svc.call(shared_request)) + } +} + #[cfg(test)] mod tests { - use std::sync::Arc; - use super::*; // --- is_retryable_connection_error tests --- @@ -147,35 +450,160 @@ mod tests { #[test] fn test_is_retryable_connection_error_via_result() { - let err = io::Error::new(io::ErrorKind::ConnectionRefused, "refused"); - let result: Result<&http::Response<()>, &io::Error> = Err(&err); - assert!(is_retryable(&result, &[])); + let policy = GrpcRetryPolicyConfig::new(); + let err: tower::BoxError = + Box::new(io::Error::new(io::ErrorKind::ConnectionRefused, "refused")); + let result: Result, tower::BoxError> = Err(err); + assert!(is_retryable(&result, &policy)); } #[test] fn test_is_retryable_grpc_status_via_result() { + let policy = GrpcRetryPolicyConfig::new() + .retry_on(vec![tonic::Code::Unavailable]); let response = 
http::Response::builder() .header("grpc-status", "14") // UNAVAILABLE .body(()) .unwrap(); - let result: Result<&http::Response<()>, &io::Error> = Ok(&response); - assert!(is_retryable(&result, &[tonic::Code::Unavailable])); + let result: Result, tower::BoxError> = Ok(response); + assert!(is_retryable(&result, &policy)); } #[test] fn test_is_not_retryable_ok_response() { + let policy = GrpcRetryPolicyConfig::new() + .retry_on(vec![tonic::Code::Unavailable]); let response = http::Response::builder() .header("grpc-status", "0") // OK .body(()) .unwrap(); - let result: Result<&http::Response<()>, &io::Error> = Ok(&response); - assert!(!is_retryable(&result, &[tonic::Code::Unavailable])); + let result: Result, tower::BoxError> = Ok(response); + assert!(!is_retryable(&result, &policy)); } #[test] fn test_is_not_retryable_no_grpc_status_header() { + let policy = GrpcRetryPolicyConfig::new() + .retry_on(vec![tonic::Code::Unavailable]); let response = http::Response::builder().body(()).unwrap(); - let result: Result<&http::Response<()>, &io::Error> = Ok(&response); - assert!(!is_retryable(&result, &[tonic::Code::Unavailable])); + let result: Result, tower::BoxError> = Ok(response); + assert!(!is_retryable(&result, &policy)); + } + + // --- GrpcRetryBackoffConfig tests --- + + #[test] + fn test_backoff_defaults() { + let backoff = GrpcRetryBackoffConfig::default(); + assert_eq!(backoff.base_interval, Duration::from_millis(25)); + assert_eq!(backoff.max_interval, Duration::from_millis(250)); + assert_eq!(backoff.backoff_multiplier, 2.0); + } + + #[test] + fn test_backoff_new_sets_max_to_10x_base() { + let backoff = GrpcRetryBackoffConfig::new(Duration::from_millis(100)); + assert_eq!(backoff.base_interval, Duration::from_millis(100)); + assert_eq!(backoff.max_interval, Duration::from_millis(1000)); + } + + #[test] + fn test_backoff_base_interval_below_1ms_clamped() { + let backoff = GrpcRetryBackoffConfig::new(Duration::from_micros(500)); + assert_eq!(backoff.base_interval, 
Duration::from_millis(1)); + assert_eq!(backoff.max_interval, Duration::from_millis(10)); + } + + #[test] + fn test_backoff_max_interval_below_1ms_clamped() { + let backoff = GrpcRetryBackoffConfig::new(Duration::from_millis(1)) + .max_interval(Duration::from_micros(100)); + assert_eq!(backoff.max_interval, Duration::from_millis(1)); + } + + #[test] + fn test_backoff_max_interval_below_base_clamped() { + let backoff = GrpcRetryBackoffConfig::new(Duration::from_millis(100)) + .max_interval(Duration::from_millis(50)); + assert_eq!(backoff.max_interval, Duration::from_millis(100)); + } + + #[test] + fn test_backoff_custom_multiplier() { + let backoff = GrpcRetryBackoffConfig::new(Duration::from_millis(25)) + .backoff_multiplier(1.5); + assert_eq!(backoff.backoff_multiplier, 1.5); + } + + // --- GrpcRetryPolicyConfig tests --- + + #[test] + fn test_policy_defaults() { + let policy = GrpcRetryPolicyConfig::new(); + assert!(policy.retry_on.is_empty()); + assert_eq!(policy.num_retries, 1); + assert_eq!(policy.retry_backoff, GrpcRetryBackoffConfig::default()); + } + + #[test] + fn test_policy_num_retries_zero_clamped_to_1() { + let policy = GrpcRetryPolicyConfig::new().num_retries(0); + assert_eq!(policy.num_retries, 1); + } + + #[test] + fn test_policy_num_retries_capped_at_4() { + // max_attempts=5, so num_retries = max_attempts - 1 = 4 + let policy = GrpcRetryPolicyConfig::new().num_retries(10); + assert_eq!(policy.num_retries, 4); + } + + #[test] + fn test_policy_num_retries_4_is_max() { + let policy = GrpcRetryPolicyConfig::new().num_retries(4); + assert_eq!(policy.num_retries, 4); + } + + #[test] + fn test_policy_retry_on() { + let policy = GrpcRetryPolicyConfig::new() + .retry_on(vec![tonic::Code::Unavailable, tonic::Code::Cancelled]); + assert_eq!(policy.retry_on, vec![tonic::Code::Unavailable, tonic::Code::Cancelled]); + } + + #[test] + fn test_policy_custom_backoff() { + let backoff = GrpcRetryBackoffConfig::new(Duration::from_millis(50)) + 
.max_interval(Duration::from_millis(500)) + .backoff_multiplier(3.0); + let policy = GrpcRetryPolicyConfig::new().retry_backoff(backoff.clone()); + assert_eq!(policy.retry_backoff, backoff); + } + + // --- GrpcRetryPolicy (ArcSwap wrapper) tests --- + + #[test] + fn test_policy_load_config() { + let config = GrpcRetryPolicyConfig::new() + .retry_on(vec![tonic::Code::Unavailable]); + let policy = GrpcRetryPolicy::new(config); + let loaded = policy.load_config(); + assert_eq!(loaded.retry_on, vec![tonic::Code::Unavailable]); + assert_eq!(loaded.num_retries, 1); + } + + #[test] + fn test_policy_update_config() { + let policy = GrpcRetryPolicy::default(); + assert!(policy.load_config().retry_on.is_empty()); + + let new_config = GrpcRetryPolicyConfig::new() + .retry_on(vec![tonic::Code::Cancelled]) + .num_retries(3); + policy.update_config(new_config); + + let loaded = policy.load_config(); + assert_eq!(loaded.retry_on, vec![tonic::Code::Cancelled]); + assert_eq!(loaded.num_retries, 3); } } diff --git a/tonic-xds/src/testutil/grpc.rs b/tonic-xds/src/testutil/grpc.rs index b36dd1a30..fe9a69232 100644 --- a/tonic-xds/src/testutil/grpc.rs +++ b/tonic-xds/src/testutil/grpc.rs @@ -26,6 +26,38 @@ impl Greeter for MyGreeter { } } +/// A greeter that returns UNAVAILABLE for the first N calls, then succeeds. 
+pub(crate) struct FailFirstNGreeter { + msg: String, + call_count: std::sync::atomic::AtomicU32, + fail_first_n: u32, +} + +impl FailFirstNGreeter { + pub(crate) fn new(msg: &str, fail_first_n: u32) -> Self { + Self { + msg: msg.to_string(), + call_count: std::sync::atomic::AtomicU32::new(0), + fail_first_n, + } + } +} + +#[tonic::async_trait] +impl Greeter for FailFirstNGreeter { + async fn say_hello(&self, req: Request) -> Result, Status> { + let count = self + .call_count + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if count < self.fail_first_n { + return Err(Status::unavailable("temporarily unavailable")); + } + Ok(Response::new(HelloReply { + message: format!("{}: {}", self.msg, req.into_inner().name), + })) + } +} + /// A test server that runs a gRPC service and provides a channel for clients to connect. pub(crate) struct TestServer { /// The gRPC channel for talking to the test server. @@ -97,3 +129,36 @@ pub(crate) async fn spawn_greeter_server( addr, }) } + +/// Spawns a greeter server that fails the first N requests with UNAVAILABLE. +pub(crate) async fn spawn_fail_first_n_server( + msg: &str, + fail_first_n: u32, +) -> Result> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener); + + let (tx, rx) = oneshot::channel(); + let svc = GreeterServer::new(FailFirstNGreeter::new(msg, fail_first_n)); + + let handle = tokio::spawn(async move { + Server::builder() + .add_service(svc) + .serve_with_incoming_shutdown(incoming, async { + let _ = rx.await; + }) + .await + }); + + let channel = Endpoint::from_shared(format!("http://{addr}"))? 
+ .connect() + .await?; + + Ok(TestServer { + channel, + shutdown: tx, + handle, + addr, + }) +} From 59bceb74ace65f363eb44e232a9e18ea618989f3 Mon Sep 17 00:00:00 2001 From: Jeff Jiang Date: Tue, 7 Apr 2026 09:21:10 -0700 Subject: [PATCH 3/5] fmt --- tonic-xds/src/client/channel.rs | 6 +----- tonic-xds/src/client/retry.rs | 32 ++++++++++++++++---------------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/tonic-xds/src/client/channel.rs b/tonic-xds/src/client/channel.rs index 3c8495ea3..ec6e28ead 100644 --- a/tonic-xds/src/client/channel.rs +++ b/tonic-xds/src/client/channel.rs @@ -416,11 +416,7 @@ mod tests { ); let xds_channel = XdsChannelBuilder::with_config(XdsChannelConfig::default()) - .build_grpc_channel_from_parts( - xds_manager.clone(), - xds_manager.clone(), - retry_policy, - ); + .build_grpc_channel_from_parts(xds_manager.clone(), xds_manager.clone(), retry_policy); let mut client = GreeterClient::new(xds_channel); diff --git a/tonic-xds/src/client/retry.rs b/tonic-xds/src/client/retry.rs index 2c130deac..5d3ccd6c5 100644 --- a/tonic-xds/src/client/retry.rs +++ b/tonic-xds/src/client/retry.rs @@ -7,13 +7,13 @@ use std::task::{Context, Poll}; use std::time::Duration; use arc_swap::ArcSwap; -use backoff::backoff::Backoff; use backoff::ExponentialBackoffBuilder; +use backoff::backoff::Backoff; use http::{Request, Response}; use http_body::Body; use shared_http_body::{SharedBody, SharedBodyExt}; -use tower::retry::Retry; use tower::retry::Policy; +use tower::retry::Retry; use tower::{Layer, Service}; /// Check if an error's source chain contains a retryable connection-level error. 
@@ -121,8 +121,7 @@ impl GrpcRetryBackoffConfig { impl Default for GrpcRetryBackoffConfig { fn default() -> Self { - Self::new(Duration::from_millis(25)) - .max_interval(Duration::from_millis(250)) + Self::new(Duration::from_millis(25)).max_interval(Duration::from_millis(250)) } } @@ -342,7 +341,9 @@ where { type Response = S::Response; type Error = S::Error; - type Future = std::pin::Pin> + Send>>; + type Future = std::pin::Pin< + Box> + Send>, + >; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) @@ -459,8 +460,7 @@ mod tests { #[test] fn test_is_retryable_grpc_status_via_result() { - let policy = GrpcRetryPolicyConfig::new() - .retry_on(vec![tonic::Code::Unavailable]); + let policy = GrpcRetryPolicyConfig::new().retry_on(vec![tonic::Code::Unavailable]); let response = http::Response::builder() .header("grpc-status", "14") // UNAVAILABLE .body(()) @@ -471,8 +471,7 @@ mod tests { #[test] fn test_is_not_retryable_ok_response() { - let policy = GrpcRetryPolicyConfig::new() - .retry_on(vec![tonic::Code::Unavailable]); + let policy = GrpcRetryPolicyConfig::new().retry_on(vec![tonic::Code::Unavailable]); let response = http::Response::builder() .header("grpc-status", "0") // OK .body(()) @@ -483,8 +482,7 @@ mod tests { #[test] fn test_is_not_retryable_no_grpc_status_header() { - let policy = GrpcRetryPolicyConfig::new() - .retry_on(vec![tonic::Code::Unavailable]); + let policy = GrpcRetryPolicyConfig::new().retry_on(vec![tonic::Code::Unavailable]); let response = http::Response::builder().body(()).unwrap(); let result: Result, tower::BoxError> = Ok(response); assert!(!is_retryable(&result, &policy)); @@ -530,8 +528,8 @@ mod tests { #[test] fn test_backoff_custom_multiplier() { - let backoff = GrpcRetryBackoffConfig::new(Duration::from_millis(25)) - .backoff_multiplier(1.5); + let backoff = + GrpcRetryBackoffConfig::new(Duration::from_millis(25)).backoff_multiplier(1.5); assert_eq!(backoff.backoff_multiplier, 1.5); } @@ -568,7 +566,10 
@@ mod tests { fn test_policy_retry_on() { let policy = GrpcRetryPolicyConfig::new() .retry_on(vec![tonic::Code::Unavailable, tonic::Code::Cancelled]); - assert_eq!(policy.retry_on, vec![tonic::Code::Unavailable, tonic::Code::Cancelled]); + assert_eq!( + policy.retry_on, + vec![tonic::Code::Unavailable, tonic::Code::Cancelled] + ); } #[test] @@ -584,8 +585,7 @@ mod tests { #[test] fn test_policy_load_config() { - let config = GrpcRetryPolicyConfig::new() - .retry_on(vec![tonic::Code::Unavailable]); + let config = GrpcRetryPolicyConfig::new().retry_on(vec![tonic::Code::Unavailable]); let policy = GrpcRetryPolicy::new(config); let loaded = policy.load_config(); assert_eq!(loaded.retry_on, vec![tonic::Code::Unavailable]); From dd93a09b94447a19b9426faf60f2942d291f2e92 Mon Sep 17 00:00:00 2001 From: Jeff Jiang Date: Tue, 7 Apr 2026 09:35:27 -0700 Subject: [PATCH 4/5] fix lint --- tonic-xds/src/client/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tonic-xds/src/client/mod.rs b/tonic-xds/src/client/mod.rs index 3ac8fef2a..8d22f3029 100644 --- a/tonic-xds/src/client/mod.rs +++ b/tonic-xds/src/client/mod.rs @@ -2,5 +2,6 @@ pub(crate) mod channel; pub(crate) mod cluster; pub(crate) mod endpoint; pub(crate) mod lb; +#[allow(dead_code)] pub(crate) mod retry; pub(crate) mod route; From 66fd974ec008d03c18f693cc2557e5d7f64de800 Mon Sep 17 00:00:00 2001 From: Jeff Jiang Date: Tue, 7 Apr 2026 10:08:47 -0700 Subject: [PATCH 5/5] add test for independency --- tonic-xds/src/client/retry.rs | 78 ++++++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/tonic-xds/src/client/retry.rs b/tonic-xds/src/client/retry.rs index 5d3ccd6c5..5a4133503 100644 --- a/tonic-xds/src/client/retry.rs +++ b/tonic-xds/src/client/retry.rs @@ -189,7 +189,7 @@ fn make_backoff(config: &GrpcRetryBackoffConfig) -> backoff::ExponentialBackoff .with_initial_interval(config.base_interval) .with_max_interval(config.max_interval) 
.with_multiplier(config.backoff_multiplier) - .with_randomization_factor(0.0) + .with_randomization_factor(0.2) .with_max_elapsed_time(None) .build() } @@ -606,4 +606,80 @@ mod tests { assert_eq!(loaded.retry_on, vec![tonic::Code::Cancelled]); assert_eq!(loaded.num_retries, 3); } + + /// Verify that two concurrent requests using the same policy get independent + /// retry state (attempts counter and backoff). Tower's `Retry::call` clones + /// the policy per request, so mutations from one request must not leak into another. + #[tokio::test] + async fn test_retry_state_is_per_request() { + let policy = GrpcRetryPolicy::new( + GrpcRetryPolicyConfig::new() + .retry_on(vec![tonic::Code::Unavailable]) + .num_retries(2), + ); + + // Simulate two independent request sessions by cloning the policy + // (this is what tower's Retry::call does per request). + let mut policy_req1 = policy.clone(); + let mut policy_req2 = policy.clone(); + + // Build two independent requests + let mut req1 = http::Request::builder().body(()).unwrap(); + let mut req2 = http::Request::builder().body(()).unwrap(); + + type TestResult = Result, tower::BoxError>; + + // Both should be able to clone their requests + let _ = Policy::<_, http::Response<()>, tower::BoxError>::clone_request( + &mut policy_req1, + &req1, + ) + .expect("clone_request should succeed"); + let _ = Policy::<_, http::Response<()>, tower::BoxError>::clone_request( + &mut policy_req2, + &req2, + ) + .expect("clone_request should succeed"); + + // Simulate UNAVAILABLE response for req1, trigger a retry + let mut result1: TestResult = Ok(http::Response::builder() + .header("grpc-status", "14") + .body(()) + .unwrap()); + let retry1 = policy_req1.retry(&mut req1, &mut result1); + assert!(retry1.is_some(), "req1 should retry on first UNAVAILABLE"); + + // req1 has used one retry attempt. req2 should be unaffected — still + // has all retries available. 
+ let mut result2: TestResult = Ok(http::Response::builder() + .header("grpc-status", "14") + .body(()) + .unwrap()); + let retry2 = policy_req2.retry(&mut req2, &mut result2); + assert!(retry2.is_some(), "req2 should still be able to retry"); + + // Retry req1 again — second retry + let mut result1b: TestResult = Ok(http::Response::builder() + .header("grpc-status", "14") + .body(()) + .unwrap()); + let retry1b = policy_req1.retry(&mut req1, &mut result1b); + assert!(retry1b.is_some(), "req1 should retry on second UNAVAILABLE"); + + // req1 is now exhausted (2 retries used out of 2) + let mut result1c: TestResult = Ok(http::Response::builder() + .header("grpc-status", "14") + .body(()) + .unwrap()); + let retry1c = policy_req1.retry(&mut req1, &mut result1c); + assert!(retry1c.is_none(), "req1 should be exhausted"); + + // req2 should still have its second retry available + let mut result2b: TestResult = Ok(http::Response::builder() + .header("grpc-status", "14") + .body(()) + .unwrap()); + let retry2b = policy_req2.retry(&mut req2, &mut result2b); + assert!(retry2b.is_some(), "req2 should still have retries left"); + } }