diff --git a/tonic-xds/Cargo.toml b/tonic-xds/Cargo.toml index c32cbb70f..e081cf991 100644 --- a/tonic-xds/Cargo.toml +++ b/tonic-xds/Cargo.toml @@ -44,6 +44,7 @@ fastrand = "2" tokio-stream = "0.1" backoff = "0.4" shared_http_body = "0.1" +tonic-prost = { version = "0.14", optional = true } [lints] workspace = true @@ -54,3 +55,18 @@ tokio = { version = "1", features = ["rt-multi-thread", "macros", "net"] } tonic = { version = "0.14", features = [ "server", "channel", "tls-ring" ] } tonic-prost = "0.14" tonic-prost-build = "0.14" +async-stream = "0.3" + +[features] +testutil = ["dep:tonic-prost"] + +[[example]] +name = "channel" +required-features = ["testutil"] + +[[example]] +name = "greeter_server" +required-features = ["testutil"] + +[[example]] +name = "xds_server" diff --git a/tonic-xds/examples/channel.rs b/tonic-xds/examples/channel.rs new file mode 100644 index 000000000..1c113fdda --- /dev/null +++ b/tonic-xds/examples/channel.rs @@ -0,0 +1,69 @@ +//! Example: send gRPC requests through an xDS-aware channel. +//! +//! Builds an xDS channel, then sends HelloRequest RPCs through it in a loop. +//! The channel discovers endpoints via the xDS management server and +//! load-balances across them. +//! +//! # Quick start +//! +//! Run all three examples (greeter backend, xDS server, this client) together: +//! +//! ```sh +//! ./tonic-xds/examples/run_xds_example.sh +//! ``` +//! +//! # Running individually +//! +//! ```sh +//! # Terminal 1: greeter backend +//! PORT=50051 cargo run -p tonic-xds --example greeter_server --features testutil +//! +//! # Terminal 2: xDS control plane +//! cargo run -p tonic-xds --example xds_server +//! +//! # Terminal 3: xDS client +//! GRPC_XDS_BOOTSTRAP_CONFIG='{"xds_servers":[{"server_uri":"http://localhost:18000"}],"node":{"id":"test"}}' \ +//! cargo run -p tonic-xds --example channel --features testutil +//! ``` +//! +//! # Configuration +//! +//! - `GRPC_XDS_BOOTSTRAP` — path to a bootstrap JSON file, **or** +//! - `GRPC_XDS_BOOTSTRAP_CONFIG` — inline bootstrap JSON +//! - `XDS_TARGET` — xDS target URI (default: `xds:///my-service`) + +use tonic_xds::testutil::proto::helloworld::{HelloRequest, greeter_client::GreeterClient}; +use tonic_xds::{XdsChannelBuilder, XdsChannelConfig, XdsUri}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let target_str = std::env::var("XDS_TARGET").unwrap_or_else(|_| "xds:///my-service".into()); + let target = XdsUri::parse(&target_str)?; + + println!("Building xDS channel for target: {target_str}"); + + let channel = XdsChannelBuilder::new(XdsChannelConfig::new(target)).build_grpc_channel()?; + + let mut client = GreeterClient::new(channel); + + println!("Channel built. Sending requests (Ctrl-C to stop)...\n"); + + for i in 1.. { + let request = HelloRequest { + name: format!("request-{i}"), + }; + + match client.say_hello(request).await { + Ok(response) => { + println!("[{i}] Response: {}", response.into_inner().message); + } + Err(status) => { + eprintln!("[{i}] Error: {status}"); + } + } + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + + Ok(()) +} diff --git a/tonic-xds/examples/greeter_server.rs b/tonic-xds/examples/greeter_server.rs new file mode 100644 index 000000000..d4cde3c5a --- /dev/null +++ b/tonic-xds/examples/greeter_server.rs @@ -0,0 +1,62 @@ +//! Example: standalone gRPC greeter server for testing xDS. +//! +//! Starts a greeter backend on a given port. Used together with the +//! `xds_server` and `channel` examples. +//! +//! # Quick start +//! +//! ```sh +//! ./tonic-xds/examples/run_xds_example.sh +//! ``` +//! +//! # Running individually +//! +//! ```sh +//! # Start on port 50051 (default): +//! cargo run -p tonic-xds --example greeter_server --features testutil +//! +//! # Custom port: +//! PORT=50052 cargo run -p tonic-xds --example greeter_server --features testutil +//! ``` + +use tonic::transport::Server; +use tonic::{Request, Response, Status}; +use tonic_xds::testutil::proto::helloworld::{ + HelloReply, HelloRequest, + greeter_server::{Greeter, GreeterServer}, +}; + +struct MyGreeter { + addr: String, +} + +#[tonic::async_trait] +impl Greeter for MyGreeter { + async fn say_hello( + &self, + request: Request, + ) -> Result, Status> { + let name = request.into_inner().name; + println!("Received request: name={name}"); + Ok(Response::new(HelloReply { + message: format!("Hello {name} from {}", self.addr), + })) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let port = std::env::var("PORT").unwrap_or_else(|_| "50051".to_string()); + let addr: std::net::SocketAddr = format!("0.0.0.0:{port}").parse()?; + + println!("Greeter server listening on {addr}"); + + Server::builder() + .add_service(GreeterServer::new(MyGreeter { + addr: addr.to_string(), + })) + .serve(addr) + .await?; + + Ok(()) +} diff --git a/tonic-xds/examples/run_xds_example.sh b/tonic-xds/examples/run_xds_example.sh new file mode 100755 index 000000000..8e3d08493 --- /dev/null +++ b/tonic-xds/examples/run_xds_example.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Run the tonic-xds example: xDS server + greeter backend + channel client. +set -euo pipefail + +prefix() { + local tag="$1" + sed -u "s/^/[$tag] /" +} + +cleanup() { + echo "Shutting down..." + kill "$GREETER_PID" "$XDS_PID" 2>/dev/null || true +} +trap cleanup EXIT + +# 1. Start greeter backend +PORT=50051 cargo run -p tonic-xds --example greeter_server --features testutil 2>&1 | prefix "greeter" & +GREETER_PID=$! + +# 2. Start xDS control plane +cargo run -p tonic-xds --example xds_server 2>&1 | prefix "xds" & +XDS_PID=$! + +# Wait for servers to be ready +sleep 2 + +# 3. Run xDS-aware client +GRPC_XDS_BOOTSTRAP_CONFIG='{"xds_servers":[{"server_uri":"http://localhost:18000"}],"node":{"id":"test"}}' \ + cargo run -p tonic-xds --example channel --features testutil 2>&1 | prefix "client" diff --git a/tonic-xds/examples/xds_server.rs b/tonic-xds/examples/xds_server.rs new file mode 100644 index 000000000..c4232e59a --- /dev/null +++ b/tonic-xds/examples/xds_server.rs @@ -0,0 +1,278 @@ +//! Example: minimal xDS control plane for testing. +//! +//! Serves a static set of LDS/CDS/EDS resources via the ADS +//! (Aggregated Discovery Service) SotW protocol. Useful for testing +//! the `channel` example without an external control plane. +//! +//! # Quick start +//! +//! ```sh +//! ./tonic-xds/examples/run_xds_example.sh +//! ``` +//! +//! # Running individually +//! +//! ```sh +//! # Defaults: listener "my-service", endpoint 127.0.0.1:50051 +//! cargo run -p tonic-xds --example xds_server +//! +//! # Custom endpoints: +//! ENDPOINTS=127.0.0.1:50051,127.0.0.1:50052 cargo run -p tonic-xds --example xds_server +//! ``` +//! +//! # Configuration +//! +//! - `LISTENER_NAME` — listener name to serve (default: `my-service`) +//! - `CLUSTER_NAME` — cluster name (default: `my-cluster`) +//! - `ENDPOINTS` — comma-separated `host:port` list (default: `127.0.0.1:50051`) +//! - `PORT` — server listen port (default: `18000`) + +use envoy_types::pb::envoy::config::cluster::v3::Cluster; +use envoy_types::pb::envoy::config::cluster::v3::cluster::DiscoveryType; +use envoy_types::pb::envoy::config::core::v3 as core_v3; +use envoy_types::pb::envoy::config::endpoint::v3::{ + ClusterLoadAssignment, LbEndpoint, LocalityLbEndpoints, lb_endpoint::HostIdentifier, +}; +use envoy_types::pb::envoy::config::listener::v3::{ApiListener, Listener}; +use envoy_types::pb::envoy::config::route::v3::route::Action; +use envoy_types::pb::envoy::config::route::v3::route_action::ClusterSpecifier; +use envoy_types::pb::envoy::config::route::v3::route_match::PathSpecifier; +use envoy_types::pb::envoy::config::route::v3::{ + Route, RouteAction, RouteConfiguration, RouteMatch, VirtualHost, +}; +use envoy_types::pb::envoy::extensions::filters::network::http_connection_manager::v3::{ + HttpConnectionManager, http_connection_manager::RouteSpecifier, +}; +use envoy_types::pb::envoy::service::discovery::v3::{ + DeltaDiscoveryRequest, DeltaDiscoveryResponse, DiscoveryRequest, DiscoveryResponse, + aggregated_discovery_service_server::{ + AggregatedDiscoveryService, AggregatedDiscoveryServiceServer, + }, +}; +use envoy_types::pb::google::protobuf::Any; +use prost::Message; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; +use tokio_stream::{Stream, StreamExt}; +use tonic::{Request, Response, Status}; + +const TYPE_LISTENER: &str = "type.googleapis.com/envoy.config.listener.v3.Listener"; + +const TYPE_CLUSTER: &str = "type.googleapis.com/envoy.config.cluster.v3.Cluster"; +const TYPE_ENDPOINTS: &str = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; +const TYPE_HCM: &str = "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager"; + +/// Static xDS resource snapshot, keyed by type URL. +struct Snapshot { + resources: HashMap>, +} + +impl Snapshot { + fn build(listener_name: &str, cluster_name: &str, endpoints: &[(String, u32)]) -> Self { + let mut resources: HashMap> = HashMap::new(); + let route_config_name = format!("{listener_name}-route"); + + // LDS: Listener → HttpConnectionManager (inline route config) + let hcm = HttpConnectionManager { + route_specifier: Some(RouteSpecifier::RouteConfig(RouteConfiguration { + name: route_config_name.clone(), + virtual_hosts: vec![VirtualHost { + name: "default".to_string(), + domains: vec!["*".to_string()], + routes: vec![Route { + r#match: Some(RouteMatch { + path_specifier: Some(PathSpecifier::Prefix("/".to_string())), + ..Default::default() + }), + action: Some(Action::Route(RouteAction { + cluster_specifier: Some(ClusterSpecifier::Cluster( + cluster_name.to_string(), + )), + ..Default::default() + })), + ..Default::default() + }], + ..Default::default() + }], + ..Default::default() + })), + ..Default::default() + }; + let listener = Listener { + name: listener_name.to_string(), + api_listener: Some(ApiListener { + api_listener: Some(Any { + type_url: TYPE_HCM.to_string(), + value: hcm.encode_to_vec().into(), + }), + }), + ..Default::default() + }; + resources + .entry(TYPE_LISTENER.to_string()) + .or_default() + .push(Any { + type_url: TYPE_LISTENER.to_string(), + value: listener.encode_to_vec(), + }); + + // CDS: Cluster (EDS type) + let cluster = Cluster { + name: cluster_name.to_string(), + cluster_discovery_type: Some( + envoy_types::pb::envoy::config::cluster::v3::cluster::ClusterDiscoveryType::Type( + DiscoveryType::Eds as i32, + ), + ), + ..Default::default() + }; + resources + .entry(TYPE_CLUSTER.to_string()) + .or_default() + .push(Any { + type_url: TYPE_CLUSTER.to_string(), + value: cluster.encode_to_vec(), + }); + + // EDS: ClusterLoadAssignment + let cla = ClusterLoadAssignment { + cluster_name: cluster_name.to_string(), + endpoints: vec![LocalityLbEndpoints { + lb_endpoints: endpoints + .iter() + .map(|(host, port)| LbEndpoint { + host_identifier: Some(HostIdentifier::Endpoint( + envoy_types::pb::envoy::config::endpoint::v3::Endpoint { + address: Some(core_v3::Address { + address: Some(core_v3::address::Address::SocketAddress( + core_v3::SocketAddress { + address: host.clone(), + port_specifier: Some( + core_v3::socket_address::PortSpecifier::PortValue( + *port, + ), + ), + ..Default::default() + }, + )), + }), + ..Default::default() + }, + )), + ..Default::default() + }) + .collect(), + ..Default::default() + }], + ..Default::default() + }; + resources + .entry(TYPE_ENDPOINTS.to_string()) + .or_default() + .push(Any { + type_url: TYPE_ENDPOINTS.to_string(), + value: cla.encode_to_vec(), + }); + + Self { resources } + } + + fn get(&self, type_url: &str) -> Vec { + self.resources.get(type_url).cloned().unwrap_or_default() + } +} + +struct XdsServer { + snapshot: Arc, +} + +#[tonic::async_trait] +impl AggregatedDiscoveryService for XdsServer { + type StreamAggregatedResourcesStream = + Pin> + Send>>; + + async fn stream_aggregated_resources( + &self, + request: Request>, + ) -> Result, Status> { + let mut inbound = request.into_inner(); + let snapshot = self.snapshot.clone(); + + let outbound = async_stream::try_stream! { + while let Some(req) = inbound.next().await { + let req = req?; + let short_type = req.type_url.rsplit('/').next().unwrap_or(&req.type_url); + + // Skip ACKs — only respond to new subscriptions or NACKs. + if !req.version_info.is_empty() && req.error_detail.is_none() { + continue; + } + + let resources = snapshot.get(&req.type_url); + println!( + " -> {short_type}: {count} resource(s)", + count = resources.len(), + ); + yield DiscoveryResponse { + version_info: "1".to_string(), + type_url: req.type_url, + nonce: "1".to_string(), + resources, + ..Default::default() + }; + } + }; + + Ok(Response::new(Box::pin(outbound))) + } + + type DeltaAggregatedResourcesStream = + Pin> + Send>>; + + async fn delta_aggregated_resources( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("delta not supported")) + } +} + +fn parse_endpoints(s: &str) -> Vec<(String, u32)> { + s.split(',') + .filter(|e| !e.is_empty()) + .map(|e| { + let (host, port) = e.rsplit_once(':').expect("endpoint must be host:port"); + (host.to_string(), port.parse().expect("invalid port")) + }) + .collect() +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let listener_name = std::env::var("LISTENER_NAME").unwrap_or_else(|_| "my-service".to_string()); + let cluster_name = std::env::var("CLUSTER_NAME").unwrap_or_else(|_| "my-cluster".to_string()); + let endpoints_str = + std::env::var("ENDPOINTS").unwrap_or_else(|_| "127.0.0.1:50051".to_string()); + let port = std::env::var("PORT").unwrap_or_else(|_| "18000".to_string()); + + let endpoints = parse_endpoints(&endpoints_str); + let snapshot = Arc::new(Snapshot::build(&listener_name, &cluster_name, &endpoints)); + + let addr: std::net::SocketAddr = format!("0.0.0.0:{port}").parse()?; + + println!("xDS server listening on {addr}"); + println!(" listener: {listener_name}"); + println!(" cluster: {cluster_name}"); + println!(" endpoints: {endpoints_str}"); + println!(); + + tonic::transport::Server::builder() + .add_service(AggregatedDiscoveryServiceServer::new(XdsServer { + snapshot, + })) + .serve(addr) + .await?; + + Ok(()) +} diff --git a/tonic-xds/src/client/channel.rs b/tonic-xds/src/client/channel.rs index ec6e28ead..240e879c6 100644 --- a/tonic-xds/src/client/channel.rs +++ b/tonic-xds/src/client/channel.rs @@ -1,70 +1,98 @@ use crate::XdsUri; +use crate::client::cluster::ClusterClientRegistryGrpc; use crate::client::endpoint::{EndpointAddress, EndpointChannel}; -use crate::client::lb::XdsLbService; -use crate::client::route::XdsRoutingService; -use crate::common::async_util::BoxFuture; +use crate::client::lb::{ClusterDiscovery, XdsLbService}; +use crate::client::route::{Router, XdsRoutingLayer}; +use crate::xds::bootstrap::{BootstrapConfig, BootstrapError}; +use crate::xds::cache::XdsCache; +use crate::xds::cluster_discovery::{ + EndpointConnector, XdsClusterDiscovery, default_endpoint_connector, +}; +use crate::xds::resource_manager::XdsResourceManager; +use crate::xds::routing::XdsRouter; use http::Request; use std::fmt::Debug; use std::sync::Arc; use std::task::{Context, Poll}; use tonic::{body::Body as TonicBody, client::GrpcService, transport::channel::Channel}; -use tower::{BoxError, Service, load::Load, util::BoxCloneService}; +use tower::{BoxError, Service, ServiceBuilder, util::BoxCloneService}; +use xds_client::{ClientConfig, Node, ProstCodec, TokioRuntime, TonicTransportBuilder, XdsClient}; -#[cfg(test)] -use { - crate::client::cluster::ClusterClientRegistryGrpc, - crate::client::lb::ClusterDiscovery, - crate::client::retry::{GrpcRetryPolicy, RetryLayer}, - crate::client::route::{Router, XdsRoutingLayer}, - tower::ServiceBuilder, -}; +use crate::client::retry::{GrpcRetryPolicy, GrpcRetryPolicyConfig, RetryLayer}; /// Configuration for building [`XdsChannel`] / [`XdsChannelGrpc`]. -/// Currently, only support specifying the xDS URI for the target service. -/// In the future, more configurations such as xDS management server address will be added. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct XdsChannelConfig { - target_uri: Option, + target_uri: XdsUri, + bootstrap: Option, } impl XdsChannelConfig { - /// Sets the xDS URI for the channel. + /// Creates a new config with the given target URI. + #[must_use] + pub fn new(target_uri: XdsUri) -> Self { + Self { + target_uri, + bootstrap: None, + } + } + + /// Sets the bootstrap configuration. + /// + /// If not set, the builder falls back to loading from environment + /// variables (`GRPC_XDS_BOOTSTRAP` or `GRPC_XDS_BOOTSTRAP_CONFIG`). #[must_use] - pub fn with_target_uri(mut self, target_uri: XdsUri) -> Self { - self.target_uri = Some(target_uri); + pub fn with_bootstrap(mut self, bootstrap: BootstrapConfig) -> Self { + self.bootstrap = Some(bootstrap); self } + + /// Eagerly loads bootstrap configuration from environment variables. + /// + /// This is optional — [`XdsChannelBuilder::build_grpc_channel`] falls back + /// to env vars automatically if no bootstrap is set. Use this method when + /// you want to surface bootstrap errors at config time rather than build time. + /// + /// Reads from `GRPC_XDS_BOOTSTRAP` (file path) first, then falls back to + /// `GRPC_XDS_BOOTSTRAP_CONFIG` (inline JSON). + pub fn with_bootstrap_from_env(mut self) -> Result { + self.bootstrap = Some(BootstrapConfig::from_env()?); + Ok(self) + } +} + +/// Errors that can occur when building an [`XdsChannel`]. +#[derive(Debug, thiserror::Error)] +pub enum BuildError { + /// Bootstrap configuration could not be loaded. + #[error("bootstrap: {0}")] + Bootstrap(#[from] BootstrapError), +} + +/// Holds owned resources whose background tasks must live as long as the channel. +/// +/// Stored as `Option>` on [`XdsChannel`] so clones share ownership +/// cheaply. When the last clone drops, the resource manager cascade task and +/// ADS worker are aborted. The `XdsCache` is kept alive separately by +/// `XdsClusterDiscovery` in the service stack. +struct XdsChannelResources { + _resource_manager: XdsResourceManager, + _xds_client: XdsClient, } /// `XdsChannel` is an xDS-capable [`tower::Service`] implementation. /// /// It routes requests according to the xDS configuration that it fetches from the xDS management server. /// The routing implementation is based on the [Google gRPC xDS features](https://grpc.github.io/grpc/core/md_doc_grpc_xds_features.html). -/// -/// # Type Parameters -/// -/// * `Req` - The request type that this channel accepts, as an example: `http::Request`. -/// * `Endpoint` - The endpoint identifier type used for load balancing (e.g., socket address). -/// * `S` - The underlying [`tower::Service`] implementation that handles individual endpoint connections. -pub struct XdsChannel -where - Req: Send + 'static, - S: Service, - S::Response: Send + 'static, -{ +pub struct XdsChannel { config: Arc, - // Currently the routing decision is directly executed by the XdsLbService. - // In the future, we will add more layers in between for retries, request mirroring, etc. - inner: XdsRoutingService>, + inner: S, + /// Keeps background tasks alive. `None` when built from parts in tests. + _resources: Option>, } #[allow(clippy::missing_fields_in_debug)] -impl Debug for XdsChannel -where - Req: Send + 'static, - S: Service, - S::Response: Send + 'static, -{ +impl Debug for XdsChannel { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("XdsChannel") .field("config", &self.config) @@ -72,97 +100,130 @@ where } } -impl Clone for XdsChannel -where - Req: Send + 'static, - S: Service, - S::Response: Send + 'static, - XdsRoutingService>: Clone, -{ +impl Clone for XdsChannel { fn clone(&self) -> Self { Self { config: self.config.clone(), inner: self.inner.clone(), + _resources: self._resources.clone(), } } } -impl Service> for XdsChannel, Endpoint, S> +impl Service for XdsChannel where - B: Send + 'static, - Request: Send + 'static, - Endpoint: std::hash::Hash + Eq + Clone + Send + 'static, - S: Service> + Load + Send + 'static, - S::Response: Send + 'static, - S::Error: Into, - S::Future: Send, - ::Metric: std::fmt::Debug, + S: Service, { type Response = S::Response; type Error = BoxError; - type Future = BoxFuture>; + type Future = S::Future; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } - fn call(&mut self, request: Request) -> Self::Future { + fn call(&mut self, request: Req) -> Self::Future { self.inner.call(request) } } -/// A type alias for an `XdsChannel` that uses Tonic's Channel as the underlying transport. -pub(crate) type XdsChannelTonicGrpc = - XdsChannel, EndpointAddress, EndpointChannel>; - /// A [`tonic::client::GrpcService`] implementation that can route and load-balance /// gRPC requests based on xDS configuration. pub type XdsChannelGrpc = BoxCloneService, http::Response, BoxError>; -// Static assertion that XdsChannelGrpc and XdsChannelTonicGrpc implement GrpcService +// Static assertion that XdsChannelGrpc implements GrpcService const _: fn() = || { fn assert_grpc_service>() {} assert_grpc_service::(); - assert_grpc_service::(); }; /// Builder for creating an [`XdsChannel`] or [`XdsChannelGrpc`]. #[derive(Clone, Debug)] pub struct XdsChannelBuilder { - #[allow(dead_code)] config: Arc, } impl XdsChannelBuilder { - /// Create a builder from an channel configurations. + /// Creates a builder from a channel configuration. #[must_use] - pub fn with_config(config: XdsChannelConfig) -> Self { + pub fn new(config: XdsChannelConfig) -> Self { Self { config: Arc::new(config), } } - /// Builds an `XdsChannel`, which takes generic request, endpoint, and service types and can be - /// used for generic HTTP services. - #[must_use] - pub fn build_channel(&self) -> XdsChannel - where - Req: Send + 'static, - S: Service, - S::Response: Send + 'static, - { - todo!("Implement XdsChannel building logic"); + fn build_tonic_grpc_channel(&self) -> Result { + let bootstrap = match self.config.bootstrap.clone() { + Some(b) => b, + None => BootstrapConfig::from_env()?, + }; + + let listener_name = self.config.target_uri.target.clone(); + + let server_uri = bootstrap.server_uri().to_owned(); + let node = Node::from(bootstrap.node); + let client_config = ClientConfig::new(node, server_uri); + let xds_client = XdsClient::builder( + client_config, + TonicTransportBuilder::default(), + ProstCodec, + TokioRuntime, + ) + .build(); + + let cache = Arc::new(XdsCache::new()); + let resource_manager = + XdsResourceManager::new(xds_client.clone(), cache.clone(), listener_name); + + Ok(self.build_from_cache(cache, xds_client, resource_manager)) } - pub(crate) fn build_tonic_grpc_channel(&self) -> XdsChannelTonicGrpc { - todo!("Implement XdsChannel building logic"); + /// Internal builder that wires the service stack from a pre-built cache. + /// + /// Separated from `build_tonic_grpc_channel` so tests can inject a + /// disconnected `XdsClient` and pre-populated cache. + fn build_from_cache( + &self, + cache: Arc, + xds_client: XdsClient, + resource_manager: XdsResourceManager, + ) -> XdsChannelGrpc { + let router: Arc = Arc::new(XdsRouter::new(&cache)); + let connector: EndpointConnector = Arc::new(default_endpoint_connector); + let discovery: Arc>> = + Arc::new(XdsClusterDiscovery::new(cache, connector)); + let retry_policy = GrpcRetryPolicy::new(GrpcRetryPolicyConfig::default()); + + let resources = Arc::new(XdsChannelResources { + _resource_manager: resource_manager, + _xds_client: xds_client, + }); + + let routing_layer = XdsRoutingLayer::new(router); + let retry_layer = RetryLayer::new(retry_policy); + let cluster_registry = Arc::new(ClusterClientRegistryGrpc::new()); + let lb_service = XdsLbService::new(cluster_registry, discovery); + let inner = ServiceBuilder::new() + .layer(routing_layer) + .layer(retry_layer) + .map_request(|req: Request>| { + req.map(TonicBody::new) + }) + .service(lb_service); + + BoxCloneService::new(XdsChannel { + config: self.config.clone(), + inner, + _resources: Some(resources), + }) } /// Builds an `XdsChannelGrpc`, which is a type-erased gRPC channel. - #[must_use] - pub fn build_grpc_channel(&self) -> XdsChannelGrpc { - BoxCloneService::new(self.build_tonic_grpc_channel()) + // TODO: Support HTTP and other channel types (not just gRPC). This will + // require a generic `build()` or separate `build_http_channel()` method. + pub fn build_grpc_channel(&self) -> Result { + self.build_tonic_grpc_channel() } /// Builds an `XdsChannelGrpc` from the given router, cluster discovery, and retry policy. @@ -177,24 +238,32 @@ impl XdsChannelBuilder { let retry_layer = RetryLayer::new(retry_policy); let cluster_registry = Arc::new(ClusterClientRegistryGrpc::new()); let lb_service = XdsLbService::new(cluster_registry, discovery); - let service = ServiceBuilder::new() + let inner = ServiceBuilder::new() .layer(routing_layer) .layer(retry_layer) .map_request(|req: Request>| { req.map(TonicBody::new) }) .service(lb_service); - BoxCloneService::new(service) + BoxCloneService::new(XdsChannel { + config: self.config.clone(), + inner, + _resources: None, + }) } } #[cfg(test)] mod tests { - use super::XdsChannelBuilder; - use super::XdsChannelConfig; + use super::{XdsChannelBuilder, XdsChannelConfig}; + use crate::XdsUri; use crate::client::channel::XdsChannelGrpc; use crate::client::endpoint::EndpointAddress; use crate::client::endpoint::EndpointChannel; + + fn test_config() -> XdsChannelConfig { + XdsChannelConfig::new(XdsUri::parse("xds:///test-service").unwrap()) + } use crate::client::lb::{BoxDiscover, ClusterDiscovery}; use crate::client::retry::GrpcRetryPolicy; use crate::client::route::RouteDecision; @@ -204,6 +273,9 @@ mod tests { use crate::testutil::grpc::GreeterClient; use crate::testutil::grpc::HelloRequest; use crate::testutil::grpc::TestServer; + use crate::xds::cache::XdsCache; + use crate::xds::resource::EndpointsResource; + use crate::xds::resource::route_config::RouteConfigResource; use std::sync::Arc; use tokio::sync::mpsc; use tonic::transport::Channel; @@ -337,7 +409,7 @@ mod tests { // Create a mock XdsManager with the test servers let xds_manager = Arc::new(MockXdsManager::from_test_servers(&servers)); - let xds_channel_builder = XdsChannelBuilder::with_config(XdsChannelConfig::default()); + let xds_channel_builder = XdsChannelBuilder::new(test_config()); let xds_channel = xds_channel_builder.build_grpc_channel_from_parts( xds_manager.clone(), xds_manager.clone(), @@ -415,8 +487,11 @@ mod tests { .num_retries(1), ); - let xds_channel = XdsChannelBuilder::with_config(XdsChannelConfig::default()) - .build_grpc_channel_from_parts(xds_manager.clone(), xds_manager.clone(), retry_policy); + let xds_channel = XdsChannelBuilder::new(test_config()).build_grpc_channel_from_parts( + xds_manager.clone(), + xds_manager.clone(), + retry_policy, + ); let mut client = GreeterClient::new(xds_channel); @@ -429,4 +504,165 @@ mod tests { assert_eq!(response.into_inner().message, "retry-server: retry-test"); } + + /// Helper: creates a `RouteConfigResource` that routes all traffic to the given cluster. + fn make_test_route_config(cluster_name: &str) -> Arc { + use crate::xds::resource::route_config::*; + + Arc::new(RouteConfigResource { + name: "test-route".to_string(), + virtual_hosts: vec![VirtualHostConfig { + name: "default".to_string(), + domains: vec!["*".to_string()], + routes: vec![RouteConfig { + match_criteria: RouteConfigMatch { + path_specifier: PathSpecifierConfig::Prefix(String::new()), + headers: vec![], + case_sensitive: false, + match_fraction: None, + }, + action: RouteConfigAction::Cluster(cluster_name.to_string()), + }], + }], + }) + } + + /// Helper: creates an `EndpointsResource` from test server addresses. + fn make_test_endpoints(cluster_name: &str, servers: &[TestServer]) -> Arc { + use crate::xds::resource::endpoints::{HealthStatus, LocalityEndpoints, ResolvedEndpoint}; + + Arc::new(EndpointsResource { + cluster_name: cluster_name.to_string(), + localities: vec![LocalityEndpoints { + locality: None, + endpoints: servers + .iter() + .map(|s| ResolvedEndpoint { + address: EndpointAddress::from(s.addr), + health_status: HealthStatus::Healthy, + load_balancing_weight: 1, + }) + .collect(), + load_balancing_weight: 100, + priority: 0, + }], + }) + } + + /// Builds an XdsChannelGrpc using real XdsRouter and XdsClusterDiscovery + /// backed by the given cache. + async fn build_xds_channel_from_cache(cache: Arc) -> XdsChannelGrpc { + use crate::xds::cluster_discovery::{ + EndpointConnector, XdsClusterDiscovery, default_endpoint_connector, + }; + use crate::xds::routing::XdsRouter; + + let router: Arc = Arc::new(XdsRouter::new(&cache)); + let connector: EndpointConnector = Arc::new(default_endpoint_connector); + let discovery: Arc>> = + Arc::new(XdsClusterDiscovery::new(cache, connector)); + + let builder = XdsChannelBuilder::new(test_config()); + builder.build_grpc_channel_from_parts(router, discovery, GrpcRetryPolicy::default()) + } + + /// Tests the full xDS stack (XdsRouter + XdsClusterDiscovery) with a + /// pre-populated cache, validating that requests are routed and + /// load-balanced across real backend servers. + #[tokio::test] + async fn test_xds_channel_with_real_router_and_discovery() { + let num_servers = 3; + let num_requests = 300; + let cluster_name = "test-cluster"; + let (_, servers) = setup_grpc_servers(num_servers).await; + + let cache = Arc::new(XdsCache::new()); + cache.update_route_config(make_test_route_config(cluster_name)); + cache.update_endpoints(cluster_name, make_test_endpoints(cluster_name, &servers)); + + let channel = build_xds_channel_from_cache(cache).await; + let client = GreeterClient::new(channel); + + let (successful, error_types, server_counts) = + send_grpc_requests(client, num_requests).await; + + assert_eq!( + successful, num_requests, + "Expected 100% success rate. Errors: {error_types:?}", + ); + assert_eq!( + server_counts.len(), + num_servers, + "Expected all {num_servers} servers to receive traffic. Counts: {server_counts:?}", + ); + + for server in servers { + let _ = server.shutdown.send(()); + let _ = server.handle.await; + } + } + + /// Tests that endpoint changes are picked up dynamically by the + /// XdsClusterDiscovery while the channel is serving requests. + #[tokio::test] + async fn test_xds_channel_handles_dynamic_endpoint_updates() { + let cluster_name = "test-cluster"; + let (_, servers) = setup_grpc_servers(2).await; + + let cache = Arc::new(XdsCache::new()); + cache.update_route_config(make_test_route_config(cluster_name)); + // Start with only the first server. + cache.update_endpoints( + cluster_name, + make_test_endpoints(cluster_name, &servers[..1]), + ); + + let channel = build_xds_channel_from_cache(cache.clone()).await; + let client = GreeterClient::new(channel.clone()); + + // Phase 1: all traffic goes to server-0. + let (successful, _, server_counts) = send_grpc_requests(client, 50).await; + assert_eq!(successful, 50); + assert_eq!( + server_counts.len(), + 1, + "Only 1 server should receive traffic before update. Counts: {server_counts:?}", + ); + + // Add second server. + cache.update_endpoints(cluster_name, make_test_endpoints(cluster_name, &servers)); + // Give the endpoint manager diff loop time to process the update. + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Phase 2: traffic should go to both servers. + let client2 = GreeterClient::new(channel); + let (successful, _, server_counts) = send_grpc_requests(client2, 200).await; + assert_eq!(successful, 200); + assert_eq!( + server_counts.len(), + 2, + "Both servers should receive traffic after update. Counts: {server_counts:?}", + ); + + for server in servers { + let _ = server.shutdown.send(()); + let _ = server.handle.await; + } + } + + /// Smoke test: verifies builder wiring with a disconnected XdsClient + /// doesn't panic during construction. + #[tokio::test] + async fn test_build_from_cache_smoke() { + use crate::xds::resource_manager::XdsResourceManager; + + let cache = Arc::new(XdsCache::new()); + let xds_client = xds_client::XdsClient::disconnected(); + let resource_manager = + XdsResourceManager::new(xds_client.clone(), cache.clone(), "test-listener".into()); + + let builder = XdsChannelBuilder::new(test_config()); + let _channel = builder.build_from_cache(cache, xds_client, resource_manager); + // Construction should succeed without panicking. + } } diff --git a/tonic-xds/src/client/lb.rs b/tonic-xds/src/client/lb.rs index ae21ed86e..5a88a741a 100644 --- a/tonic-xds/src/client/lb.rs +++ b/tonic-xds/src/client/lb.rs @@ -47,7 +47,6 @@ where S::Response: Send + 'static, { /// Creates a new `XdsLbService` with the given cluster client registry and cluster discovery. - #[allow(dead_code)] pub(crate) fn new( cluster_registry: Arc>, cluster_discovery: Arc>, diff --git a/tonic-xds/src/client/retry.rs b/tonic-xds/src/client/retry.rs index 5a4133503..329a5bacc 100644 --- a/tonic-xds/src/client/retry.rs +++ b/tonic-xds/src/client/retry.rs @@ -159,7 +159,8 @@ impl GrpcRetryPolicyConfig { /// Set the number of retries (total attempts = num_retries + 1). /// Values of 0 are clamped to 1. Values that would exceed 5 total attempts are capped. pub(crate) fn num_retries(mut self, num_retries: u32) -> Self { - self.num_retries = num_retries.max(1).min(MAX_ATTEMPTS - 1); + // Safety: clamp panics if min > max. Here min=1, max=MAX_ATTEMPTS-1=4 (const). + self.num_retries = num_retries.clamp(1, MAX_ATTEMPTS - 1); self } diff --git a/tonic-xds/src/lib.rs b/tonic-xds/src/lib.rs index a5588ce8a..ae2856e10 100644 --- a/tonic-xds/src/lib.rs +++ b/tonic-xds/src/lib.rs @@ -1,53 +1,115 @@ //! # tonic-xds //! -//! xDS (discovery service) support for [Tonic](https://docs.rs/tonic) gRPC clients as well as -//! general [`tower::Service`]. +//! xDS-based service discovery, routing, and load balancing for +//! [Tonic](https://docs.rs/tonic) gRPC clients. //! -//! This crate provides an xDS-enabled [`tonic::client::GrpcService`] implementation ([`XdsChannelGrpc`]) -//! that automatically discovers, routes and load-balances across endpoints using the xDS protocol. -//! The implementation will align with the -//! [gRPC xDS features](https://github.com/grpc/grpc/blob/master/doc/grpc_xds_features.md). +//! This crate provides [`XdsChannelGrpc`], a [`tonic::client::GrpcService`] +//! that connects to an xDS management server (via ADS) and automatically +//! discovers, routes, and load-balances requests across endpoints. The +//! implementation follows the [gRPC xDS features] specification. //! -//! In addition to gRPC, this crate also provides a generic [`tower::Service`] implementation ([`XdsChannel`]) -//! for enabling xDS features for generic Http clients. This can be used to support both gRPC and Http -//! clients by the same xDS management server. +//! [gRPC xDS features]: https://github.com/grpc/grpc/blob/master/doc/grpc_xds_features.md //! -//! ## Current Planned Features: +//! ## Getting started //! -//! - LDS / RDS / CDS / EDS subscriptions via ADS stream. -//! - Client-side P2C load balancing -//! - Other features will be added in future releases. +//! 1. **Provide a bootstrap configuration** that tells the client where +//! the xDS management server lives and what node identity to present. +//! The format matches [gRFC A27] — a JSON object with `xds_servers` +//! and an optional `node`. //! -//! ## Example +//! 2. **Build the channel** with [`XdsChannelBuilder`], pointing it at +//! an `xds:///` target URI. +//! +//! 3. **Pass the channel** to your generated gRPC client. +//! +//! [gRFC A27]: https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md +//! +//! ## Bootstrap configuration +//! +//! The bootstrap can be supplied in three ways (in order of precedence): +//! +//! | Method | How | +//! |--------|-----| +//! | Programmatic | [`BootstrapConfig::from_json`] then [`XdsChannelConfig::with_bootstrap`] | +//! | Environment (explicit) | [`XdsChannelConfig::with_bootstrap_from_env`] | +//! | Environment (implicit) | Omit bootstrap; the builder loads from env vars automatically | +//! +//! The environment variables checked are: +//! - `GRPC_XDS_BOOTSTRAP` — path to a JSON file +//! - `GRPC_XDS_BOOTSTRAP_CONFIG` — inline JSON string +//! +//! Minimal bootstrap JSON: +//! +//! ```json +//! { +//! "xds_servers": [{"server_uri": "xds.example.com:443"}], +//! "node": {"id": "my-node"} +//! } +//! ``` +//! +//! ## Examples +//! +//! ### Using environment variables (simplest) //! //! ```rust,no_run -//! use tonic_xds::{XdsChannelBuilder, XdsChannelConfig, XdsChannelGrpc, XdsUri}; +//! // Requires GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG to be set. +//! use tonic_xds::{XdsChannelBuilder, XdsChannelConfig, XdsUri}; //! -//! let target_uri = XdsUri::parse( -//! "xds:///myservice:50051" -//! ).expect("fail to parse valid target URI"); +//! let target = XdsUri::parse("xds:///myservice:50051").unwrap(); +//! let channel = XdsChannelBuilder::new(XdsChannelConfig::new(target)) +//! .build_grpc_channel() +//! .unwrap(); //! -//! let xds_channel = XdsChannelBuilder::with_config( -//! XdsChannelConfig::default().with_target_uri(target_uri) -//! ).build_grpc_channel(); +//! // let client = MyServiceClient::new(channel); +//! ``` //! -//! // Use with your generated gRPC client -//! // let client = MyServiceClient::new(xds_channel); -//! // client.my_rpc_method(...).await; +//! ### Using inline JSON +//! +//! ```rust,no_run +//! use tonic_xds::{BootstrapConfig, XdsChannelBuilder, XdsChannelConfig, XdsUri}; +//! +//! let bootstrap = BootstrapConfig::from_json(r#"{ +//! "xds_servers": [{"server_uri": "xds.example.com:443"}], +//! "node": {"id": "my-node", "cluster": "my-cluster"} +//! }"#).unwrap(); +//! +//! let target = XdsUri::parse("xds:///myservice:50051").unwrap(); +//! let channel = XdsChannelBuilder::new( +//! XdsChannelConfig::new(target).with_bootstrap(bootstrap), +//! ).build_grpc_channel().unwrap(); +//! +//! // let client = MyServiceClient::new(channel); //! ``` //! -//! ## How it works +//! ## xDS features +//! +//! | Feature | gRFC | Status | +//! |---------|------|--------| +//! | Bootstrap configuration | [A27] | Supported | +//! | xDS transport (ADS, SotW) | [A27] | Supported | +//! | LDS / RDS / CDS / EDS resource cascade | [A27] | Supported | +//! | Route matching (domain, path, headers) | [A28] | Supported | +//! | Weighted cluster traffic splitting | [A28] | Supported | +//! | Case-insensitive header matching | [A63] | Supported | +//! | Client-side P2C load balancing | | Supported | +//! | TLS endpoint connections | [A29] | Planned | +//! | Least-request load balancing | [A48] | Planned | //! -//! [`XdsChannelGrpc`] connects to an xDS management server and subscribes to resource updates for -//! listeners, routes, clusters, and endpoints. Requests are automatically routed and load-balanced -//! in stacked [`tower::Service`]s that implement the [gRPC xDS features](https://github.com/grpc/grpc/blob/master/doc/grpc_xds_features.md). +//! [A27]: https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md +//! [A28]: https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md +//! [A29]: https://github.com/grpc/proposal/blob/master/A29-xds-tls-security.md +//! [A48]: https://github.com/grpc/proposal/blob/master/A48-xds-least-request-lb-policy.md +//! [A63]: https://github.com/grpc/proposal/blob/master/A63-xds-string-matcher-ignore-case.md pub(crate) mod client; pub(crate) mod common; pub(crate) mod xds; -pub use client::channel::{XdsChannel, XdsChannelBuilder, XdsChannelConfig, XdsChannelGrpc}; +pub use client::channel::{ + BuildError, XdsChannel, XdsChannelBuilder, XdsChannelConfig, XdsChannelGrpc, +}; +pub use xds::bootstrap::{BootstrapConfig, BootstrapError}; pub use xds::uri::{XdsUri, XdsUriError}; -#[cfg(test)] -pub(crate) mod testutil; +#[cfg(any(test, feature = "testutil"))] +pub mod testutil; diff --git a/tonic-xds/src/testutil/mod.rs b/tonic-xds/src/testutil/mod.rs index 54eda10ee..6850fbda7 100644 --- a/tonic-xds/src/testutil/mod.rs +++ b/tonic-xds/src/testutil/mod.rs @@ -1,5 +1,5 @@ -#[cfg(test)] -pub(crate) mod grpc; +//! Test utilities and proto definitions. #[cfg(test)] -pub(crate) mod proto; +pub(crate) mod grpc; +pub mod proto; diff --git a/tonic-xds/src/testutil/proto/mod.rs b/tonic-xds/src/testutil/proto/mod.rs index 9f75573ad..4b01ca528 100644 --- a/tonic-xds/src/testutil/proto/mod.rs +++ b/tonic-xds/src/testutil/proto/mod.rs @@ -1,6 +1,5 @@ //! This module contains Protobuf definitions for tests. //! To regenerate, run `cargo run -p tonic-xds --example gen_test_proto`. -#[cfg(test)] -#[allow(unreachable_pub, missing_docs)] -pub(crate) mod helloworld; +#[allow(unreachable_pub, missing_docs, missing_debug_implementations)] +pub mod helloworld; diff --git a/tonic-xds/src/xds/bootstrap.rs b/tonic-xds/src/xds/bootstrap.rs index 4f63e6534..8d08acac0 100644 --- a/tonic-xds/src/xds/bootstrap.rs +++ b/tonic-xds/src/xds/bootstrap.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] //! xDS bootstrap configuration. //! //! Parses the bootstrap JSON from `GRPC_XDS_BOOTSTRAP` (file path) or @@ -13,19 +12,43 @@ const ENV_BOOTSTRAP_FILE: &str = "GRPC_XDS_BOOTSTRAP"; /// Environment variable containing inline bootstrap JSON. const ENV_BOOTSTRAP_CONFIG: &str = "GRPC_XDS_BOOTSTRAP_CONFIG"; -/// Parsed xDS bootstrap configuration. +/// Parsed xDS bootstrap configuration per [gRFC A27]. +/// +/// The bootstrap tells the xDS client where the management server lives +/// and what identity (node) to present. It is typically loaded from a +/// JSON file or environment variable. +/// +/// # Loading +/// +/// ```rust,no_run +/// use tonic_xds::BootstrapConfig; +/// +/// // From environment variable (GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG): +/// let config = BootstrapConfig::from_env().unwrap(); +/// +/// // From a JSON string: +/// let json = r#"{"xds_servers":[{"server_uri":"xds.example.com:443"}]}"#; +/// let config = BootstrapConfig::from_json(json).unwrap(); +/// ``` +/// +/// [gRFC A27]: https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md +// TODO: Design a public builder API for constructing BootstrapConfig +// programmatically (not just from JSON). The current `new()` is pub(crate); +// a public API should use the builder pattern to accommodate future fields +// without breaking changes. #[derive(Debug, Clone, Deserialize)] #[non_exhaustive] -pub(crate) struct BootstrapConfig { +pub struct BootstrapConfig { /// xDS management servers to connect to. - pub xds_servers: Vec, + pub(crate) xds_servers: Vec, /// Node identity sent to the xDS server. #[serde(default)] - pub node: NodeConfig, + pub(crate) node: NodeConfig, } /// Configuration for a single xDS management server. #[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] // Fields consumed when TLS support is added (A29). pub(crate) struct XdsServerConfig { /// URI of the xDS server (e.g., `"xds.example.com:443"`). pub server_uri: String, @@ -39,6 +62,7 @@ pub(crate) struct XdsServerConfig { /// A channel credential entry from the bootstrap config. #[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] // Used when TLS support is added (A29). pub(crate) struct ChannelCredentialConfig { /// Credential type (e.g., `"insecure"`, `"tls"`, `"google_default"`). #[serde(rename = "type")] @@ -83,22 +107,29 @@ pub(crate) struct LocalityConfig { /// Errors that can occur when loading bootstrap configuration. #[derive(Debug, thiserror::Error)] -pub(crate) enum BootstrapError { +pub enum BootstrapError { + /// Neither `GRPC_XDS_BOOTSTRAP` nor `GRPC_XDS_BOOTSTRAP_CONFIG` is set. #[error("neither {ENV_BOOTSTRAP_FILE} nor {ENV_BOOTSTRAP_CONFIG} environment variable is set")] NotConfigured, + /// Failed to read the bootstrap JSON file. #[error("failed to read bootstrap file '{path}': {source}")] ReadFile { + /// Path that could not be read. path: String, + /// Underlying I/O error. source: std::io::Error, }, + /// The JSON could not be parsed. #[error("failed to parse bootstrap JSON: {0}")] InvalidJson(#[from] serde_json::Error), + /// The parsed config failed validation (e.g., empty `xds_servers`). #[error("bootstrap config validation failed: {0}")] Validation(String), } impl BootstrapConfig { /// Create a bootstrap configuration directly from struct fields. + #[allow(dead_code)] // Used by callers constructing config programmatically. pub(crate) fn new( xds_servers: Vec, node: NodeConfig, @@ -112,7 +143,7 @@ impl BootstrapConfig { /// /// Checks `GRPC_XDS_BOOTSTRAP` (file path) first, then falls back to /// `GRPC_XDS_BOOTSTRAP_CONFIG` (inline JSON). - pub(crate) fn from_env() -> Result { + pub fn from_env() -> Result { if let Ok(path) = std::env::var(ENV_BOOTSTRAP_FILE) { let json = std::fs::read_to_string(&path) .map_err(|e| BootstrapError::ReadFile { path, source: e })?; @@ -127,7 +158,7 @@ impl BootstrapConfig { } /// Parse bootstrap configuration from a JSON string. - pub(crate) fn from_json(json: &str) -> Result { + pub fn from_json(json: &str) -> Result { let config: BootstrapConfig = serde_json::from_str(json)?; config.validate()?; Ok(config) @@ -161,6 +192,7 @@ impl BootstrapConfig { /// /// Per gRFC A27, the client stops at the first credential type it supports. /// Returns `None` if no supported credential type is found. + #[allow(dead_code)] // Used when TLS support is added (A29). pub(crate) fn selected_credential(&self) -> Option<&ChannelCredentialType> { self.xds_servers .first()? diff --git a/tonic-xds/src/xds/cache.rs b/tonic-xds/src/xds/cache.rs index 2290479ca..e6819681b 100644 --- a/tonic-xds/src/xds/cache.rs +++ b/tonic-xds/src/xds/cache.rs @@ -141,6 +141,7 @@ impl XdsCache { } /// Watches cluster resource changes for a specific cluster. + #[allow(dead_code)] // Will be used when LB policy dispatch is wired (A48). pub(crate) fn watch_cluster(&self, name: &str) -> CacheWatch { self.clusters.watch(name) } @@ -164,7 +165,7 @@ impl XdsCache { #[cfg(test)] mod tests { use super::*; - use crate::xds::resource::LbPolicy; + use crate::xds::resource::cluster::LbPolicy; fn make_route_config(name: &str) -> Arc { use crate::xds::resource::route_config::{ diff --git a/tonic-xds/src/xds/cluster_discovery.rs b/tonic-xds/src/xds/cluster_discovery.rs new file mode 100644 index 000000000..b28005061 --- /dev/null +++ b/tonic-xds/src/xds/cluster_discovery.rs @@ -0,0 +1,63 @@ +//! xDS-backed [`ClusterDiscovery`] implementation. +//! +//! Bridges [`XdsCache`] endpoint watches and [`EndpointManager`] diffing +//! to provide the [`ClusterDiscovery`] trait required by [`XdsLbService`]. + +use std::sync::Arc; + +use tonic::transport::{Channel, Endpoint}; + +use crate::client::endpoint::{EndpointAddress, EndpointChannel}; +use crate::client::lb::{BoxDiscover, ClusterDiscovery}; +use crate::xds::cache::XdsCache; +use crate::xds::endpoint_manager::EndpointManager; + +/// Shared connector function that creates endpoint services from addresses. +// TODO: Refactor to a trait when adding TLS support (A29). A trait can carry +// configuration (TLS settings, timeouts) and be shared across EndpointManager, +// ClusterDiscovery, and LB reconnect logic. +pub(crate) type EndpointConnector = + Arc EndpointChannel + Send + Sync>; + +/// xDS-backed cluster discovery that resolves cluster names into endpoint +/// change streams by watching the [`XdsCache`]. +pub(crate) struct XdsClusterDiscovery { + cache: Arc, + endpoint_manager: EndpointManager>, +} + +impl XdsClusterDiscovery { + /// Creates a new `XdsClusterDiscovery`. + pub(crate) fn new(cache: Arc, connector: EndpointConnector) -> Self { + Self { + cache, + endpoint_manager: EndpointManager::new(connector), + } + } +} + +impl ClusterDiscovery> for XdsClusterDiscovery { + fn discover_cluster( + &self, + cluster_name: &str, + ) -> BoxDiscover> { + let watch = self.cache.watch_endpoints(cluster_name); + self.endpoint_manager.discover_endpoints(watch) + } +} + +/// Default connector that creates a lazily-connected [`EndpointChannel`] for +/// each endpoint address. +/// +/// Uses insecure (plaintext) connections. TLS support will be added as part +/// of gRFC A29. +pub(crate) fn default_endpoint_connector(addr: &EndpointAddress) -> EndpointChannel { + let uri = format!("http://{addr}"); + // Safety: EndpointAddress only holds validated Ipv4/Ipv6/Hostname + u16 port, + // and its Display impl produces "ip:port" or "hostname:port". Prefixing with + // "http://" always yields a valid URI, so from_shared cannot fail here. + let channel = Endpoint::from_shared(uri) + .expect("EndpointAddress Display guarantees valid URI") + .connect_lazy(); + EndpointChannel::new(channel) +} diff --git a/tonic-xds/src/xds/mod.rs b/tonic-xds/src/xds/mod.rs index f342275a0..c430c1730 100644 --- a/tonic-xds/src/xds/mod.rs +++ b/tonic-xds/src/xds/mod.rs @@ -1,15 +1,8 @@ pub(crate) mod bootstrap; -// TODO: remove dead_code once cache is wired into the client layer -#[allow(dead_code)] pub(crate) mod cache; -// TODO: remove dead_code once resource_manager is wired into the channel builder -#[allow(dead_code)] +pub(crate) mod cluster_discovery; pub(crate) mod endpoint_manager; pub(crate) mod resource; -// TODO: remove dead_code once resource_manager is wired into the channel builder -#[allow(dead_code)] pub(crate) mod resource_manager; -// TODO: remove dead_code once routing is wired into the client layer -#[allow(dead_code)] pub(crate) mod routing; pub(crate) mod uri; diff --git a/tonic-xds/src/xds/resource/mod.rs b/tonic-xds/src/xds/resource/mod.rs index 673c80ea4..fcb3a9295 100644 --- a/tonic-xds/src/xds/resource/mod.rs +++ b/tonic-xds/src/xds/resource/mod.rs @@ -1,4 +1,5 @@ -#![allow(dead_code, unused_imports)] +// TODO: remove once A48 (least-request LB) and priority LB consume all fields. +#![allow(dead_code)] //! xDS resource type implementations. //! //! Each module implements [`xds_client::Resource`] for one of the four resource types: @@ -14,7 +15,7 @@ pub(crate) mod endpoints; pub(crate) mod listener; pub(crate) mod route_config; -pub(crate) use cluster::{ClusterResource, LbPolicy}; -pub(crate) use endpoints::{EndpointsResource, LocalityEndpoints, ResolvedEndpoint}; +pub(crate) use cluster::ClusterResource; +pub(crate) use endpoints::EndpointsResource; pub(crate) use listener::ListenerResource; -pub(crate) use route_config::{RouteConfigResource, VirtualHostConfig}; +pub(crate) use route_config::RouteConfigResource; diff --git a/tonic-xds/src/xds/resource/route_config.rs b/tonic-xds/src/xds/resource/route_config.rs index 777535d56..88e3f023c 100644 --- a/tonic-xds/src/xds/resource/route_config.rs +++ b/tonic-xds/src/xds/resource/route_config.rs @@ -269,10 +269,16 @@ fn validate_header_matcher( use envoy_types::pb::envoy::r#type::matcher::v3::string_matcher::MatchPattern; let match_specifier = match hm.header_match_specifier { + // TODO: Remove this arm once ExactMatch is fully removed from envoy-types. + // ExactMatch is deprecated in favor of StringMatch, which is handled below. + #[allow(deprecated)] Some(HeaderMatchSpecifier::ExactMatch(v)) => HeaderMatchSpecifierConfig::Exact { value: v, ignore_case: false, }, + // TODO: Remove this arm once SafeRegexMatch is fully removed from envoy-types. + // SafeRegexMatch is deprecated in favor of StringMatch, which is handled below. + #[allow(deprecated)] Some(HeaderMatchSpecifier::SafeRegexMatch(r)) => { let re = Regex::new(&r.regex).map_err(|e| { Error::Validation(format!("invalid header regex '{}': {e}", r.regex)) diff --git a/tonic-xds/src/xds/routing.rs b/tonic-xds/src/xds/routing.rs index 0fa7176e3..ef2f911d8 100644 --- a/tonic-xds/src/xds/routing.rs +++ b/tonic-xds/src/xds/routing.rs @@ -14,8 +14,10 @@ use std::cmp::Reverse; use std::sync::Arc; +use std::time::Duration; use arc_swap::ArcSwapOption; +use tokio::sync::watch; use crate::client::route::{RouteDecision, RouteInput, Router}; use crate::common::async_util::{AbortOnDrop, BoxFuture}; @@ -25,13 +27,22 @@ use crate::xds::resource::route_config::{ RouteConfigAction, RouteConfigMatch, RouteConfigResource, VirtualHostConfig, WeightedCluster, }; +/// Default timeout for waiting for the initial route config (matches gRFC A57 +/// resource initial timeout). +const DEFAULT_READY_TIMEOUT: Duration = Duration::from_secs(30); + /// xDS-backed [`Router`] that resolves requests to cluster names. /// /// Subscribes to route config changes from [`XdsCache`] via a background watch task /// and maintains a shared [`ArcSwapOption`] for lock-free reads on the hot path. /// The watch task is aborted when the router is dropped. +/// +/// The first RPC blocks (up to [`DEFAULT_READY_TIMEOUT`]) until the initial route +/// config is available, matching standard gRPC behavior where RPCs wait for the +/// resolver's first update. Subsequent RPCs read the config lock-free. pub(crate) struct XdsRouter { route_config: Arc>, + ready_rx: watch::Receiver, _watch_task: AbortOnDrop, } @@ -43,15 +54,22 @@ impl XdsRouter { /// is dropped. pub(crate) fn new(cache: &XdsCache) -> Self { let route_config = Arc::new(ArcSwapOption::empty()); + let (ready_tx, ready_rx) = watch::channel(false); let rc = route_config.clone(); let mut watcher = cache.watch_route_config(); let handle = tokio::spawn(async move { + let mut ready_tx = Some(ready_tx); while let Some(config) = watcher.next().await { rc.store(Some(config)); + // Signal readiness on the first config, then drop the sender. + if let Some(tx) = ready_tx.take() { + let _ = tx.send(true); + } } }); Self { route_config, + ready_rx, _watch_task: AbortOnDrop(handle), } } @@ -59,27 +77,50 @@ impl XdsRouter { impl Router for XdsRouter { fn route(&self, input: &RouteInput<'_>) -> BoxFuture> { - let route_config = self.route_config.load_full(); let authority = input.authority.to_string(); let headers = input.headers.clone(); + + // Fast path: config already available, no cloning needed. + if let Some(rc) = self.route_config.load_full() { + return Box::pin(async move { resolve_route(&rc, &authority, &headers) }); + } + + // Slow path: wait for the initial route config, matching standard + // gRPC behavior where RPCs block until the resolver provides the + // first update. + let route_config_ref = self.route_config.clone(); + let mut ready_rx = self.ready_rx.clone(); Box::pin(async move { - let rc = route_config.ok_or(RoutingError::NotReady)?; - let path = headers - .get(":path") - .and_then(|v| v.to_str().ok()) - .unwrap_or("/"); - let action = rc.route(&authority, path, &headers)?; - let cluster = match action { - RouteConfigAction::Cluster(name) => name.clone(), - RouteConfigAction::WeightedClusters(clusters) => select_weighted_cluster(clusters) - .ok_or(RoutingError::EmptyWeightedClusters)? - .to_string(), - }; - Ok(RouteDecision { cluster }) + tokio::time::timeout(DEFAULT_READY_TIMEOUT, ready_rx.wait_for(|ready| *ready)) + .await + .map_err(|_| RoutingError::NotReady)? + .map_err(|_| RoutingError::NotReady)?; + let rc = route_config_ref.load_full().ok_or(RoutingError::NotReady)?; + resolve_route(&rc, &authority, &headers) }) } } +/// Resolve a route decision from the given config, authority, and headers. +fn resolve_route( + rc: &RouteConfigResource, + authority: &str, + headers: &http::HeaderMap, +) -> Result { + let path = headers + .get(":path") + .and_then(|v| v.to_str().ok()) + .unwrap_or("/"); + let action = rc.route(authority, path, headers)?; + let cluster = match action { + RouteConfigAction::Cluster(name) => name.clone(), + RouteConfigAction::WeightedClusters(clusters) => select_weighted_cluster(clusters) + .ok_or(RoutingError::EmptyWeightedClusters)? + .to_string(), + }; + Ok(RouteDecision { cluster }) +} + /// Error returned when routing fails. #[derive(Debug, Clone, thiserror::Error)] pub(crate) enum RoutingError { @@ -1074,7 +1115,16 @@ mod tests { authority: "svc", headers: &headers, }; - let err = router.route(&input).await.unwrap_err(); - assert!(matches!(err, RoutingError::NotReady)); + // The router now blocks waiting for config; verify it returns + // NotReady after the timeout elapses. + let result = + tokio::time::timeout(std::time::Duration::from_millis(100), router.route(&input)).await; + // Either the inner timeout fires (NotReady) or the outer timeout + // fires (config never arrived) — both are correct. + match result { + Ok(Err(RoutingError::NotReady)) => {} + Err(_elapsed) => {} + other => panic!("expected NotReady or timeout, got {other:?}"), + } } } diff --git a/xds-client/src/resource/mod.rs b/xds-client/src/resource/mod.rs index 953ccafab..e0bd9ab63 100644 --- a/xds-client/src/resource/mod.rs +++ b/xds-client/src/resource/mod.rs @@ -81,7 +81,7 @@ pub enum DecodeResult { /// 2. **Validation**: Transform the message into the final resource type. /// If this fails, the resource name is known ([`DecodeResult::ResourceError`]). /// -/// The provided [`decode`](Self::decode) method orchestrates these phases and +/// The provided `decode` function orchestrates these phases and /// returns the appropriate [`DecodeResult`]. /// /// # Resource Deletion in State of the World (SotW) diff --git a/xds-client/src/transport/tonic.rs b/xds-client/src/transport/tonic.rs index 5ec46fe1f..faf379313 100644 --- a/xds-client/src/transport/tonic.rs +++ b/xds-client/src/transport/tonic.rs @@ -123,8 +123,8 @@ impl TonicTransport { /// Builder for creating [`TonicTransport`] instances. /// -/// This implements [`TransportBuilder`] and can be used with [`XdsClientBuilder`] -/// to enable server fallback support. +/// This implements [`TransportBuilder`] and can be used with +/// [`XdsClientBuilder`](crate::XdsClientBuilder) to enable server fallback support. /// /// For connections requiring TLS or custom channel configuration, see the /// example in [`TonicTransport::from_channel`].