risingwave_common/util/
tracing.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod layer;

use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};

use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Context for tracing used for propagating tracing information in a distributed system.
///
/// Generally, the caller of a service should create a tracing context from the current tracing span
/// and pass it to the callee through the network. The callee will then attach its local tracing
/// span as a child of the tracing context, so that the external tracing service can associate them
/// in a single trace.
///
/// The tracing context must be serialized and deserialized when passing through the network.
/// There are two ways to do this:
///
/// - For RPC calls with a clear caller/callee relationship, the tracing context can be serialized
///   into the W3C trace context format and passed in HTTP headers seamlessly with a service
///   middleware. For example, the DDL requests from the frontend to the meta service.
/// - For RPC calls with no clear caller/callee relationship or with asynchronous semantics, the
///   tracing context should be passed manually as a protobuf field in the request and response for
///   better lifetime management. For example, the exchange services between streaming actors or
///   batch stages, and the `create_task` requests from the frontend to the batch service.
///
/// This type is a thin wrapper around an [`opentelemetry::Context`].
///
/// See [Trace Context](https://www.w3.org/TR/trace-context/) for more information.
#[derive(Debug, Clone)]
pub struct TracingContext(opentelemetry::Context);

// The propagator used to inject/extract a `TracingContext` into/from its W3C trace context
// text-map representation.
type Propagator = TraceContextPropagator;

impl TracingContext {
    /// Create a new tracing context from a tracing span.
    pub fn from_span(span: &tracing::Span) -> Self {
        Self(span.context())
    }

    /// Create a new tracing context from the current tracing span considered by the subscriber.
    pub fn from_current_span() -> Self {
        Self::from_span(&tracing::Span::current())
    }

    /// Create a no-op tracing context, backed by a freshly created [`opentelemetry::Context`].
    pub fn none() -> Self {
        Self(opentelemetry::Context::new())
    }

    /// Attach the given span as a child of the context. Returns the attached span.
    ///
    /// The span is taken by value and handed back so that the call can be chained.
    pub fn attach(&self, span: tracing::Span) -> tracing::Span {
        span.set_parent(self.0.clone());
        span
    }

    /// Convert the tracing context to the W3C trace context format.
    ///
    /// The returned map holds whatever fields the propagator injects for this context.
    fn to_w3c(&self) -> HashMap<String, String> {
        let mut fields = HashMap::new();
        Propagator::new().inject_context(&self.0, &mut fields);
        fields
    }

    /// Create a new tracing context from the W3C trace context format.
    fn from_w3c(fields: &HashMap<String, String>) -> Self {
        let context = Propagator::new().extract(fields);
        Self(context)
    }

    /// Convert the tracing context to the protobuf format.
    ///
    /// The protobuf representation is the same string map as the W3C trace context format.
    pub fn to_protobuf(&self) -> HashMap<String, String> {
        self.to_w3c()
    }

    /// Create a new tracing context from the protobuf format.
    ///
    /// This is the inverse of [`TracingContext::to_protobuf`].
    pub fn from_protobuf(fields: &HashMap<String, String>) -> Self {
        Self::from_w3c(fields)
    }

    /// Convert the tracing context to the W3C trace context format in HTTP headers.
    ///
    /// This is best-effort: if the fields cannot be converted into a header map, an empty
    /// header map is returned instead of an error.
    pub fn to_http_headers(&self) -> http::HeaderMap {
        let map = self.to_w3c();
        http::HeaderMap::try_from(&map).unwrap_or_default()
    }

