// risingwave_stream/executor/wrapper/trace.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use await_tree::InstrumentAwait;
18use futures::{StreamExt, pin_mut};
19use futures_async_stream::try_stream;
20use tracing::{Instrument, Span};
21
22use crate::executor::error::StreamExecutorError;
23use crate::executor::{ActorContextRef, ExecutorInfo, Message, MessageStream};
24
25/// Streams wrapped by `trace` will be traced with `tracing` spans and reported to `opentelemetry`.
26#[try_stream(ok = Message, error = StreamExecutorError)]
27pub async fn trace(
28    enable_executor_row_count: bool,
29    info: Arc<ExecutorInfo>,
30    actor_ctx: ActorContextRef,
31    input: impl MessageStream,
32) {
33    let actor_id_str = actor_ctx.id.to_string();
34    let fragment_id_str = actor_ctx.fragment_id.to_string();
35
36    let executor_row_count = if enable_executor_row_count {
37        let count = actor_ctx
38            .streaming_metrics
39            .executor_row_count
40            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, &info.identity]);
41        Some(count)
42    } else {
43        None
44    };
45
46    let new_span = || {
47        tracing::info_span!(
48            "executor",
49            "otel.name" = info.identity,
50            "message" = tracing::field::Empty,    // record later
51            "chunk_size" = tracing::field::Empty, // record later
52        )
53    };
54    let mut span = new_span();
55
56    pin_mut!(input);
57
58    while let Some(message) = input.next().instrument(span.clone()).await.transpose()? {
59        // Emit a debug event and record the message type.
60        match &message {
61            Message::Chunk(chunk) => {
62                if let Some(count) = &executor_row_count {
63                    count.inc_by(chunk.cardinality() as u64);
64                }
65                tracing::debug!(
66                    target: "events::stream::message::chunk",
67                    parent: &span,
68                    cardinality = chunk.cardinality(),
69                    capacity = chunk.capacity(),
70                    "\n{}\n", chunk.to_pretty_with_schema(&info.schema),
71                );
72                span.record("message", "chunk");
73                span.record("chunk_size", chunk.cardinality());
74            }
75            Message::Watermark(watermark) => {
76                tracing::debug!(
77                    target: "events::stream::message::watermark",
78                    parent: &span,
79                    value = ?watermark.val,
80                    col_idx = watermark.col_idx,
81                );
82                span.record("message", "watermark");
83            }
84            Message::Barrier(barrier) => {
85                tracing::debug!(
86                    target: "events::stream::message::barrier",
87                    parent: &span,
88                    prev_epoch = barrier.epoch.prev,
89                    curr_epoch = barrier.epoch.curr,
90                    kind = ?barrier.kind,
91                );
92                span.record("message", "barrier");
93            }
94        };
95
96        // Drop the span as the inner executor has yielded a new message.
97        //
98        // This is essentially similar to `.instrument(new_span())`, but it allows us to
99        // emit the debug event and record the message type.
100        let _ = std::mem::replace(&mut span, Span::none());
101
102        yield message;
103
104        // Create a new span after we're called again. The parent span may also have been
105        // updated.
106        span = new_span();
107    }
108}
109
110/// Streams wrapped by `instrument_await_tree` will be able to print the spans of the
111/// executors in the stack trace through `await-tree`.
112#[try_stream(ok = Message, error = StreamExecutorError)]
113pub async fn instrument_await_tree(info: Arc<ExecutorInfo>, input: impl MessageStream) {
114    pin_mut!(input);
115
116    let span: await_tree::Span = info.identity.clone().into();
117
118    while let Some(message) = input
119        .next()
120        .instrument_await(span.clone())
121        .await
122        .transpose()?
123    {
124        yield message;
125    }
126}