risingwave_stream/executor/wrapper/
trace.rs1use std::sync::Arc;
16
17use await_tree::InstrumentAwait;
18use futures::{StreamExt, pin_mut};
19use futures_async_stream::try_stream;
20use tracing::{Instrument, Span};
21
22use crate::executor::error::StreamExecutorError;
23use crate::executor::{ActorContextRef, ExecutorInfo, Message, MessageStream};
24
25#[try_stream(ok = Message, error = StreamExecutorError)]
27pub async fn trace(info: Arc<ExecutorInfo>, actor_ctx: ActorContextRef, input: impl MessageStream) {
28 let enable_executor_row_count = (actor_ctx.config.developer).enable_executor_row_count;
29 let actor_id_str = actor_ctx.id.to_string();
30 let fragment_id_str = actor_ctx.fragment_id.to_string();
31
32 let executor_row_count = if enable_executor_row_count {
33 let count = actor_ctx
34 .streaming_metrics
35 .executor_row_count
36 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, &info.identity]);
37 Some(count)
38 } else {
39 None
40 };
41
42 let new_span = || {
43 tracing::info_span!(
44 "executor",
45 "otel.name" = info.identity,
46 "message" = tracing::field::Empty, "chunk_size" = tracing::field::Empty, )
49 };
50 let mut span = new_span();
51
52 pin_mut!(input);
53
54 while let Some(message) = input.next().instrument(span.clone()).await.transpose()? {
55 match &message {
57 Message::Chunk(chunk) => {
58 if let Some(count) = &executor_row_count {
59 count.inc_by(chunk.cardinality() as u64);
60 }
61 tracing::debug!(
62 target: "events::stream::message::chunk",
63 parent: &span,
64 cardinality = chunk.cardinality(),
65 capacity = chunk.capacity(),
66 "\n{}\n", chunk.to_pretty_with_schema(&info.schema),
67 );
68 span.record("message", "chunk");
69 span.record("chunk_size", chunk.cardinality());
70 }
71 Message::Watermark(watermark) => {
72 tracing::debug!(
73 target: "events::stream::message::watermark",
74 parent: &span,
75 value = ?watermark.val,
76 col_idx = watermark.col_idx,
77 );
78 span.record("message", "watermark");
79 }
80 Message::Barrier(barrier) => {
81 tracing::debug!(
82 target: "events::stream::message::barrier",
83 parent: &span,
84 prev_epoch = barrier.epoch.prev,
85 curr_epoch = barrier.epoch.curr,
86 kind = ?barrier.kind,
87 mutation = ?barrier.mutation,
88 );
89 span.record("message", "barrier");
90 }
91 };
92
93 let _ = std::mem::replace(&mut span, Span::none());
98
99 yield message;
100
101 span = new_span();
104 }
105}
106
107#[try_stream(ok = Message, error = StreamExecutorError)]
110pub async fn instrument_await_tree(info: Arc<ExecutorInfo>, input: impl MessageStream) {
111 pin_mut!(input);
112
113 let span: await_tree::Span = info.identity.clone().into();
114
115 while let Some(message) = input
116 .next()
117 .instrument_await(span.clone())
118 .await
119 .transpose()?
120 {
121 yield message;
122 }
123}