use std::sync::Arc;
16
17use futures_async_stream::try_stream;
18use risingwave_common::array::DataChunk;
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::metrics_reader::MetricsReader;
21use risingwave_common::types::{DataType, F64, ScalarImpl};
22use risingwave_common::{ensure, try_match_expand};
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24
25use crate::error::{BatchError, Result};
26use crate::executor::{
27 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
28};
29
/// Batch executor that reads per-channel delta statistics (backpressure rate
/// and recv/send throughput) from a [`MetricsReader`] and emits them as rows.
pub struct GetChannelDeltaStatsExecutor {
    /// Output schema: (upstream_fragment_id, downstream_fragment_id,
    /// backpressure_rate, recv_throughput, send_throughput).
    schema: Schema,
    /// Identity string used to identify this executor instance.
    identity: String,
    /// Optional point in time to evaluate the stats at; forwarded to the
    /// metrics reader as `i64`. NOTE(review): presumably a unix timestamp in
    /// seconds — confirm against the `MetricsReader` contract.
    at_time: Option<u64>,
    /// Optional lookback offset forwarded to the metrics reader alongside
    /// `at_time`. NOTE(review): units not visible here — confirm.
    time_offset: Option<u64>,
    /// Source of the channel delta statistics.
    metrics_reader: Arc<dyn MetricsReader>,
}
39
40impl GetChannelDeltaStatsExecutor {
41 pub fn new(
42 schema: Schema,
43 identity: String,
44 at_time: Option<u64>,
45 time_offset: Option<u64>,
46 metrics_reader: Arc<dyn MetricsReader>,
47 ) -> Self {
48 Self {
49 schema,
50 identity,
51 at_time,
52 time_offset,
53 metrics_reader,
54 }
55 }
56
57 async fn fetch_channel_stats_from_metrics_reader(
59 &self,
60 ) -> Result<Vec<Vec<Option<ScalarImpl>>>> {
61 let response = self
63 .metrics_reader
64 .get_channel_delta_stats(
65 self.at_time.map(|t| t as i64),
66 self.time_offset.map(|t| t as i64),
67 )
68 .await?;
69
70 let mut rows = Vec::new();
72 for (key, stats) in response {
73 let row = vec![
74 Some(ScalarImpl::Int32(key.upstream_fragment_id as i32)),
75 Some(ScalarImpl::Int32(key.downstream_fragment_id as i32)),
76 Some(ScalarImpl::Float64(F64::from(stats.backpressure_rate))),
77 Some(ScalarImpl::Float64(F64::from(stats.recv_throughput))),
78 Some(ScalarImpl::Float64(F64::from(stats.send_throughput))),
79 ];
80 rows.push(row);
81 }
82
83 Ok(rows)
84 }
85}
86
impl Executor for GetChannelDeltaStatsExecutor {
    /// Returns the fixed output schema of this executor.
    fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the identity string of this executor instance.
    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns its boxed data-chunk stream.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}
100
101impl GetChannelDeltaStatsExecutor {
102 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
103 async fn do_execute(self: Box<Self>) {
104 let rows = self.fetch_channel_stats_from_metrics_reader().await?;
108
109 if !rows.is_empty() {
110 let mut array_builders = self.schema.create_array_builders(rows.len());
111
112 for (col_idx, builder) in array_builders.iter_mut().enumerate() {
114 for row in &rows {
115 let value = &row[col_idx];
116 builder.append(value);
117 }
118 }
119
120 let columns: Vec<_> = array_builders
121 .into_iter()
122 .map(|b| b.finish().into())
123 .collect();
124
125 let chunk = DataChunk::new(columns, rows.len());
126 yield chunk;
127 }
128 }
129}
130
131impl BoxedExecutorBuilder for GetChannelDeltaStatsExecutor {
132 async fn new_boxed_executor(
133 source: &ExecutorBuilder<'_>,
134 inputs: Vec<BoxedExecutor>,
135 ) -> Result<BoxedExecutor> {
136 ensure!(
137 inputs.is_empty(),
138 "GetChannelDeltaStatsExecutor should have no child!"
139 );
140
141 let get_channel_delta_stats_node = try_match_expand!(
142 source.plan_node().get_node_body().unwrap(),
143 NodeBody::GetChannelDeltaStats
144 )?;
145
146 let fields = vec![
149 Field::new("upstream_fragment_id", DataType::Int32),
150 Field::new("downstream_fragment_id", DataType::Int32),
151 Field::new("backpressure_rate", DataType::Float64),
152 Field::new("recv_throughput", DataType::Float64),
153 Field::new("send_throughput", DataType::Float64),
154 ];
155
156 let schema = Schema { fields };
157
158 let metrics_reader = source.context().metrics_reader();
160
161 Ok(Box::new(Self::new(
162 schema,
163 source.plan_node().get_identity().clone(),
164 get_channel_delta_stats_node.at_time,
165 get_channel_delta_stats_node.time_offset,
166 metrics_reader,
167 )))
168 }
169}