risingwave_frontend/optimizer/plan_node/
logical_get_channel_delta_stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::bail_not_implemented;
17use risingwave_common::catalog::Schema;
18
19use super::generic::GenericPlanRef;
20use super::utils::{Distill, childless_record};
21use super::{
22    ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalPlanRef as PlanRef, PlanBase,
23    PredicatePushdown, ToBatch, ToStream,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::{
27    ColumnPruningContext, LogicalProject, PredicatePushdownContext, RewriteStreamContext,
28    ToStreamContext,
29};
30use crate::optimizer::property::FunctionalDependencySet;
31use crate::utils::{ColIndexMapping, Condition};
32
33/// `LogicalGetChannelDeltaStats` represents a plan node that retrieves channel statistics
34/// from the dashboard API. It has no inputs and returns channel stats data.
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct LogicalGetChannelDeltaStats {
37    pub base: PlanBase<Logical>,
38    pub at_time: Option<u64>,
39    pub time_offset: Option<u64>,
40}
41
42impl LogicalGetChannelDeltaStats {
43    /// Create a new `LogicalGetChannelDeltaStats` node
44    pub fn new(
45        ctx: crate::OptimizerContextRef,
46        schema: Schema,
47        at_time: Option<u64>,
48        time_offset: Option<u64>,
49    ) -> Self {
50        let functional_dependency = FunctionalDependencySet::new(schema.len());
51        let base = PlanBase::new_logical(ctx, schema, None, functional_dependency);
52        Self {
53            base,
54            at_time,
55            time_offset,
56        }
57    }
58
59    /// Get the `at_time` parameter
60    pub fn at_time(&self) -> Option<u64> {
61        self.at_time
62    }
63
64    /// Get the `time_offset` parameter
65    pub fn time_offset(&self) -> Option<u64> {
66        self.time_offset
67    }
68}
69
70impl_plan_tree_node_for_leaf! { Logical, LogicalGetChannelDeltaStats }
71
72impl Distill for LogicalGetChannelDeltaStats {
73    fn distill<'a>(&self) -> XmlNode<'a> {
74        let fields = vec![
75            ("at_time", Pretty::debug(&self.at_time)),
76            ("time_offset", Pretty::debug(&self.time_offset)),
77        ];
78        childless_record("LogicalGetChannelDeltaStats", fields)
79    }
80}
81
82impl ExprRewritable<Logical> for LogicalGetChannelDeltaStats {
83    fn has_rewritable_expr(&self) -> bool {
84        false
85    }
86
87    fn rewrite_exprs(&self, _r: &mut dyn crate::expr::ExprRewriter) -> PlanRef {
88        self.clone().into()
89    }
90}
91
92impl crate::optimizer::plan_node::expr_visitable::ExprVisitable for LogicalGetChannelDeltaStats {
93    fn visit_exprs(&self, _v: &mut dyn crate::expr::ExprVisitor) {
94        // No expressions to visit
95    }
96}
97
98impl ColPrunable for LogicalGetChannelDeltaStats {
99    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
100        LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into()
101    }
102}
103
104impl PredicatePushdown for LogicalGetChannelDeltaStats {
105    fn predicate_pushdown(
106        &self,
107        predicate: Condition,
108        _ctx: &mut PredicatePushdownContext,
109    ) -> PlanRef {
110        LogicalFilter::create(self.clone().into(), predicate)
111    }
112}
113
114impl ToBatch for LogicalGetChannelDeltaStats {
115    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
116        use crate::optimizer::plan_node::BatchGetChannelDeltaStats;
117        Ok(BatchGetChannelDeltaStats::new(
118            self.base.ctx().clone(),
119            self.base.schema().clone(),
120            self.at_time,
121            self.time_offset,
122        )
123        .into())
124    }
125}
126
127impl ToStream for LogicalGetChannelDeltaStats {
128    fn to_stream(
129        &self,
130        _ctx: &mut ToStreamContext,
131    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
132        bail_not_implemented!("Streaming not implemented for LogicalGetChannelDeltaStats")
133    }
134
135    fn logical_rewrite_for_stream(
136        &self,
137        _ctx: &mut RewriteStreamContext,
138    ) -> Result<(PlanRef, ColIndexMapping)> {
139        bail_not_implemented!("Streaming not implemented for LogicalGetChannelDeltaStats")
140    }
141}