risingwave_stream/executor/wrapper/
stream_node_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use tokio::time::Instant;
17
18use crate::executor::monitor::profiling_stats::{ProfileMetricsExt, ProfileMetricsImpl};
19use crate::executor::prelude::*;
20
21#[try_stream(ok = Message, error = StreamExecutorError)]
22pub async fn stream_node_metrics(
23    info: Arc<ExecutorInfo>,
24    enable_explain_analyze_stats: bool,
25    input: impl MessageStream,
26    actor_ctx: ActorContextRef,
27) {
28    let stats = ProfileMetricsImpl::new(
29        info.id,
30        &actor_ctx.streaming_metrics,
31        enable_explain_analyze_stats,
32    );
33
34    #[for_await]
35    for message in input {
36        let message = message?;
37        if let Message::Chunk(ref c) = message {
38            stats.inc_row_count(c.cardinality() as u64);
39        }
40        let blocking_duration = Instant::now();
41        yield message;
42        stats.inc_blocking_duration_ns(blocking_duration.elapsed().as_nanos() as u64);
43    }
44}