risingwave_common/util/tracing.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod layer;
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;
/// Context for tracing used for propagating tracing information in a distributed system.
///
/// Generally, the caller of a service should create a tracing context from the current tracing span
/// and pass it to the callee through the network. The callee will then attach its local tracing
/// span as a child of the tracing context, so that the external tracing service can associate them
/// in a single trace.
///
/// The tracing context must be serialized and deserialized when passing through the network.
/// There're two ways to do this:
///
/// - For RPC calls with clear caller/callee relationship, the tracing context can be serialized
/// into the W3C trace context format and passed in HTTP headers seamlessly with a service
/// middleware. For example, the DDL requests from the frontend to the meta service.
/// - For RPC calls with no clear caller/callee relationship or with asynchronous semantics, the
/// tracing context should be passed manually as a protobuf field in the request and response for
/// better lifetime management. For example, the exchange services between streaming actors or
/// batch stages, and the `create_task` requests from the frontend to the batch service.
///
/// See [Trace Context](https://www.w3.org/TR/trace-context/) for more information.
#[derive(Debug, Clone)]
pub struct TracingContext(opentelemetry::Context);
type Propagator = TraceContextPropagator;
impl TracingContext {
/// Create a new tracing context from a tracing span.
pub fn from_span(span: &tracing::Span) -> Self {
Self(span.context())
}
/// Create a new tracing context from the current tracing span considered by the subscriber.
pub fn from_current_span() -> Self {
Self::from_span(&tracing::Span::current())
}
/// Create a no-op tracing context.
pub fn none() -> Self {
Self(opentelemetry::Context::new())
}
/// Attach the given span as a child of the context. Returns the attached span.
pub fn attach(&self, span: tracing::Span) -> tracing::Span {
span.set_parent(self.0.clone());
span
}
/// Convert the tracing context to the W3C trace context format.
fn to_w3c(&self) -> HashMap<String, String> {
let mut fields = HashMap::new();
Propagator::new().inject_context(&self.0, &mut fields);
fields
}
/// Create a new tracing context from the W3C trace context format.
fn from_w3c(fields: &HashMap<String, String>) -> Self {
let context = Propagator::new().extract(fields);
Self(context)
}
/// Convert the tracing context to the protobuf format.
pub fn to_protobuf(&self) -> HashMap<String, String> {
self.to_w3c()
}
/// Create a new tracing context from the protobuf format.
pub fn from_protobuf(fields: &HashMap<String, String>) -> Self {
Self::from_w3c(fields)
}
/// Convert the tracing context to the W3C trace context format in HTTP headers.
pub fn to_http_headers(&self) -> http::HeaderMap {
let map = self.to_w3c();
http::HeaderMap::try_from(&map).unwrap_or_default()
}
/// Create a new tracing context from the W3C trace context format in HTTP headers.
///
/// Returns `None` if the headers are invalid.
pub fn from_http_headers(headers: &http::HeaderMap) -> Option<Self> {
let mut map = HashMap::new();
// See [Trace Context](https://www.w3.org/TR/trace-context/) for these header names.
for key in ["traceparent", "tracestate"] {
let value = headers.get(key)?.to_str().ok()?;
map.insert(key.to_string(), value.to_string());
}
Some(Self::from_w3c(&map))
}
}
/// Extension trait allowing [`futures::Stream`]s to be instrumented with a [`tracing::Span`].
#[easy_ext::ext(InstrumentStream)]
impl<T> T
where
T: futures::Stream + Sized,
{
/// Instruments the stream with the given span. Alias for
/// [`tracing_futures::Instrument::instrument`].
///
/// The span will be entered and exited every time the stream is polled. The span will be
/// closed only when the stream is dropped.
///
/// If the stream is long-lived, consider [`InstrumentStream::instrument_with`] instead to
/// avoid accumulating too many events in the span.
pub fn instrument(self, span: tracing::Span) -> tracing_futures::Instrumented<Self> {
tracing_futures::Instrument::instrument(self, span)
}
/// Instruments the stream with spans created by the given closure **every time an item is
/// yielded**.
pub fn instrument_with<S>(self, make_span: S) -> WithInstrumented<Self, S>
where
S: FnMut() -> tracing::Span,
{
WithInstrumented::new(self, make_span)
}
}
pin_project_lite::pin_project! {
/// A [`futures::Stream`] that has been instrumented with
/// [`InstrumentStream::instrument_with`].
#[derive(Debug, Clone)]
pub struct WithInstrumented<T, S> {
#[pin]
inner: T,
make_span: S,
current_span: Option<tracing::Span>,
}
}
impl<T, S> WithInstrumented<T, S> {
/// Creates a new [`WithInstrumented`] stream.
fn new(inner: T, make_span: S) -> Self {
Self {
inner,
make_span,
current_span: None,
}
}
/// Returns the inner stream.
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T, S> futures::Stream for WithInstrumented<T, S>
where
T: futures::Stream + Sized,
S: FnMut() -> tracing::Span,
{
type Item = T::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let poll = {
let _entered = this.current_span.get_or_insert_with(this.make_span).enter();
this.inner.poll_next(cx)
};
if poll.is_ready() {
// Drop the span when a new item is yielded.
*this.current_span = None;
}
poll
}
}