Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion tonic-xds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ exclude = ["proto/test/*"]
[dependencies]
tonic = "0.14"
http = "1"
tower = { version = "0.5", default-features = false, features = ["discover"] }
http-body = "1"
tower = { version = "0.5", default-features = false, features = ["discover", "retry"] }
arc-swap = "1"
dashmap = "6.1"
thiserror = "2.0.17"
Expand All @@ -41,6 +42,8 @@ tokio = { version = "1", features = ["sync"] }
# cryptographic security, only statistical uniformity for traffic distribution.
fastrand = "2"
tokio-stream = "0.1"
backoff = "0.4"
shared_http_body = "0.1"

[lints]
workspace = true
Expand Down
56 changes: 49 additions & 7 deletions tonic-xds/src/client/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tower::{BoxError, Service, load::Load, util::BoxCloneService};
use {
crate::client::cluster::ClusterClientRegistryGrpc,
crate::client::lb::ClusterDiscovery,
crate::client::retry::{GrpcRetryPolicy, RetryLayer},
crate::client::route::{Router, XdsRoutingLayer},
tower::ServiceBuilder,
};
Expand Down Expand Up @@ -164,23 +165,26 @@ impl XdsChannelBuilder {
BoxCloneService::new(self.build_tonic_grpc_channel())
}

/// Builds an `XdsChannelGrpc` from the given router and cluster discovery.
/// Builds an `XdsChannelGrpc` from the given router, cluster discovery, and retry policy.
#[cfg(test)]
pub(crate) fn build_grpc_channel_from_parts(
&self,
router: Arc<dyn Router>,
discovery: Arc<dyn ClusterDiscovery<EndpointAddress, EndpointChannel<Channel>>>,
retry_policy: GrpcRetryPolicy,
) -> XdsChannelGrpc {
let routing_layer = XdsRoutingLayer::new(router);
let retry_layer = RetryLayer::new(retry_policy);
let cluster_registry = Arc::new(ClusterClientRegistryGrpc::new());
let lb_service = XdsLbService::new(cluster_registry, discovery);
let service = ServiceBuilder::new()
.layer(routing_layer)
.layer(retry_layer)
.map_request(|req: Request<shared_http_body::SharedBody<TonicBody>>| {
req.map(TonicBody::new)
})
.service(lb_service);
BoxCloneService::new(XdsChannelTonicGrpc {
config: self.config.clone(),
inner: service,
})
BoxCloneService::new(service)
}
}

Expand All @@ -192,6 +196,7 @@ mod tests {
use crate::client::endpoint::EndpointAddress;
use crate::client::endpoint::EndpointChannel;
use crate::client::lb::{BoxDiscover, ClusterDiscovery};
use crate::client::retry::GrpcRetryPolicy;
use crate::client::route::RouteDecision;
use crate::client::route::RouteInput;
use crate::client::route::Router;
Expand Down Expand Up @@ -333,8 +338,11 @@ mod tests {
let xds_manager = Arc::new(MockXdsManager::from_test_servers(&servers));

let xds_channel_builder = XdsChannelBuilder::with_config(XdsChannelConfig::default());
let xds_channel = xds_channel_builder
.build_grpc_channel_from_parts(xds_manager.clone(), xds_manager.clone());
let xds_channel = xds_channel_builder.build_grpc_channel_from_parts(
xds_manager.clone(),
xds_manager.clone(),
GrpcRetryPolicy::default(),
);

let client = GreeterClient::new(xds_channel);

Expand Down Expand Up @@ -387,4 +395,38 @@ mod tests {
let _ = server.handle.await;
}
}

#[tokio::test]
async fn test_retry_once_on_unavailable() {
use crate::client::retry::{GrpcRetryPolicy, GrpcRetryPolicyConfig};
use crate::testutil::grpc::spawn_fail_first_n_server;

// Server fails the first request with UNAVAILABLE, succeeds on retry.
let server = spawn_fail_first_n_server("retry-server", 1)
.await
.expect("Failed to spawn server");

let servers = vec![server];
let xds_manager = Arc::new(MockXdsManager::from_test_servers(&servers));

let retry_policy = GrpcRetryPolicy::new(
GrpcRetryPolicyConfig::new()
.retry_on(vec![tonic::Code::Unavailable])
.num_retries(1),
);

let xds_channel = XdsChannelBuilder::with_config(XdsChannelConfig::default())
.build_grpc_channel_from_parts(xds_manager.clone(), xds_manager.clone(), retry_policy);

let mut client = GreeterClient::new(xds_channel);

let response = client
.say_hello(HelloRequest {
name: "retry-test".to_string(),
})
.await
.expect("request should succeed after retry");

assert_eq!(response.into_inner().message, "retry-server: retry-test");
}
}
2 changes: 2 additions & 0 deletions tonic-xds/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ pub(crate) mod channel;
pub(crate) mod cluster;
pub(crate) mod endpoint;
pub(crate) mod lb;
#[allow(dead_code)]
pub(crate) mod retry;
pub(crate) mod route;
Loading
Loading