risingwave_frontend/optimizer/plan_node/
batch_hop_window.rs1use risingwave_pb::batch_plan::HopWindowNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
22 ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Order, RequiredDist};
29use crate::utils::ColIndexMappingRewriteExt;
30
/// Batch plan node for a hop window.
///
/// Wraps the shared `generic::HopWindow` description together with the expressions that are
/// serialized as `window_start_exprs` / `window_end_exprs` in the `HopWindowNode` protobuf.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchHopWindow {
    /// Common batch plan-node state, built in `new` from `core`.
    pub base: PlanBase<Batch>,
    /// Hop-window description (input, time column, slide, size, output indices).
    core: generic::HopWindow<PlanRef>,
    /// Expressions emitted as `window_start_exprs` in the protobuf node.
    window_start_exprs: Vec<ExprImpl>,
    /// Expressions emitted as `window_end_exprs` in the protobuf node.
    window_end_exprs: Vec<ExprImpl>,
}
40
41impl BatchHopWindow {
42 pub fn new(
43 core: generic::HopWindow<PlanRef>,
44 window_start_exprs: Vec<ExprImpl>,
45 window_end_exprs: Vec<ExprImpl>,
46 ) -> Self {
47 let distribution = core
48 .i2o_col_mapping()
49 .rewrite_provided_distribution(core.input.distribution());
50 let base =
51 PlanBase::new_batch_with_core(&core, distribution, core.get_out_column_index_order());
52 BatchHopWindow {
53 base,
54 core,
55 window_start_exprs,
56 window_end_exprs,
57 }
58 }
59}
// NOTE(review): presumably generates this node's distill/explain implementation by delegating
// to the `core` field under the display name "BatchHopWindow" — confirm against the macro.
impl_distill_by_unit!(BatchHopWindow, core, "BatchHopWindow");
61
62impl PlanTreeNodeUnary<Batch> for BatchHopWindow {
63 fn input(&self) -> PlanRef {
64 self.core.input.clone()
65 }
66
67 fn clone_with_input(&self, input: PlanRef) -> Self {
68 let mut core = self.core.clone();
69 core.input = input;
70 Self::new(
71 core,
72 self.window_start_exprs.clone(),
73 self.window_end_exprs.clone(),
74 )
75 }
76}
77
// NOTE(review): presumably derives the general plan-tree-node impl for the `Batch` convention
// from the `PlanTreeNodeUnary` impl — confirm against the macro definition.
impl_plan_tree_node_for_unary! { Batch, BatchHopWindow }
79
80impl ToDistributedBatch for BatchHopWindow {
81 fn to_distributed(&self) -> Result<PlanRef> {
82 self.to_distributed_with_required(&Order::any(), &RequiredDist::Any)
83 }
84
85 fn to_distributed_with_required(
86 &self,
87 required_order: &Order,
88 required_dist: &RequiredDist,
89 ) -> Result<PlanRef> {
90 let input_required = self
95 .core
96 .o2i_col_mapping()
97 .rewrite_required_distribution(required_dist);
98 let new_input = self
99 .input()
100 .to_distributed_with_required(required_order, &input_required)?;
101 let mut new_logical = self.core.clone();
102 new_logical.input = new_input;
103 let batch_plan = BatchHopWindow::new(
104 new_logical,
105 self.window_start_exprs.clone(),
106 self.window_end_exprs.clone(),
107 );
108 let batch_plan = required_order.enforce_if_not_satisfies(batch_plan.into())?;
109 required_dist.batch_enforce_if_not_satisfies(batch_plan, required_order)
110 }
111}
112
113impl ToBatchPb for BatchHopWindow {
114 fn to_batch_prost_body(&self) -> NodeBody {
115 NodeBody::HopWindow(HopWindowNode {
116 time_col: self.core.time_col.index() as _,
117 window_slide: Some(self.core.window_slide.into()),
118 window_size: Some(self.core.window_size.into()),
119 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
120 window_start_exprs: self
121 .window_start_exprs
122 .clone()
123 .iter()
124 .map(|x| x.to_expr_proto())
125 .collect(),
126 window_end_exprs: self
127 .window_end_exprs
128 .clone()
129 .iter()
130 .map(|x| x.to_expr_proto())
131 .collect(),
132 })
133 }
134}
135
136impl ToLocalBatch for BatchHopWindow {
137 fn to_local(&self) -> Result<PlanRef> {
138 let new_input = self.input().to_local()?;
139 Ok(self.clone_with_input(new_input).into())
140 }
141}
142
143impl ExprRewritable<Batch> for BatchHopWindow {
144 fn has_rewritable_expr(&self) -> bool {
145 true
146 }
147
148 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
149 Self::new(
150 self.core.clone(),
151 self.window_start_exprs
152 .clone()
153 .into_iter()
154 .map(|e| r.rewrite_expr(e))
155 .collect(),
156 self.window_end_exprs
157 .clone()
158 .into_iter()
159 .map(|e| r.rewrite_expr(e))
160 .collect(),
161 )
162 .into()
163 }
164}
165
166impl ExprVisitable for BatchHopWindow {
167 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
168 self.window_start_exprs.iter().for_each(|e| v.visit_expr(e));
169 self.window_end_exprs.iter().for_each(|e| v.visit_expr(e));
170 }
171}