risingwave_common/util/
tracing.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod layer;
16
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20
21use opentelemetry::propagation::TextMapPropagator;
22use opentelemetry_sdk::propagation::TraceContextPropagator;
23use tracing_opentelemetry::OpenTelemetrySpanExt;
24
25/// Context for tracing used for propagating tracing information in a distributed system.
26///
27/// Generally, the caller of a service should create a tracing context from the current tracing span
28/// and pass it to the callee through the network. The callee will then attach its local tracing
29/// span as a child of the tracing context, so that the external tracing service can associate them
30/// in a single trace.
31///
32/// The tracing context must be serialized and deserialized when passing through the network.
33/// There're two ways to do this:
34///
35/// - For RPC calls with clear caller/callee relationship, the tracing context can be serialized
36///   into the W3C trace context format and passed in HTTP headers seamlessly with a service
37///   middleware. For example, the DDL requests from the frontend to the meta service.
38/// - For RPC calls with no clear caller/callee relationship or with asynchronous semantics, the
39///   tracing context should be passed manually as a protobuf field in the request and response for
40///   better lifetime management. For example, the exchange services between streaming actors or
41///   batch stages, and the `create_task` requests from the frontend to the batch service.
42///
43/// See [Trace Context](https://www.w3.org/TR/trace-context/) for more information.
44#[derive(Debug, Clone)]
45pub struct TracingContext(opentelemetry::Context);
46
47type Propagator = TraceContextPropagator;
48
49impl TracingContext {
50    /// Create a new tracing context from a tracing span.
51    pub fn from_span(span: &tracing::Span) -> Self {
52        Self(span.context())
53    }
54
55    /// Create a new tracing context from the current tracing span considered by the subscriber.
56    pub fn from_current_span() -> Self {
57        Self::from_span(&tracing::Span::current())
58    }
59
60    /// Create a no-op tracing context.
61    pub fn none() -> Self {
62        Self(opentelemetry::Context::new())
63    }
64
65    /// Attach the given span as a child of the context. Returns the attached span.
66    pub fn attach(&self, span: tracing::Span) -> tracing::Span {
67        span.set_parent(self.0.clone());
68        span
69    }
70
71    /// Convert the tracing context to the W3C trace context format.
72    fn to_w3c(&self) -> HashMap<String, String> {
73        let mut fields = HashMap::new();
74        Propagator::new().inject_context(&self.0, &mut fields);
75        fields
76    }
77
78    /// Create a new tracing context from the W3C trace context format.
79    fn from_w3c(fields: &HashMap<String, String>) -> Self {
80        let context = Propagator::new().extract(fields);
81        Self(context)
82    }
83
84    /// Convert the tracing context to the protobuf format.
85    pub fn to_protobuf(&self) -> HashMap<String, String> {
86        self.to_w3c()
87    }
88
89    /// Create a new tracing context from the protobuf format.
90    pub fn from_protobuf(fields: &HashMap<String, String>) -> Self {
91        Self::from_w3c(fields)
92    }
93
94    /// Convert the tracing context to the W3C trace context format in HTTP headers.
95    pub fn to_http_headers(&self) -> http::HeaderMap {
96        let map = self.to_w3c();
97        http::HeaderMap::try_from(&map).unwrap_or_default()
98    }
99
100    /// Create a new tracing context from the W3C trace context format in HTTP headers.
101    ///
102    /// Returns `None` if the headers are invalid.
103    pub fn from_http_headers(headers: &http::HeaderMap) -> Option<Self> {
104        let mut map = HashMap::new();
105
106        // See [Trace Context](https://www.w3.org/TR/trace-context/) for these header names.
107        for key in ["traceparent", "tracestate"] {
108            let value = headers.get(key)?.to_str().ok()?;
109            map.insert(key.to_owned(), value.to_owned());
110        }
111
112        Some(Self::from_w3c(&map))
113    }
114}
115
116/// Extension trait allowing [`futures::Stream`]s to be instrumented with a [`tracing::Span`].
117#[easy_ext::ext(InstrumentStream)]
118impl<T> T
119where
120    T: futures::Stream + Sized,
121{
122    /// Instruments the stream with the given span. Alias for
123    /// [`tracing_futures::Instrument::instrument`].
124    ///
125    /// The span will be entered and exited every time the stream is polled. The span will be
126    /// closed only when the stream is dropped.
127    ///
128    /// If the stream is long-lived, consider [`InstrumentStream::instrument_with`] instead to
129    /// avoid accumulating too many events in the span.
130    pub fn instrument(self, span: tracing::Span) -> tracing_futures::Instrumented<Self> {
131        tracing_futures::Instrument::instrument(self, span)
132    }
133
134    /// Instruments the stream with spans created by the given closure **every time an item is
135    /// yielded**.
136    pub fn instrument_with<S>(self, make_span: S) -> WithInstrumented<Self, S>
137    where
138        S: FnMut() -> tracing::Span,
139    {
140        WithInstrumented::new(self, make_span)
141    }
142}
143
144pin_project_lite::pin_project! {
145    /// A [`futures::Stream`] that has been instrumented with
146    /// [`InstrumentStream::instrument_with`].
147    #[derive(Debug, Clone)]
148    pub struct WithInstrumented<T, S> {
149        #[pin]
150        inner: T,
151        make_span: S,
152        current_span: Option<tracing::Span>,
153    }
154}
155
156impl<T, S> WithInstrumented<T, S> {
157    /// Creates a new [`WithInstrumented`] stream.
158    fn new(inner: T, make_span: S) -> Self {
159        Self {
160            inner,
161            make_span,
162            current_span: None,
163        }
164    }
165
166    /// Returns the inner stream.
167    pub fn into_inner(self) -> T {
168        self.inner
169    }
170}
171
172impl<T, S> futures::Stream for WithInstrumented<T, S>
173where
174    T: futures::Stream + Sized,
175    S: FnMut() -> tracing::Span,
176{
177    type Item = T::Item;
178
179    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180        let this = self.project();
181
182        let poll = {
183            let _entered = this.current_span.get_or_insert_with(this.make_span).enter();
184            this.inner.poll_next(cx)
185        };
186
187        if poll.is_ready() {
188            // Drop the span when a new item is yielded.
189            *this.current_span = None;
190        }
191
192        poll
193    }
194}