risingwave_common/util/tracing.rs
1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod layer;
16
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20
21use opentelemetry::propagation::TextMapPropagator;
22use opentelemetry_sdk::propagation::TraceContextPropagator;
23use tracing_opentelemetry::OpenTelemetrySpanExt;
24
/// Context for tracing used for propagating tracing information in a distributed system.
///
/// Generally, the caller of a service should create a tracing context from the current tracing span
/// and pass it to the callee through the network. The callee will then attach its local tracing
/// span as a child of the tracing context, so that the external tracing service can associate them
/// in a single trace.
///
/// The tracing context must be serialized and deserialized when passing through the network.
/// There are two ways to do this:
///
/// - For RPC calls with clear caller/callee relationship, the tracing context can be serialized
///   into the W3C trace context format and passed in HTTP headers seamlessly with a service
///   middleware. For example, the DDL requests from the frontend to the meta service.
/// - For RPC calls with no clear caller/callee relationship or with asynchronous semantics, the
///   tracing context should be passed manually as a protobuf field in the request and response for
///   better lifetime management. For example, the exchange services between streaming actors or
///   batch stages, and the `create_task` requests from the frontend to the batch service.
///
/// See [Trace Context](https://www.w3.org/TR/trace-context/) for more information.
///
/// This is a newtype wrapper around an [`opentelemetry::Context`].
#[derive(Debug, Clone)]
pub struct TracingContext(opentelemetry::Context);
46
/// The propagator used to convert a [`TracingContext`] to and from the W3C trace context format.
type Propagator = TraceContextPropagator;
48
49impl TracingContext {
50 /// Create a new tracing context from a tracing span.
51 pub fn from_span(span: &tracing::Span) -> Self {
52 Self(span.context())
53 }
54
55 /// Create a new tracing context from the current tracing span considered by the subscriber.
56 pub fn from_current_span() -> Self {
57 Self::from_span(&tracing::Span::current())
58 }
59
60 /// Create a no-op tracing context.
61 pub fn none() -> Self {
62 Self(opentelemetry::Context::new())
63 }
64
65 /// Attach the given span as a child of the context. Returns the attached span.
66 pub fn attach(&self, span: tracing::Span) -> tracing::Span {
67 // `set_parent` may fail if the subscriber doesn't support OpenTelemetry.
68 let _ = span.set_parent(self.0.clone());
69 span
70 }
71
72 /// Convert the tracing context to the W3C trace context format.
73 fn to_w3c(&self) -> HashMap<String, String> {
74 let mut fields = HashMap::new();
75 Propagator::new().inject_context(&self.0, &mut fields);
76 fields
77 }
78
79 /// Create a new tracing context from the W3C trace context format.
80 fn from_w3c(fields: &HashMap<String, String>) -> Self {
81 let context = Propagator::new().extract(fields);
82 Self(context)
83 }
84
85 /// Convert the tracing context to the protobuf format.
86 pub fn to_protobuf(&self) -> HashMap<String, String> {
87 self.to_w3c()
88 }
89
90 /// Create a new tracing context from the protobuf format.
91 pub fn from_protobuf(fields: &HashMap<String, String>) -> Self {
92 Self::from_w3c(fields)
93 }
94
95 /// Convert the tracing context to the W3C trace context format in HTTP headers.
96 pub fn to_http_headers(&self) -> http::HeaderMap {
97 let map = self.to_w3c();
98 http::HeaderMap::try_from(&map).unwrap_or_default()
99 }
100
101 /// Create a new tracing context from the W3C trace context format in HTTP headers.
102 ///
103 /// Returns `None` if the headers are invalid.
104 pub fn from_http_headers(headers: &http::HeaderMap) -> Option<Self> {
105 let mut map = HashMap::new();
106
107 // See [Trace Context](https://www.w3.org/TR/trace-context/) for these header names.
108 for key in ["traceparent", "tracestate"] {
109 let value = headers.get(key)?.to_str().ok()?;
110 map.insert(key.to_owned(), value.to_owned());
111 }
112
113 Some(Self::from_w3c(&map))
114 }
115}
116
117/// Extension trait allowing [`futures::Stream`]s to be instrumented with a [`tracing::Span`].
118#[easy_ext::ext(InstrumentStream)]
119impl<T> T
120where
121 T: futures::Stream + Sized,
122{
123 /// Instruments the stream with the given span. Alias for
124 /// [`tracing_futures::Instrument::instrument`].
125 ///
126 /// The span will be entered and exited every time the stream is polled. The span will be
127 /// closed only when the stream is dropped.
128 ///
129 /// If the stream is long-lived, consider [`InstrumentStream::instrument_with`] instead to
130 /// avoid accumulating too many events in the span.
131 pub fn instrument(self, span: tracing::Span) -> tracing_futures::Instrumented<Self> {
132 tracing_futures::Instrument::instrument(self, span)
133 }
134
135 /// Instruments the stream with spans created by the given closure **every time an item is
136 /// yielded**.
137 pub fn instrument_with<S>(self, make_span: S) -> WithInstrumented<Self, S>
138 where
139 S: FnMut() -> tracing::Span,
140 {
141 WithInstrumented::new(self, make_span)
142 }
143}
144
pin_project_lite::pin_project! {
    /// A [`futures::Stream`] that has been instrumented with
    /// [`InstrumentStream::instrument_with`].
    ///
    /// It yields the items of the inner stream unchanged. While the inner stream is polled, a
    /// span obtained from the `make_span` closure is entered; that span is dropped once the
    /// inner stream reports readiness, so the next poll creates a new one.
    #[derive(Debug, Clone)]
    pub struct WithInstrumented<T, S> {
        // The wrapped stream. Pinned structurally so that it can be polled through the wrapper.
        #[pin]
        inner: T,
        // Closure invoked to create a span whenever `current_span` is empty at poll time.
        make_span: S,
        // The span entered while polling for the next item; `None` until the first poll and
        // again after each ready poll.
        current_span: Option<tracing::Span>,
    }
}
156
157impl<T, S> WithInstrumented<T, S> {
158 /// Creates a new [`WithInstrumented`] stream.
159 fn new(inner: T, make_span: S) -> Self {
160 Self {
161 inner,
162 make_span,
163 current_span: None,
164 }
165 }
166
167 /// Returns the inner stream.
168 pub fn into_inner(self) -> T {
169 self.inner
170 }
171}
172
173impl<T, S> futures::Stream for WithInstrumented<T, S>
174where
175 T: futures::Stream + Sized,
176 S: FnMut() -> tracing::Span,
177{
178 type Item = T::Item;
179
180 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181 let this = self.project();
182
183 let poll = {
184 let _entered = this.current_span.get_or_insert_with(this.make_span).enter();
185 this.inner.poll_next(cx)
186 };
187
188 if poll.is_ready() {
189 // Drop the span when a new item is yielded.
190 *this.current_span = None;
191 }
192
193 poll
194 }
195}