risingwave_meta/rpc/
intercept.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
18use futures::Future;
19use tonic::body::BoxBody;
20use tower::{Layer, Service};
21
22use crate::rpc::metrics::MetaMetrics;
23
24#[derive(Clone)]
25pub struct MetricsMiddlewareLayer {
26    metrics: Arc<MetaMetrics>,
27}
28
29impl MetricsMiddlewareLayer {
30    pub fn new(metrics: Arc<MetaMetrics>) -> Self {
31        Self { metrics }
32    }
33}
34
35impl<S> Layer<S> for MetricsMiddlewareLayer {
36    type Service = MetricsMiddleware<S>;
37
38    fn layer(&self, service: S) -> Self::Service {
39        MetricsMiddleware {
40            inner: service,
41            metrics: self.metrics.clone(),
42        }
43    }
44}
45
46#[derive(Clone)]
47pub struct MetricsMiddleware<S> {
48    inner: S,
49    metrics: Arc<MetaMetrics>,
50}
51
52impl<S> Service<http::Request<BoxBody>> for MetricsMiddleware<S>
53where
54    S: Service<http::Request<BoxBody>> + Clone + Send + 'static,
55    S::Future: Send + 'static,
56{
57    type Error = S::Error;
58    type Response = S::Response;
59
60    type Future = impl Future<Output = Result<Self::Response, Self::Error>>;
61
62    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63        self.inner.poll_ready(cx)
64    }
65
66    fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
67        // This is necessary because tonic internally uses `tower::buffer::Buffer`.
68        // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
69        // for details on why this is necessary
70        let clone = self.inner.clone();
71        let mut inner = std::mem::replace(&mut self.inner, clone);
72
73        let metrics = self.metrics.clone();
74
75        async move {
76            let path = req.uri().path();
77            let timer = metrics
78                .grpc_latency
79                .with_label_values(&[path])
80                .start_timer();
81
82            let response = inner.call(req).await?;
83
84            timer.observe_duration();
85
86            Ok(response)
87        }
88    }
89}