risingwave_stream/executor/wrapper/
trace.rs1use std::sync::Arc;
16
17use await_tree::InstrumentAwait;
18use futures::{StreamExt, pin_mut};
19use futures_async_stream::try_stream;
20use tracing::{Instrument, Span};
21
22use crate::executor::error::StreamExecutorError;
23use crate::executor::{ActorContextRef, ExecutorInfo, Message, MessageStream};
24
25#[try_stream(ok = Message, error = StreamExecutorError)]
27pub async fn trace(
28 enable_executor_row_count: bool,
29 info: Arc<ExecutorInfo>,
30 actor_ctx: ActorContextRef,
31 input: impl MessageStream,
32) {
33 let actor_id_str = actor_ctx.id.to_string();
34 let fragment_id_str = actor_ctx.fragment_id.to_string();
35
36 let executor_row_count = if enable_executor_row_count {
37 let count = actor_ctx
38 .streaming_metrics
39 .executor_row_count
40 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, &info.identity]);
41 Some(count)
42 } else {
43 None
44 };
45
46 let new_span = || {
47 tracing::info_span!(
48 "executor",
49 "otel.name" = info.identity,
50 "message" = tracing::field::Empty, "chunk_size" = tracing::field::Empty, )
53 };
54 let mut span = new_span();
55
56 pin_mut!(input);
57
58 while let Some(message) = input.next().instrument(span.clone()).await.transpose()? {
59 match &message {
61 Message::Chunk(chunk) => {
62 if let Some(count) = &executor_row_count {
63 count.inc_by(chunk.cardinality() as u64);
64 }
65 tracing::debug!(
66 target: "events::stream::message::chunk",
67 parent: &span,
68 cardinality = chunk.cardinality(),
69 capacity = chunk.capacity(),
70 "\n{}\n", chunk.to_pretty_with_schema(&info.schema),
71 );
72 span.record("message", "chunk");
73 span.record("chunk_size", chunk.cardinality());
74 }
75 Message::Watermark(watermark) => {
76 tracing::debug!(
77 target: "events::stream::message::watermark",
78 parent: &span,
79 value = ?watermark.val,
80 col_idx = watermark.col_idx,
81 );
82 span.record("message", "watermark");
83 }
84 Message::Barrier(barrier) => {
85 tracing::debug!(
86 target: "events::stream::message::barrier",
87 parent: &span,
88 prev_epoch = barrier.epoch.prev,
89 curr_epoch = barrier.epoch.curr,
90 kind = ?barrier.kind,
91 );
92 span.record("message", "barrier");
93 }
94 };
95
96 let _ = std::mem::replace(&mut span, Span::none());
101
102 yield message;
103
104 span = new_span();
107 }
108}
109
110#[try_stream(ok = Message, error = StreamExecutorError)]
113pub async fn instrument_await_tree(info: Arc<ExecutorInfo>, input: impl MessageStream) {
114 pin_mut!(input);
115
116 let span: await_tree::Span = info.identity.clone().into();
117
118 while let Some(message) = input
119 .next()
120 .instrument_await(span.clone())
121 .await
122 .transpose()?
123 {
124 yield message;
125 }
126}