risingwave_stream/executor/wrapper/
stream_node_metrics.rs1use futures_async_stream::try_stream;
16use tokio::time::Instant;
17
18use crate::executor::monitor::profiling_stats::{ProfileMetricsExt, ProfileMetricsImpl};
19use crate::executor::prelude::*;
20
21#[try_stream(ok = Message, error = StreamExecutorError)]
22pub async fn stream_node_metrics(
23 info: Arc<ExecutorInfo>,
24 enable_explain_analyze_stats: bool,
25 input: impl MessageStream,
26 actor_ctx: ActorContextRef,
27) {
28 let stats = ProfileMetricsImpl::new(
29 info.id,
30 &actor_ctx.streaming_metrics,
31 enable_explain_analyze_stats,
32 );
33
34 #[for_await]
35 for message in input {
36 let message = message?;
37 if let Message::Chunk(ref c) = message {
38 stats.inc_row_count(c.cardinality() as u64);
39 }
40 let blocking_duration = Instant::now();
41 yield message;
42 stats.inc_blocking_duration_ns(blocking_duration.elapsed().as_nanos() as u64);
43 }
44}