risingwave_frontend/optimizer/plan_node/
batch_get_channel_delta_stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::Schema;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{
22    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeLeaf, ToBatchPb,
23    ToDistributedBatch,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::property::{Distribution, Order};
28
29/// `BatchGetChannelDeltaStats` represents a batch plan node that retrieves channel statistics
30/// from the dashboard API. It has no inputs and returns channel stats data.
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct BatchGetChannelDeltaStats {
33    pub base: PlanBase<Batch>,
34    pub at_time: Option<u64>,
35    pub time_offset: Option<u64>,
36}
37
38impl PlanTreeNodeLeaf for BatchGetChannelDeltaStats {}
39impl_plan_tree_node_for_leaf! { Batch, BatchGetChannelDeltaStats }
40
41impl BatchGetChannelDeltaStats {
42    pub fn new(
43        ctx: crate::OptimizerContextRef,
44        schema: Schema,
45        at_time: Option<u64>,
46        time_offset: Option<u64>,
47    ) -> Self {
48        Self::with_dist(ctx, schema, at_time, time_offset, Distribution::Single)
49    }
50
51    pub fn with_dist(
52        ctx: crate::OptimizerContextRef,
53        schema: Schema,
54        at_time: Option<u64>,
55        time_offset: Option<u64>,
56        dist: Distribution,
57    ) -> Self {
58        let base = PlanBase::new_batch(ctx, schema, dist, Order::any());
59        Self {
60            base,
61            at_time,
62            time_offset,
63        }
64    }
65
66    /// Get the `at_time` parameter
67    pub fn at_time(&self) -> Option<u64> {
68        self.at_time
69    }
70
71    /// Get the `time_offset` parameter
72    pub fn time_offset(&self) -> Option<u64> {
73        self.time_offset
74    }
75}
76
77impl Distill for BatchGetChannelDeltaStats {
78    fn distill<'a>(&self) -> XmlNode<'a> {
79        let fields = vec![
80            ("at_time", Pretty::debug(&self.at_time)),
81            ("time_offset", Pretty::debug(&self.time_offset)),
82        ];
83        childless_record("BatchGetChannelDeltaStats", fields)
84    }
85}
86
87impl ToDistributedBatch for BatchGetChannelDeltaStats {
88    fn to_distributed(&self) -> Result<PlanRef> {
89        Ok(Self::with_dist(
90            self.base.ctx().clone(),
91            self.base.schema().clone(),
92            self.at_time,
93            self.time_offset,
94            Distribution::Single,
95        )
96        .into())
97    }
98}
99
100impl ToBatchPb for BatchGetChannelDeltaStats {
101    fn to_batch_prost_body(&self) -> NodeBody {
102        use risingwave_pb::batch_plan::GetChannelDeltaStatsNode;
103
104        NodeBody::GetChannelDeltaStats(GetChannelDeltaStatsNode {
105            at_time: self.at_time,
106            time_offset: self.time_offset,
107        })
108    }
109}
110
111impl ToLocalBatch for BatchGetChannelDeltaStats {
112    fn to_local(&self) -> Result<PlanRef> {
113        Ok(Self::with_dist(
114            self.base.ctx().clone(),
115            self.base.schema().clone(),
116            self.at_time,
117            self.time_offset,
118            Distribution::Single,
119        )
120        .into())
121    }
122}
123
124impl ExprRewritable<Batch> for BatchGetChannelDeltaStats {
125    fn has_rewritable_expr(&self) -> bool {
126        false
127    }
128
129    fn rewrite_exprs(&self, _r: &mut dyn crate::expr::ExprRewriter) -> PlanRef {
130        self.clone().into()
131    }
132}
133
134impl crate::optimizer::plan_node::expr_visitable::ExprVisitable for BatchGetChannelDeltaStats {
135    fn visit_exprs(&self, _v: &mut dyn crate::expr::ExprVisitor) {
136        // No expressions to visit
137    }
138}