risingwave_common/util/tracing.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod layer;
16
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20
21use opentelemetry::propagation::TextMapPropagator;
22use opentelemetry_sdk::propagation::TraceContextPropagator;
23use tracing_opentelemetry::OpenTelemetrySpanExt;
24
25/// Context for tracing used for propagating tracing information in a distributed system.
26///
27/// Generally, the caller of a service should create a tracing context from the current tracing span
28/// and pass it to the callee through the network. The callee will then attach its local tracing
29/// span as a child of the tracing context, so that the external tracing service can associate them
30/// in a single trace.
31///
32/// The tracing context must be serialized and deserialized when passing through the network.
33/// There're two ways to do this:
34///
35/// - For RPC calls with clear caller/callee relationship, the tracing context can be serialized
36/// into the W3C trace context format and passed in HTTP headers seamlessly with a service
37/// middleware. For example, the DDL requests from the frontend to the meta service.
38/// - For RPC calls with no clear caller/callee relationship or with asynchronous semantics, the
39/// tracing context should be passed manually as a protobuf field in the request and response for
40/// better lifetime management. For example, the exchange services between streaming actors or
41/// batch stages, and the `create_task` requests from the frontend to the batch service.
42///
43/// See [Trace Context](https://www.w3.org/TR/trace-context/) for more information.
44#[derive(Debug, Clone)]
45pub struct TracingContext(opentelemetry::Context);
46
// The propagator used by `TracingContext` to convert to and from the W3C trace context
// key-value representation.
type Propagator = TraceContextPropagator;
48
49impl TracingContext {
50 /// Create a new tracing context from a tracing span.
51 pub fn from_span(span: &tracing::Span) -> Self {
52 Self(span.context())
53 }
54
55 /// Create a new tracing context from the current tracing span considered by the subscriber.
56 pub fn from_current_span() -> Self {
57 Self::from_span(&tracing::Span::current())
58 }
59
60 /// Create a no-op tracing context.
61 pub fn none() -> Self {
62 Self(opentelemetry::Context::new())
63 }
64
65 /// Attach the given span as a child of the context. Returns the attached span.
66 pub fn attach(&self, span: tracing::Span) -> tracing::Span {
67 span.set_parent(self.0.clone());
68 span
69 }
70
71 /// Convert the tracing context to the W3C trace context format.
72 fn to_w3c(&self) -> HashMap<String, String> {
73 let mut fields = HashMap::new();
74 Propagator::new().inject_context(&self.0, &mut fields);
75 fields
76 }
77
78 /// Create a new tracing context from the W3C trace context format.
79 fn from_w3c(fields: &HashMap<String, String>) -> Self {
80 let context = Propagator::new().extract(fields);
81 Self(context)
82 }
83
84 /// Convert the tracing context to the protobuf format.
85 pub fn to_protobuf(&self) -> HashMap<String, String> {
86 self.to_w3c()
87 }
88
89 /// Create a new tracing context from the protobuf format.
90 pub fn from_protobuf(fields: &HashMap<String, String>) -> Self {
91 Self::from_w3c(fields)
92 }
93
94 /// Convert the tracing context to the W3C trace context format in HTTP headers.
95 pub fn to_http_headers(&self) -> http::HeaderMap {
96 let map = self.to_w3c();
97 http::HeaderMap::try_from(&map).unwrap_or_default()
98 }
99
100 /// Create a new tracing context from the W3C trace context format in HTTP headers.
101 ///
102 /// Returns `None` if the headers are invalid.
103 pub fn from_http_headers(headers: &http::HeaderMap) -> Option<Self> {
104 let mut map = HashMap::new();
105
106 // See [Trace Context](https://www.w3.org/TR/trace-context/) for these header names.
107 for key in ["traceparent", "tracestate"] {
108 let value = headers.get(key)?.to_str().ok()?;
109 map.insert(key.to_owned(), value.to_owned());
110 }
111
112 Some(Self::from_w3c(&map))
113 }
114}
115
116/// Extension trait allowing [`futures::Stream`]s to be instrumented with a [`tracing::Span`].
117#[easy_ext::ext(InstrumentStream)]
118impl<T> T
119where
120 T: futures::Stream + Sized,
121{
122 /// Instruments the stream with the given span. Alias for
123 /// [`tracing_futures::Instrument::instrument`].
124 ///
125 /// The span will be entered and exited every time the stream is polled. The span will be
126 /// closed only when the stream is dropped.
127 ///
128 /// If the stream is long-lived, consider [`InstrumentStream::instrument_with`] instead to
129 /// avoid accumulating too many events in the span.
130 pub fn instrument(self, span: tracing::Span) -> tracing_futures::Instrumented<Self> {
131 tracing_futures::Instrument::instrument(self, span)
132 }
133
134 /// Instruments the stream with spans created by the given closure **every time an item is
135 /// yielded**.
136 pub fn instrument_with<S>(self, make_span: S) -> WithInstrumented<Self, S>
137 where
138 S: FnMut() -> tracing::Span,
139 {
140 WithInstrumented::new(self, make_span)
141 }
142}
143
pin_project_lite::pin_project! {
    /// A [`futures::Stream`] that has been instrumented with
    /// [`InstrumentStream::instrument_with`].
    ///
    /// Every poll of the inner stream happens while a span created by `make_span` is entered.
    /// The span is created lazily when none is held, and released as soon as the inner stream
    /// reports that it is ready.
    #[derive(Debug, Clone)]
    pub struct WithInstrumented<T, S> {
        // The wrapped stream; structurally pinned so that it can be polled in place.
        #[pin]
        inner: T,
        // Closure invoked to create a span whenever no span is currently held.
        make_span: S,
        // The span covering the item currently being produced; `None` until the first poll and
        // after each ready result.
        current_span: Option<tracing::Span>,
    }
}
155
156impl<T, S> WithInstrumented<T, S> {
157 /// Creates a new [`WithInstrumented`] stream.
158 fn new(inner: T, make_span: S) -> Self {
159 Self {
160 inner,
161 make_span,
162 current_span: None,
163 }
164 }
165
166 /// Returns the inner stream.
167 pub fn into_inner(self) -> T {
168 self.inner
169 }
170}
171
172impl<T, S> futures::Stream for WithInstrumented<T, S>
173where
174 T: futures::Stream + Sized,
175 S: FnMut() -> tracing::Span,
176{
177 type Item = T::Item;
178
179 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180 let this = self.project();
181
182 let poll = {
183 let _entered = this.current_span.get_or_insert_with(this.make_span).enter();
184 this.inner.poll_next(cx)
185 };
186
187 if poll.is_ready() {
188 // Drop the span when a new item is yielded.
189 *this.current_span = None;
190 }
191
192 poll
193 }
194}