refactor(grpc): Use gRPC metadata instead of tonic metadata#2629
Conversation
| let trailers = match md.map(TryInto::try_into) { | ||
| Some(Err(e)) => Trailers::new(Err(StatusError::new( | ||
| StatusCodeError::Internal, | ||
| format!("failed to parse metadata: {e}"), | ||
| ))), | ||
| Some(Ok(metadata)) => Trailers::new(status_res).with_metadata(metadata), | ||
| None => Trailers::new(status_res), | ||
| }; | ||
|
|
||
| ClientResponseStreamItem::Trailers(trailers) |
There was a problem hiding this comment.
Can this just call trailers_from_status to do all of this part?
There was a problem hiding this comment.
Yes, that removed the redundant error handling. Updated.
| // TODO: in this case, tonic believes the stream is | ||
| // still running, but our parsing failed -- do we need | ||
| // to terminate the request stream now even though the | ||
| // Streaming is dropped? |
There was a problem hiding this comment.
Let's answer this before merging this PR if we can.
There was a problem hiding this comment.
I initially assumed that either Tonic, Hyper, or h2 would automatically close the underlying HTTP/2 stream when the client drops the inbound stream. However, this is not the case.
I verified this by modifying the route_chat client in Tonic's routeguide example and running a Go gRPC server to log all HTTP/2 frames:
Modified route_chat client
async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let start = time::Instant::now();
let outbound = async_stream::stream! {
let mut interval = time::interval(Duration::from_secs(1));
loop {
let time = interval.tick().await;
let elapsed = time.duration_since(start);
let note = RouteNote {
location: Some(Point {
latitude: 409146138 + elapsed.as_secs() as i32,
longitude: -746188906,
}),
message: format!("at {elapsed:?}"),
};
println!("Sending message: {:?}", note.clone());
yield note;
}
};
let response = client.route_chat(Request::new(outbound)).await?;
let mut inbound = response.into_inner();
// Drop the inbound stream immediately
drop(inbound);
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
Ok(())
}Despite dropping inbound, the outbound stream continued to be polled by Hyper/h2, and the Go server continued logging incoming DATA frames.
I updated the Tonic transport to explicitly close the outbound stream when it fails to decode response messages or headers on the inbound stream. With this change, the Go server receives a RST_STREAM frame with code CANCELLED once decoding fails on the client.
To implement this without introducing a mutex or spawning an additional background task, I used the take_until stream combinator from the futures crate.
Update: Also ensured dropping TonicRecvStream results in stream cancellation.
| // stream to yield `None`, which tells Tonic to cancel the stream. | ||
| let stop_notify_clone = stop_notify.clone(); | ||
| let request_stream = ReceiverStream::new(req_rx) | ||
| .take_until(Box::pin(async move { stop_notify_clone.notified().await })); |
There was a problem hiding this comment.
Would this work?
.take_until(Box::pin(stop_notify_clone.notified()));There was a problem hiding this comment.
Because notified() takes &self, the async block was originally needed to take ownership of the Notify instance and ensure it lived long enough. Tokio's notified_owned() method handles this by taking ownership directly, which cleanly removes the need for the async wrapper. I've updated the code to use it, and since Box::pin was also unnecessary, I've removed that as well.
|
|
||
| impl Drop for TonicRecvStream { | ||
| fn drop(&mut self) { | ||
| if let Some(notify) = self.stop_notify.take() { |
There was a problem hiding this comment.
I believe this take is unnecessary, but it's probably not significant so feel free to not change it.
There was a problem hiding this comment.
Yes, removed the take.
This PR migrates the gRPC crates to use the gRPC
MetadataMapintroduced in #2567, replacing the dependency ontonictypes.This PR also includes the following fixes:
http::HeaderMapto the gRPCMetadataMapnow fails explicitly when encountering invalid base64-encoded binary metadata, rather than silently dropping the values. Thetonictransport now returns anINTERNALstatus in these cases to ensure consistency with other gRPC implementations.RecvStream.