risingwave_stream/executor/wrapper/
trace.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use await_tree::InstrumentAwait;
18use futures::{StreamExt, pin_mut};
19use futures_async_stream::try_stream;
20use tracing::{Instrument, Span};
21
22use crate::executor::error::StreamExecutorError;
23use crate::executor::{ActorContextRef, ExecutorInfo, Message, MessageStream};
24
25/// Streams wrapped by `trace` will be traced with `tracing` spans and reported to `opentelemetry`.
26#[try_stream(ok = Message, error = StreamExecutorError)]
27pub async fn trace(info: Arc<ExecutorInfo>, actor_ctx: ActorContextRef, input: impl MessageStream) {
28    let enable_executor_row_count = (actor_ctx.config.developer).enable_executor_row_count;
29    let actor_id_str = actor_ctx.id.to_string();
30    let fragment_id_str = actor_ctx.fragment_id.to_string();
31
32    let executor_row_count = if enable_executor_row_count {
33        let count = actor_ctx
34            .streaming_metrics
35            .executor_row_count
36            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, &info.identity]);
37        Some(count)
38    } else {
39        None
40    };
41
42    let new_span = || {
43        tracing::info_span!(
44            "executor",
45            "otel.name" = info.identity,
46            "message" = tracing::field::Empty,    // record later
47            "chunk_size" = tracing::field::Empty, // record later
48        )
49    };
50    let mut span = new_span();
51
52    pin_mut!(input);
53
54    while let Some(message) = input.next().instrument(span.clone()).await.transpose()? {
55        // Emit a debug event and record the message type.
56        match &message {
57            Message::Chunk(chunk) => {
58                if let Some(count) = &executor_row_count {
59                    count.inc_by(chunk.cardinality() as u64);
60                }
61                tracing::debug!(
62                    target: "events::stream::message::chunk",
63                    parent: &span,
64                    cardinality = chunk.cardinality(),
65                    capacity = chunk.capacity(),
66                    "\n{}\n", chunk.to_pretty_with_schema(&info.schema),
67                );
68                span.record("message", "chunk");
69                span.record("chunk_size", chunk.cardinality());
70            }
71            Message::Watermark(watermark) => {
72                tracing::debug!(
73                    target: "events::stream::message::watermark",
74                    parent: &span,
75                    value = ?watermark.val,
76                    col_idx = watermark.col_idx,
77                );
78                span.record("message", "watermark");
79            }
80            Message::Barrier(barrier) => {
81                tracing::debug!(
82                    target: "events::stream::message::barrier",
83                    parent: &span,
84                    prev_epoch = barrier.epoch.prev,
85                    curr_epoch = barrier.epoch.curr,
86                    kind = ?barrier.kind,
87                    mutation = ?barrier.mutation,
88                );
89                span.record("message", "barrier");
90            }
91        };
92
93        // Drop the span as the inner executor has yielded a new message.
94        //
95        // This is essentially similar to `.instrument(new_span())`, but it allows us to
96        // emit the debug event and record the message type.
97        let _ = std::mem::replace(&mut span, Span::none());
98
99        yield message;
100
101        // Create a new span after we're called again. The parent span may also have been
102        // updated.
103        span = new_span();
104    }
105}
106
107/// Streams wrapped by `instrument_await_tree` will be able to print the spans of the
108/// executors in the stack trace through `await-tree`.
109#[try_stream(ok = Message, error = StreamExecutorError)]
110pub async fn instrument_await_tree(info: Arc<ExecutorInfo>, input: impl MessageStream) {
111    pin_mut!(input);
112
113    let span: await_tree::Span = info.identity.clone().into();
114
115    while let Some(message) = input
116        .next()
117        .instrument_await(span.clone())
118        .await
119        .transpose()?
120    {
121        yield message;
122    }
123}