risingwave_batch_executors/executor/
get_channel_delta_stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures_async_stream::try_stream;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::metrics_reader::MetricsReader;
21use risingwave_common::types::{DataType, F64, ScalarImpl};
22use risingwave_common::{ensure, try_match_expand};
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24
25use crate::error::{BatchError, Result};
26use crate::executor::{
27    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
28};
29
/// [`GetChannelDeltaStatsExecutor`] is a batch executor that reads channel delta statistics
/// through a [`MetricsReader`] and emits them as rows. The executor takes no input executors.
///
/// NOTE(review): earlier documentation described the data as coming "from the meta node via RPC
/// calls"; that depends on the `MetricsReader` implementation supplied by the batch task
/// context and cannot be confirmed from this file.
pub struct GetChannelDeltaStatsExecutor {
    /// Output schema, handed back by `Executor::schema`.
    schema: Schema,
    /// Identity string, handed back by `Executor::identity`.
    identity: String,
    /// Forwarded (cast to `i64`) as the first argument of
    /// `MetricsReader::get_channel_delta_stats`. Presumably a point in time to query at; the
    /// unit is not visible here -- TODO confirm against the plan node definition.
    at_time: Option<u64>,
    /// Forwarded (cast to `i64`) as the second argument of
    /// `MetricsReader::get_channel_delta_stats`. Presumably the width of the delta window; the
    /// unit is not visible here -- TODO confirm against the plan node definition.
    time_offset: Option<u64>,
    /// Source the statistics are fetched from when the executor runs.
    metrics_reader: Arc<dyn MetricsReader>,
}
39
40impl GetChannelDeltaStatsExecutor {
41    pub fn new(
42        schema: Schema,
43        identity: String,
44        at_time: Option<u64>,
45        time_offset: Option<u64>,
46        metrics_reader: Arc<dyn MetricsReader>,
47    ) -> Self {
48        Self {
49            schema,
50            identity,
51            at_time,
52            time_offset,
53            metrics_reader,
54        }
55    }
56
57    /// Fetch channel stats from metrics reader
58    async fn fetch_channel_stats_from_metrics_reader(
59        &self,
60    ) -> Result<Vec<Vec<Option<ScalarImpl>>>> {
61        // Fetch channel delta stats from meta node
62        let response = self
63            .metrics_reader
64            .get_channel_delta_stats(
65                self.at_time.map(|t| t as i64),
66                self.time_offset.map(|t| t as i64),
67            )
68            .await?;
69
70        // Convert response to rows
71        let mut rows = Vec::new();
72        for (key, stats) in response {
73            let row = vec![
74                Some(ScalarImpl::Int32(key.upstream_fragment_id as i32)),
75                Some(ScalarImpl::Int32(key.downstream_fragment_id as i32)),
76                Some(ScalarImpl::Float64(F64::from(stats.backpressure_rate))),
77                Some(ScalarImpl::Float64(F64::from(stats.recv_throughput))),
78                Some(ScalarImpl::Float64(F64::from(stats.send_throughput))),
79            ];
80            rows.push(row);
81        }
82
83        Ok(rows)
84    }
85}
86
87impl Executor for GetChannelDeltaStatsExecutor {
88    fn schema(&self) -> &Schema {
89        &self.schema
90    }
91
92    fn identity(&self) -> &str {
93        &self.identity
94    }
95
96    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
97        self.do_execute()
98    }
99}
100
101impl GetChannelDeltaStatsExecutor {
102    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
103    async fn do_execute(self: Box<Self>) {
104        // 1. Read the channel stats from the meta node RPC.
105        // 2. Render into rows.
106
107        let rows = self.fetch_channel_stats_from_metrics_reader().await?;
108
109        if !rows.is_empty() {
110            let mut array_builders = self.schema.create_array_builders(rows.len());
111
112            // Build arrays for each column
113            for (col_idx, builder) in array_builders.iter_mut().enumerate() {
114                for row in &rows {
115                    let value = &row[col_idx];
116                    builder.append(value);
117                }
118            }
119
120            let columns: Vec<_> = array_builders
121                .into_iter()
122                .map(|b| b.finish().into())
123                .collect();
124
125            let chunk = DataChunk::new(columns, rows.len());
126            yield chunk;
127        }
128    }
129}
130
131impl BoxedExecutorBuilder for GetChannelDeltaStatsExecutor {
132    async fn new_boxed_executor(
133        source: &ExecutorBuilder<'_>,
134        inputs: Vec<BoxedExecutor>,
135    ) -> Result<BoxedExecutor> {
136        ensure!(
137            inputs.is_empty(),
138            "GetChannelDeltaStatsExecutor should have no child!"
139        );
140
141        let get_channel_delta_stats_node = try_match_expand!(
142            source.plan_node().get_node_body().unwrap(),
143            NodeBody::GetChannelDeltaStats
144        )?;
145
146        // Create a schema for channel stats
147        // This should match the expected schema from table_function.rs
148        let fields = vec![
149            Field::new("upstream_fragment_id", DataType::Int32),
150            Field::new("downstream_fragment_id", DataType::Int32),
151            Field::new("backpressure_rate", DataType::Float64),
152            Field::new("recv_throughput", DataType::Float64),
153            Field::new("send_throughput", DataType::Float64),
154        ];
155
156        let schema = Schema { fields };
157
158        // Get the MetricsReader from the batch task context
159        let metrics_reader = source.context().metrics_reader();
160
161        Ok(Box::new(Self::new(
162            schema,
163            source.plan_node().get_identity().clone(),
164            get_channel_delta_stats_node.at_time,
165            get_channel_delta_stats_node.time_offset,
166            metrics_reader,
167        )))
168    }
169}