Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions grpc-google/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ use grpc::credentials::SecurityLevel;
use grpc::credentials::call::CallCredentials;
use grpc::credentials::call::CallDetails;
use grpc::credentials::call::ClientConnectionSecurityInfo;
use grpc::metadata::AsciiMetadataValue;
use grpc::metadata::MetadataMap;
use tonic::async_trait;
use tonic::metadata::AsciiMetadataValue;
use tonic::metadata::MetadataMap;

const DEFAULT_CLOUD_PLATFORM_SCOPE: &str = "https://www.googleapis.com/auth/cloud-platform";

Expand Down Expand Up @@ -162,7 +162,7 @@ mod tests {
assert!(res.is_ok());

let auth_header = metadata.get("authorization").unwrap();
assert_eq!(auth_header.to_str().unwrap(), "Bearer valid_token");
assert_eq!(auth_header.to_str(), "Bearer valid_token");
}

#[tokio::test]
Expand All @@ -184,7 +184,7 @@ mod tests {
async fn non_ascii_token_internal_error() {
let creds = GcpCallCredentials {
provider: MockTokenProvider {
result: Ok("invalid character\n".into()),
result: Ok("invalid\ncharacter".into()),
},
};
let (cd, auth_info) = fake_args();
Expand Down
3 changes: 1 addition & 2 deletions grpc/src/client/load_balancing/graceful_switch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,6 @@ mod test {
use std::sync::mpsc;
use std::time::Duration;

use tonic::metadata::MetadataMap;

use crate::client::load_balancing::ChannelController;
use crate::client::load_balancing::LbPolicy;
use crate::client::load_balancing::LbState;
Expand All @@ -276,6 +274,7 @@ mod test {
use crate::client::name_resolution::Endpoint;
use crate::client::name_resolution::ResolverUpdate;
use crate::core::RequestHeaders;
use crate::metadata::MetadataMap;
use crate::rt::default_runtime;

const DEFAULT_TEST_SHORT_TIMEOUT: Duration = Duration::from_millis(10);
Expand Down
3 changes: 1 addition & 2 deletions grpc/src/client/load_balancing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ use std::fmt::Debug;
use std::fmt::Display;
use std::sync::Arc;

use tonic::metadata::MetadataMap;

use crate::StatusCodeError;
use crate::StatusError;
use crate::client::ConnectivityState;
Expand All @@ -39,6 +37,7 @@ use crate::client::load_balancing::subchannel::SubchannelState;
use crate::client::name_resolution::Address;
use crate::client::name_resolution::ResolverUpdate;
use crate::core::RequestHeaders;
use crate::metadata::MetadataMap;
use crate::rt::GrpcRuntime;

pub(crate) mod child_manager;
Expand Down
3 changes: 1 addition & 2 deletions grpc/src/client/load_balancing/round_robin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,6 @@ mod test {
use std::sync::Arc;
use std::sync::mpsc;

use tonic::metadata::MetadataMap;

use crate::StatusCodeError;
use crate::client::ConnectivityState;
use crate::client::load_balancing::ChannelController;
Expand Down Expand Up @@ -286,6 +284,7 @@ mod test {
use crate::client::name_resolution::Endpoint;
use crate::client::name_resolution::ResolverUpdate;
use crate::core::RequestHeaders;
use crate::metadata::MetadataMap;
use crate::rt::default_runtime;

const DEFAULT_TEST_SHORT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
Expand Down
3 changes: 1 addition & 2 deletions grpc/src/client/load_balancing/subchannel_sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,6 @@ mod tests {
use std::sync::Mutex;
use std::sync::mpsc;

use tonic::metadata::MetadataMap;

use super::*;
use crate::client::ConnectivityState;
use crate::client::load_balancing::LbPolicy;
Expand All @@ -309,6 +307,7 @@ mod tests {
use crate::client::load_balancing::test_utils::new_request_headers;
use crate::client::name_resolution::Address;
use crate::client::name_resolution::ResolverUpdate;
use crate::metadata::MetadataMap;
use crate::rt::default_runtime;

fn test_lb_policy_options(tx_events: mpsc::Sender<TestEvent>) -> LbPolicyOptions {
Expand Down
25 changes: 13 additions & 12 deletions grpc/src/client/metadata_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
*/

use tokio::sync::oneshot;
use tonic::metadata::MetadataMap;

use crate::client::CallOptions;
use crate::client::InvokeOnce;
use crate::client::RecvStream;
use crate::client::interceptor::Intercept;
use crate::client::interceptor::InterceptOnce;
use crate::core::RequestHeaders;
use crate::metadata::MetadataMap;

/// An interceptor that attaches metadata to outgoing RPC headers.
pub struct AttachHeadersInterceptor {
Expand All @@ -53,16 +53,16 @@ impl<I: InvokeOnce> Intercept<I> for AttachHeadersInterceptor {
options: CallOptions,
next: I,
) -> (Self::SendStream, Self::RecvStream) {
headers
.metadata_mut()
.as_mut()
.extend(self.md.as_ref().clone());

let md = headers.metadata_mut();
for entry in self.md.iter() {
match entry {
tonic::metadata::KeyAndValueRef::Ascii(k, v) => _ = md.insert(k, v.clone()),
tonic::metadata::KeyAndValueRef::Binary(k, v) => _ = md.insert_bin(k, v.clone()),
let incoming_meta = headers.metadata_mut();
incoming_meta.reserve(self.md.len());
for kv in self.md.iter() {
match kv {
crate::metadata::KeyAndValueRef::Ascii(key, value) => {
incoming_meta.append(key, value.clone())
}
crate::metadata::KeyAndValueRef::Binary(key, value) => {
incoming_meta.append_bin(key, value.clone())
}
}
}
next.invoke_once(headers, options).await
Expand Down Expand Up @@ -179,6 +179,7 @@ mod tests {
use crate::core::ClientResponseStreamItem;
use crate::core::ResponseHeaders;
use crate::core::Trailers;
use crate::metadata::BinaryMetadataValue;

#[tokio::test]
async fn test_attach_headers_interceptor() {
Expand All @@ -187,7 +188,7 @@ mod tests {
md.insert("x-test-header", "test-value".parse().unwrap());
md.insert_bin(
"x-test-header-bin",
tonic::metadata::MetadataValue::from_bytes(b"test-bin"),
BinaryMetadataValue::from_bytes(b"test-bin"),
);
let interceptor = AttachHeadersInterceptor::new(md);

Expand Down
78 changes: 55 additions & 23 deletions grpc/src/client/transport/tonic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use tonic::codec::Codec;
use tonic::codec::Decoder;
use tonic::codec::EncodeBuf;
use tonic::codec::Encoder;
use tonic::metadata::MetadataMap;
use tonic::metadata::MetadataMap as TonicMeta;
use tower::ServiceBuilder;
use tower::buffer::Buffer;
use tower::buffer::future::ResponseFuture as BufferResponseFuture;
Expand Down Expand Up @@ -154,7 +154,7 @@ impl Invoke for TonicTransport {
let request_stream = ReceiverStream::new(req_rx);
let mut request = TonicRequest::new(Box::pin(request_stream));
let (method, metadata) = headers.into_parts();
*request.metadata_mut() = metadata;
*request.metadata_mut() = metadata.into();

let Ok(path) = PathAndQuery::from_maybe_shared(method) else {
return err_streams(StatusError::new(StatusCodeError::Internal, "invalid path"));
Expand Down Expand Up @@ -192,28 +192,38 @@ impl Invoke for TonicTransport {
// Converts from a tonic status to a trailers stream item.
fn trailers_from_tonic_status(
status: TonicStatus,
md: Option<MetadataMap>,
md: Option<TonicMeta>,
) -> ClientResponseStreamItem {
let mut trailers = Trailers::new(if status.code() == Code::Ok {
Ok(())
} else {
Err(StatusError::new(
StatusCodeError::from(status.code() as i32),
let status_res = match status.code() {
Code::Ok => Ok(()),
code => Err(StatusError::new(
StatusCodeError::from(code as i32),
status.message(),
))
});
if let Some(md) = md {
trailers = trailers.with_metadata(md);
}
)),
};

let trailers = match md.map(TryInto::try_into) {
Some(Err(e)) => Trailers::new(Err(StatusError::new(
StatusCodeError::Internal,
format!("failed to parse metadata: {e}"),
))),
Some(Ok(metadata)) => Trailers::new(status_res).with_metadata(metadata),
None => Trailers::new(status_res),
};

ClientResponseStreamItem::Trailers(trailers)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this just call trailers_from_status to do all of this part?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that removed the redundant error handling. Updated.

}

// Builds a trailers with a status
fn trailers_from_status(status: Status, md: Option<MetadataMap>) -> ClientResponseStreamItem {
let mut trailers = Trailers::new(status);
if let Some(md) = md {
trailers = trailers.with_metadata(md);
}
fn trailers_from_status(status: Status, md: Option<TonicMeta>) -> ClientResponseStreamItem {
let trailers = match md.map(TryInto::try_into) {
Some(Err(e)) => Trailers::new(Err(StatusError::new(
StatusCodeError::Internal,
format!("failed to parse metadata: {e}"),
))),
Some(Ok(metadata)) => Trailers::new(status).with_metadata(metadata),
None => Trailers::new(status),
};
ClientResponseStreamItem::Trailers(trailers)
}

Expand Down Expand Up @@ -262,11 +272,33 @@ impl RecvStream for TonicRecvStream {
StreamState::AwaitingHeaders(rx) => match rx.await {
Ok(Ok(response)) => {
let (metadata, stream, _extensions) = response.into_parts();
// Start streaming and return the headers.
self.state = StreamState::Streaming(stream);
ClientResponseStreamItem::Headers(
ResponseHeaders::new().with_metadata(metadata),
)
// Tonic decodes base64-encoded binary headers lazily. It
// does not fail the RPC upon receiving invalid base64 data;
// the error only surfaces when the application attempts to
// read the metadata.
// In contrast, standard gRPC implementations eagerly decode
// these headers and immediately fail the RPC with an
// Internal status.
match metadata.try_into() {
Ok(md) => {
// Start streaming and return the headers.
self.state = StreamState::Streaming(stream);
ClientResponseStreamItem::Headers(
ResponseHeaders::new().with_metadata(md),
)
}
// TODO: in this case, tonic believes the stream is
// still running, but our parsing failed -- do we need
// to terminate the request stream now even though the
// Streaming is dropped?
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's answer this before merging this PR if we can.

Copy link
Copy Markdown
Collaborator Author

@arjan-bal arjan-bal May 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially assumed that either Tonic, Hyper, or h2 would automatically close the underlying HTTP/2 stream when the client drops the inbound stream. However, this is not the case.

I verified this by modifying the route_chat client in Tonic's routeguide example and running a Go gRPC server to log all HTTP/2 frames:

Modified route_chat client
async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let start = time::Instant::now();

    let outbound = async_stream::stream! {
        let mut interval = time::interval(Duration::from_secs(1));

        loop {
            let time = interval.tick().await;
            let elapsed = time.duration_since(start);
            let note = RouteNote {
                location: Some(Point {
                    latitude: 409146138 + elapsed.as_secs() as i32,
                    longitude: -746188906,
                }),
                message: format!("at {elapsed:?}"),
            };
            println!("Sending message: {:?}", note.clone());

            yield note;
        }
    };

    let response = client.route_chat(Request::new(outbound)).await?;
    let mut inbound = response.into_inner();
    
    // Drop the inbound stream immediately
    drop(inbound);

    tokio::time::sleep(std::time::Duration::from_secs(60)).await;

    Ok(())
}

Despite dropping inbound, the outbound stream continued to be polled by Hyper/h2, and the Go server continued logging incoming DATA frames.


I updated the Tonic transport to explicitly close the outbound stream when it fails to decode response messages or headers on the inbound stream. With this change, the Go server receives a RST_STREAM frame with code CANCELLED once decoding fails on the client.

To implement this without introducing a mutex or spawning an additional background task, I used the take_until stream combinator from the futures crate.

Update: Also ensured dropping TonicRecvStream results in stream cancellation.

Err(e) => trailers_from_status(
Err(StatusError::new(
StatusCodeError::Internal,
format!("error decoding response: {e}"),
)),
None,
),
}
}
// Stay closed after sending trailers.
Err(_) => trailers_from_status(
Expand Down
6 changes: 3 additions & 3 deletions grpc/src/client/transport/tonic/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::Response;
use tonic::async_trait;
use tonic::metadata::MetadataMap;
use tonic::transport::Server;
use tonic_prost::prost::Message as ProstMessage;

Expand Down Expand Up @@ -80,6 +79,8 @@ use crate::echo_pb::EchoRequest;
use crate::echo_pb::EchoResponse;
use crate::echo_pb::echo_server::Echo;
use crate::echo_pb::echo_server::EchoServer;
use crate::metadata::AsciiMetadataKey;
use crate::metadata::MetadataMap;
use crate::rt::GrpcRuntime;
use crate::rt::tokio::TokioRuntime;

Expand All @@ -103,8 +104,7 @@ impl CallCredentials for MockCallCredentials {
}
for (key, val) in &self.metadata {
metadata.insert(
key.parse::<tonic::metadata::MetadataKey<tonic::metadata::Ascii>>()
.unwrap(),
key.parse::<AsciiMetadataKey>().unwrap(),
val.parse().unwrap(),
);
}
Expand Down
2 changes: 1 addition & 1 deletion grpc/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
use std::any::TypeId;

use bytes::Buf;
use tonic::metadata::MetadataMap;

use crate::metadata::MetadataMap;
use crate::status::Status;

#[allow(unused)]
Expand Down
12 changes: 5 additions & 7 deletions grpc/src/credentials/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use std::fmt::Debug;
use std::sync::Arc;

use tonic::async_trait;
use tonic::metadata::MetadataMap;

use crate::StatusError;
use crate::attributes::Attributes;
use crate::credentials::SecurityLevel;
use crate::metadata::MetadataMap;

/// Details regarding the call.
///
Expand Down Expand Up @@ -175,9 +175,9 @@ impl CallCredentials for CompositeCallCredentials {

#[cfg(test)]
mod tests {
use tonic::metadata::MetadataValue;

use super::*;
use crate::metadata::AsciiMetadataKey;
use crate::metadata::AsciiMetadataValue;

#[derive(Debug)]
struct MockCallCredentials {
Expand All @@ -195,10 +195,8 @@ mod tests {
metadata: &mut MetadataMap,
) -> Result<(), StatusError> {
metadata.insert(
self.key
.parse::<tonic::metadata::MetadataKey<tonic::metadata::Ascii>>()
.unwrap(),
MetadataValue::try_from(&self.value).unwrap(),
self.key.parse::<AsciiMetadataKey>().unwrap(),
AsciiMetadataValue::try_from(&self.value).unwrap(),
);
Ok(())
}
Expand Down
11 changes: 5 additions & 6 deletions grpc/src/credentials/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ impl<T: ChannelCredentials> ChannelCredentials for CompositeChannelCredentials<T
mod tests {
use tokio::net::TcpListener;
use tonic::async_trait;
use tonic::metadata::MetadataMap;
use tonic::metadata::MetadataValue;

use super::*;
use crate::StatusError;
Expand All @@ -213,6 +211,9 @@ mod tests {
use crate::credentials::call::ClientConnectionSecurityInfo;
use crate::credentials::insecure::InsecureChannelCredentials;
use crate::credentials::local::LocalChannelCredentials;
use crate::metadata::AsciiMetadataKey;
use crate::metadata::AsciiMetadataValue;
use crate::metadata::MetadataMap;
use crate::rt;
use crate::rt::TcpOptions;

Expand All @@ -232,10 +233,8 @@ mod tests {
metadata: &mut MetadataMap,
) -> Result<(), StatusError> {
metadata.insert(
self.key
.parse::<tonic::metadata::MetadataKey<tonic::metadata::Ascii>>()
.unwrap(),
MetadataValue::try_from(self.value).unwrap(),
self.key.parse::<AsciiMetadataKey>().unwrap(),
AsciiMetadataValue::try_from(self.value).unwrap(),
);
Ok(())
}
Expand Down
Loading
Loading