// risingwave_frontend/optimizer/plan_node/batch_hop_window.rs
1use risingwave_pb::batch_plan::HopWindowNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
22};
23use crate::error::Result;
24use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::ToLocalBatch;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Order, RequiredDist};
28use crate::utils::ColIndexMappingRewriteExt;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct BatchHopWindow {
34 pub base: PlanBase<Batch>,
35 core: generic::HopWindow<PlanRef>,
36 window_start_exprs: Vec<ExprImpl>,
37 window_end_exprs: Vec<ExprImpl>,
38}
39
40impl BatchHopWindow {
41 pub fn new(
42 core: generic::HopWindow<PlanRef>,
43 window_start_exprs: Vec<ExprImpl>,
44 window_end_exprs: Vec<ExprImpl>,
45 ) -> Self {
46 let distribution = core
47 .i2o_col_mapping()
48 .rewrite_provided_distribution(core.input.distribution());
49 let base =
50 PlanBase::new_batch_with_core(&core, distribution, core.get_out_column_index_order());
51 BatchHopWindow {
52 base,
53 core,
54 window_start_exprs,
55 window_end_exprs,
56 }
57 }
58}
59impl_distill_by_unit!(BatchHopWindow, core, "BatchHopWindow");
60
61impl PlanTreeNodeUnary for BatchHopWindow {
62 fn input(&self) -> PlanRef {
63 self.core.input.clone()
64 }
65
66 fn clone_with_input(&self, input: PlanRef) -> Self {
67 let mut core = self.core.clone();
68 core.input = input;
69 Self::new(
70 core,
71 self.window_start_exprs.clone(),
72 self.window_end_exprs.clone(),
73 )
74 }
75}
76
77impl_plan_tree_node_for_unary! { BatchHopWindow }
78
79impl ToDistributedBatch for BatchHopWindow {
80 fn to_distributed(&self) -> Result<PlanRef> {
81 self.to_distributed_with_required(&Order::any(), &RequiredDist::Any)
82 }
83
84 fn to_distributed_with_required(
85 &self,
86 required_order: &Order,
87 required_dist: &RequiredDist,
88 ) -> Result<PlanRef> {
89 let input_required = self
94 .core
95 .o2i_col_mapping()
96 .rewrite_required_distribution(required_dist);
97 let new_input = self
98 .input()
99 .to_distributed_with_required(required_order, &input_required)?;
100 let mut new_logical = self.core.clone();
101 new_logical.input = new_input;
102 let batch_plan = BatchHopWindow::new(
103 new_logical,
104 self.window_start_exprs.clone(),
105 self.window_end_exprs.clone(),
106 );
107 let batch_plan = required_order.enforce_if_not_satisfies(batch_plan.into())?;
108 required_dist.enforce_if_not_satisfies(batch_plan, required_order)
109 }
110}
111
112impl ToBatchPb for BatchHopWindow {
113 fn to_batch_prost_body(&self) -> NodeBody {
114 NodeBody::HopWindow(HopWindowNode {
115 time_col: self.core.time_col.index() as _,
116 window_slide: Some(self.core.window_slide.into()),
117 window_size: Some(self.core.window_size.into()),
118 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
119 window_start_exprs: self
120 .window_start_exprs
121 .clone()
122 .iter()
123 .map(|x| x.to_expr_proto())
124 .collect(),
125 window_end_exprs: self
126 .window_end_exprs
127 .clone()
128 .iter()
129 .map(|x| x.to_expr_proto())
130 .collect(),
131 })
132 }
133}
134
135impl ToLocalBatch for BatchHopWindow {
136 fn to_local(&self) -> Result<PlanRef> {
137 let new_input = self.input().to_local()?;
138 Ok(self.clone_with_input(new_input).into())
139 }
140}
141
142impl ExprRewritable for BatchHopWindow {
143 fn has_rewritable_expr(&self) -> bool {
144 true
145 }
146
147 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
148 Self::new(
149 self.core.clone(),
150 self.window_start_exprs
151 .clone()
152 .into_iter()
153 .map(|e| r.rewrite_expr(e))
154 .collect(),
155 self.window_end_exprs
156 .clone()
157 .into_iter()
158 .map(|e| r.rewrite_expr(e))
159 .collect(),
160 )
161 .into()
162 }
163}
164
165impl ExprVisitable for BatchHopWindow {
166 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
167 self.window_start_exprs.iter().for_each(|e| v.visit_expr(e));
168 self.window_end_exprs.iter().for_each(|e| v.visit_expr(e));
169 }
170}