risingwave_frontend/optimizer/plan_node/
logical_get_channel_delta_stats.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::bail_not_implemented;
17use risingwave_common::catalog::Schema;
18
19use super::generic::GenericPlanRef;
20use super::utils::{Distill, childless_record};
21use super::{
22 ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalPlanRef as PlanRef, PlanBase,
23 PredicatePushdown, ToBatch, ToStream,
24};
25use crate::error::Result;
26use crate::optimizer::plan_node::{
27 ColumnPruningContext, LogicalProject, PredicatePushdownContext, RewriteStreamContext,
28 ToStreamContext,
29};
30use crate::optimizer::property::FunctionalDependencySet;
31use crate::utils::{ColIndexMapping, Condition};
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct LogicalGetChannelDeltaStats {
37 pub base: PlanBase<Logical>,
38 pub at_time: Option<u64>,
39 pub time_offset: Option<u64>,
40}
41
42impl LogicalGetChannelDeltaStats {
43 pub fn new(
45 ctx: crate::OptimizerContextRef,
46 schema: Schema,
47 at_time: Option<u64>,
48 time_offset: Option<u64>,
49 ) -> Self {
50 let functional_dependency = FunctionalDependencySet::new(schema.len());
51 let base = PlanBase::new_logical(ctx, schema, None, functional_dependency);
52 Self {
53 base,
54 at_time,
55 time_offset,
56 }
57 }
58
59 pub fn at_time(&self) -> Option<u64> {
61 self.at_time
62 }
63
64 pub fn time_offset(&self) -> Option<u64> {
66 self.time_offset
67 }
68}
69
70impl_plan_tree_node_for_leaf! { Logical, LogicalGetChannelDeltaStats }
71
72impl Distill for LogicalGetChannelDeltaStats {
73 fn distill<'a>(&self) -> XmlNode<'a> {
74 let fields = vec![
75 ("at_time", Pretty::debug(&self.at_time)),
76 ("time_offset", Pretty::debug(&self.time_offset)),
77 ];
78 childless_record("LogicalGetChannelDeltaStats", fields)
79 }
80}
81
82impl ExprRewritable<Logical> for LogicalGetChannelDeltaStats {
83 fn has_rewritable_expr(&self) -> bool {
84 false
85 }
86
87 fn rewrite_exprs(&self, _r: &mut dyn crate::expr::ExprRewriter) -> PlanRef {
88 self.clone().into()
89 }
90}
91
92impl crate::optimizer::plan_node::expr_visitable::ExprVisitable for LogicalGetChannelDeltaStats {
93 fn visit_exprs(&self, _v: &mut dyn crate::expr::ExprVisitor) {
94 }
96}
97
98impl ColPrunable for LogicalGetChannelDeltaStats {
99 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
100 LogicalProject::with_out_col_idx(self.clone().into(), required_cols.iter().cloned()).into()
101 }
102}
103
104impl PredicatePushdown for LogicalGetChannelDeltaStats {
105 fn predicate_pushdown(
106 &self,
107 predicate: Condition,
108 _ctx: &mut PredicatePushdownContext,
109 ) -> PlanRef {
110 LogicalFilter::create(self.clone().into(), predicate)
111 }
112}
113
114impl ToBatch for LogicalGetChannelDeltaStats {
115 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
116 use crate::optimizer::plan_node::BatchGetChannelDeltaStats;
117 Ok(BatchGetChannelDeltaStats::new(
118 self.base.ctx().clone(),
119 self.base.schema().clone(),
120 self.at_time,
121 self.time_offset,
122 )
123 .into())
124 }
125}
126
127impl ToStream for LogicalGetChannelDeltaStats {
128 fn to_stream(
129 &self,
130 _ctx: &mut ToStreamContext,
131 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
132 bail_not_implemented!("Streaming not implemented for LogicalGetChannelDeltaStats")
133 }
134
135 fn logical_rewrite_for_stream(
136 &self,
137 _ctx: &mut RewriteStreamContext,
138 ) -> Result<(PlanRef, ColIndexMapping)> {
139 bail_not_implemented!("Streaming not implemented for LogicalGetChannelDeltaStats")
140 }
141}