How to determine active in-flight request/response count with streaming response bodies? #2196
-
Hi, Wider contextI'm working on a generic grpc proxy/gateway (i.e. http2) that adds some rate limiting and authenticating for a few upstream grpc services which return a grpc server stream (client requests something, server responds with infinite grpc stream) . The grpc proxy should be protobuf agnostic (shouldn't need to know about the protos of the upstream services), which kinda rules out using tonic. Concrete problemI'm trying to add monitoring and limiting of active number of streams per api key (requests are authenticated using an api key whose context can be passed into handlers and middleware). So basically I wonder how can I observe from within a middleware when the body stream has ended (due to client hanging up, stream being finished, error encountered, etc.)? Here's how the main response handler roughly looks at the moment btw: #[debug_handler]
pub async fn proxy(
State(state): State<GrpcGatewayState>,
Extension(api_key_record): Extension<api_key_with_user::Data>,
mut request: Request<Body>,
) -> Result<Response<Body>, ApiGatewayError> {
let requested_path_and_query = request.uri().path_and_query().unwrap();
let url = state
.upstream_url
.clone()
.join(requested_path_and_query.as_str())
.unwrap();
println!("proxying call to uri: {}", url);
*request.uri_mut() = Uri::try_from(url.as_str()).unwrap();
let response = state.http2_client.request(request).await?;
Ok(response)
// handler returns here, even though body is still streaming data for minutes afterwards.
} |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 4 replies
-
You can make your own implementation of the |
Beta Was this translation helpful? Give feedback.
You can make your own implementation of the
Body
trait that wraps another body. The stream is over whenpoll_data
returnsPoll::Ready(None)
.