risingwave_common_service/
tracing.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::task::{Context, Poll};
16
17use futures::Future;
18use risingwave_common::util::tracing::TracingContext;
19use tonic::body::BoxBody;
20use tower::{Layer, Service};
21use tracing::Instrument;
22
/// A [`Layer`] that decorates the inner service with [`TracingExtract`].
///
/// The only field is private, so values are obtained through
/// [`TracingExtractLayer::new`] or [`Default::default`] rather than a struct literal.
#[derive(Clone, Debug, Default)]
pub struct TracingExtractLayer {
    /// Zero-sized private marker; carries no data.
    _private: (),
}
28
29impl TracingExtractLayer {
30    pub fn new() -> Self {
31        Self::default()
32    }
33}
34
35impl<S> Layer<S> for TracingExtractLayer {
36    type Service = TracingExtract<S>;
37
38    fn layer(&self, service: S) -> Self::Service {
39        TracingExtract { inner: service }
40    }
41}
42
/// A service wrapper that extracts the [`TracingContext`] from the HTTP headers and uses it to
/// create a new tracing span for the request handler, if one exists.
///
/// When the headers carry no tracing context, the request is handled without a span.
///
/// See also `TracingInject` in the `rpc_client` crate.
#[derive(Clone, Debug)]
pub struct TracingExtract<S> {
    /// The wrapped service that handles the request.
    inner: S,
}
51
52impl<S> Service<http::Request<BoxBody>> for TracingExtract<S>
53where
54    S: Service<http::Request<BoxBody>> + Clone + Send + 'static,
55    S::Future: Send + 'static,
56{
57    type Error = S::Error;
58    type Response = S::Response;
59
60    type Future = impl Future<Output = Result<Self::Response, Self::Error>>;
61
62    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63        self.inner.poll_ready(cx)
64    }
65
66    fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
67        // This is necessary because tonic internally uses `tower::buffer::Buffer`.
68        // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
69        // for details on why this is necessary
70        let clone = self.inner.clone();
71        let mut inner = std::mem::replace(&mut self.inner, clone);
72
73        async move {
74            let span = match TracingContext::from_http_headers(req.headers()) {
75                Some(tracing_context) => {
76                    let span = tracing::info_span!(
77                        "grpc_serve",
78                        "otel.name" = req.uri().path(),
79                        uri = %req.uri()
80                    );
81                    tracing_context.attach(span)
82                }
83                _ => {
84                    tracing::Span::none() // if there's no parent span, disable tracing for this request
85                }
86            };
87
88            inner.call(req).instrument(span).await
89        }
90    }
91}