forked from hyperium/tonic
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathendpoint.rs
More file actions
172 lines (152 loc) · 4.78 KB
/
endpoint.rs
File metadata and controls
172 lines (152 loc) · 4.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
use crate::common::async_util::BoxFuture;
use std::net::SocketAddr;
use std::sync::{Arc, atomic::AtomicU64, atomic::Ordering};
use std::task::{Context, Poll};
use tower::{Service, load::Load};
/// Represents the host part of an endpoint address
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum EndpointHost {
Ipv4(std::net::Ipv4Addr),
Ipv6(std::net::Ipv6Addr),
Hostname(String),
}
impl From<String> for EndpointHost {
fn from(s: String) -> Self {
if let Ok(ipv4) = s.parse::<std::net::Ipv4Addr>() {
EndpointHost::Ipv4(ipv4)
} else if let Ok(ipv6) = s.parse::<std::net::Ipv6Addr>() {
EndpointHost::Ipv6(ipv6)
} else {
EndpointHost::Hostname(s)
}
}
}
/// Represents a validated endpoint address extracted from xDS
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct EndpointAddress {
/// The IP address or hostname
host: EndpointHost,
/// The port number
port: u16,
}
impl EndpointAddress {
/// Creates a new `EndpointAddress` from a host string and port.
///
/// Attempts to parse the host as an IP address; falls back to hostname.
#[allow(dead_code)]
pub(crate) fn new(host: impl Into<String>, port: u16) -> Self {
Self {
host: EndpointHost::from(host.into()),
port,
}
}
}
impl std::fmt::Display for EndpointAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.host {
EndpointHost::Ipv4(ip) => write!(f, "{ip}:{}", self.port),
EndpointHost::Ipv6(ip) => write!(f, "[{ip}]:{}", self.port),
EndpointHost::Hostname(h) => write!(f, "{h}:{}", self.port),
}
}
}
impl From<SocketAddr> for EndpointAddress {
fn from(addr: SocketAddr) -> Self {
match addr {
SocketAddr::V4(v4_addr) => Self {
host: EndpointHost::Ipv4(*v4_addr.ip()),
port: v4_addr.port(),
},
SocketAddr::V6(v6_addr) => Self {
host: EndpointHost::Ipv6(*v6_addr.ip()),
port: v6_addr.port(),
},
}
}
}
/// RAII tracker for in-flight requests.
/// This is mainly used to implement endpoint load reporting for load balancing purposes.
#[derive(Clone, Debug, Default)]
struct InFlightTracker {
in_flight: Arc<AtomicU64>,
}
impl InFlightTracker {
fn new(in_flight: Arc<AtomicU64>) -> Self {
in_flight.fetch_add(1, Ordering::Relaxed);
Self { in_flight }
}
}
impl Drop for InFlightTracker {
fn drop(&mut self) {
self.in_flight.fetch_sub(1, Ordering::Relaxed);
}
}
/// An endpoint channel for communicating with a single gRPC endpoint, with load reporting support for load balancing.
pub(crate) struct EndpointChannel<S> {
inner: S,
in_flight: Arc<AtomicU64>,
}
impl<S> EndpointChannel<S> {
/// Creates a new `EndpointChannel`.
/// This should be used by xDS implementations to construct channels to individual endpoints.
#[allow(dead_code)]
pub(crate) fn new(inner: S) -> Self {
Self {
inner,
in_flight: Arc::new(AtomicU64::new(0)),
}
}
}
impl<S> Clone for EndpointChannel<S>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
in_flight: self.in_flight.clone(),
}
}
}
impl<S, Req> Service<Req> for EndpointChannel<S>
where
S: Service<Req> + Send + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<Result<S::Response, S::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Req) -> Self::Future {
let in_flight = InFlightTracker::new(self.in_flight.clone());
let fut = self.inner.call(req);
// -1 when the inner future completes
Box::pin(async move {
let _in_flight_guard = in_flight;
fut.await
})
}
}
impl<S> Load for EndpointChannel<S> {
type Metric = u64;
fn load(&self) -> Self::Metric {
self.in_flight.load(Ordering::Relaxed)
}
}
/// Factory for creating connections to endpoints.
///
/// Implementations capture cluster-level config (TLS, HTTP/2 settings, timeouts)
/// at construction time. The implementation handles retries and concurrency
/// internally — the returned future resolves when a connection is established
/// (or is cancelled by dropping).
pub(crate) trait Connector {
/// The service type produced by this connector.
type Service;
/// Connect to the given endpoint address.
fn connect(
&self,
addr: &EndpointAddress,
) -> crate::common::async_util::BoxFuture<Self::Service>;
}