    /// Create a new tracing context from the W3C trace context format in HTTP headers.
    ///
    /// Returns `None` if the mandatory `traceparent` header is missing or cannot be read as a
    /// string.
    ///
    /// The `tracestate` header is optional in the W3C specification, and a failure to read it
    /// must not affect the handling of `traceparent`. It is therefore only forwarded to the
    /// propagator when it is present and readable, and silently ignored otherwise.
    pub fn from_http_headers(headers: &http::HeaderMap) -> Option<Self> {
        // See [Trace Context](https://www.w3.org/TR/trace-context/) for these header names.
        const TRACEPARENT: &str = "traceparent";
        const TRACESTATE: &str = "tracestate";

        // Without a readable `traceparent` there is no context to propagate at all.
        let traceparent = headers.get(TRACEPARENT)?.to_str().ok()?;

        let mut map = HashMap::with_capacity(2);
        map.insert(TRACEPARENT.to_owned(), traceparent.to_owned());

        // Requests from callers that do not send vendor-specific state must still be accepted.
        if let Some(tracestate) = headers.get(TRACESTATE).and_then(|value| value.to_str().ok()) {
            map.insert(TRACESTATE.to_owned(), tracestate.to_owned());
        }

        Some(Self::from_w3c(&map))
    }
}

/// Extension trait that lets any [`futures::Stream`] be wrapped in a [`tracing::Span`].
#[easy_ext::ext(InstrumentStream)]
impl<T> T
where
    T: futures::Stream + Sized,
{
    /// Wraps the whole stream in a single span. This simply forwards to
    /// [`tracing_futures::Instrument::instrument`].
    ///
    /// The span is entered and exited on every poll of the stream, and is closed only once
    /// the stream itself is dropped.
    ///
    /// For long-lived streams, prefer [`InstrumentStream::instrument_with`] so that a single
    /// span does not accumulate too many events.
    pub fn instrument(self, span: tracing::Span) -> tracing_futures::Instrumented<Self> {
        <Self as tracing_futures::Instrument>::instrument(self, span)
    }

    /// Wraps the stream so that the given closure creates a fresh span **every time an item
    /// is yielded**.
    pub fn instrument_with<S>(self, make_span: S) -> WithInstrumented<Self, S>
    where
        S: FnMut() -> tracing::Span,
    {
        let stream = self;
        WithInstrumented::new(stream, make_span)
    }
}

pin_project_lite::pin_project! {
    /// A [`futures::Stream`] that has been instrumented with
    /// [`InstrumentStream::instrument_with`].
    #[derive(Debug, Clone)]
    pub struct WithInstrumented<T, S> {
        // The wrapped stream; structurally pinned so it can be polled through the projection.
        #[pin]
        inner: T,
        // Factory invoked to create a span whenever none is currently held.
        make_span: S,
        // Span covering the polls for the item currently being produced. It is `None` before
        // the first poll and again after each poll that completes with `Poll::Ready`.
        current_span: Option<tracing::Span>,
    }
}

impl<T, S> WithInstrumented<T, S> {
    /// Wraps `inner` together with the span factory `make_span`.
    ///
    /// No span is created here; the first one is made lazily on the first poll.
    fn new(inner: T, make_span: S) -> Self {
        let current_span = None;
        Self {
            inner,
            make_span,
            current_span,
        }
    }

    /// Consumes the wrapper and hands back the wrapped stream, discarding the span factory
    /// and any span currently held.
    pub fn into_inner(self) -> T {
        let Self { inner, .. } = self;
        inner
    }
}

impl<T, S> futures::Stream for WithInstrumented<T, S>
where
    T: futures::Stream + Sized,
    S: FnMut() -> tracing::Span,
{
    type Item = T::Item;

    /// Polls the inner stream inside the current span, creating the span first if none is
    /// held. The span is released as soon as the inner stream reports `Poll::Ready`, so the
    /// next poll starts a new one.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let inner = this.inner;
        let slot = this.current_span;

        // Reuse the span left over from a previous pending poll, or make a new one.
        let span = slot.get_or_insert_with(this.make_span);

        // The span is entered only for the duration of the inner poll.
        let outcome = span.in_scope(|| inner.poll_next(cx));

        if !outcome.is_pending() {
            // An item (or the end of the stream) was produced: drop the span.
            *slot = None;
        }

        outcome
    }
}