risingwave_common_service/
tracing.rs1use std::task::{Context, Poll};
16
17use futures::Future;
18use risingwave_common::util::tracing::TracingContext;
19use tonic::body::BoxBody;
20use tower::{Layer, Service};
21use tracing::Instrument;
22
23#[derive(Clone, Default)]
25pub struct TracingExtractLayer {
26 _private: (),
27}
28
29impl TracingExtractLayer {
30 pub fn new() -> Self {
31 Self::default()
32 }
33}
34
35impl<S> Layer<S> for TracingExtractLayer {
36 type Service = TracingExtract<S>;
37
38 fn layer(&self, service: S) -> Self::Service {
39 TracingExtract { inner: service }
40 }
41}
42
43#[derive(Clone)]
48pub struct TracingExtract<S> {
49 inner: S,
50}
51
52impl<S> Service<http::Request<BoxBody>> for TracingExtract<S>
53where
54 S: Service<http::Request<BoxBody>> + Clone + Send + 'static,
55 S::Future: Send + 'static,
56{
57 type Error = S::Error;
58 type Response = S::Response;
59
60 type Future = impl Future<Output = Result<Self::Response, Self::Error>>;
61
62 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63 self.inner.poll_ready(cx)
64 }
65
66 fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
67 let clone = self.inner.clone();
71 let mut inner = std::mem::replace(&mut self.inner, clone);
72
73 async move {
74 let span = match TracingContext::from_http_headers(req.headers()) {
75 Some(tracing_context) => {
76 let span = tracing::info_span!(
77 "grpc_serve",
78 "otel.name" = req.uri().path(),
79 uri = %req.uri()
80 );
81 tracing_context.attach(span)
82 }
83 _ => {
84 tracing::Span::none() }
86 };
87
88 inner.call(req).instrument(span).await
89 }
90 }
91}