risingwave_meta/rpc/
intercept.rs1use std::sync::Arc;
16use std::task::{Context, Poll};
17
18use futures::Future;
19use tonic::body::BoxBody;
20use tower::{Layer, Service};
21
22use crate::rpc::metrics::MetaMetrics;
23
24#[derive(Clone)]
25pub struct MetricsMiddlewareLayer {
26 metrics: Arc<MetaMetrics>,
27}
28
29impl MetricsMiddlewareLayer {
30 pub fn new(metrics: Arc<MetaMetrics>) -> Self {
31 Self { metrics }
32 }
33}
34
35impl<S> Layer<S> for MetricsMiddlewareLayer {
36 type Service = MetricsMiddleware<S>;
37
38 fn layer(&self, service: S) -> Self::Service {
39 MetricsMiddleware {
40 inner: service,
41 metrics: self.metrics.clone(),
42 }
43 }
44}
45
46#[derive(Clone)]
47pub struct MetricsMiddleware<S> {
48 inner: S,
49 metrics: Arc<MetaMetrics>,
50}
51
52impl<S> Service<http::Request<BoxBody>> for MetricsMiddleware<S>
53where
54 S: Service<http::Request<BoxBody>> + Clone + Send + 'static,
55 S::Future: Send + 'static,
56{
57 type Error = S::Error;
58 type Response = S::Response;
59
60 type Future = impl Future<Output = Result<Self::Response, Self::Error>>;
61
62 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63 self.inner.poll_ready(cx)
64 }
65
66 fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
67 let clone = self.inner.clone();
71 let mut inner = std::mem::replace(&mut self.inner, clone);
72
73 let metrics = self.metrics.clone();
74
75 async move {
76 let path = req.uri().path();
77 let timer = metrics
78 .grpc_latency
79 .with_label_values(&[path])
80 .start_timer();
81
82 let response = inner.call(req).await?;
83
84 timer.observe_duration();
85
86 Ok(response)
87 }
88 }
89}