risingwave_frontend/optimizer/plan_node/
batch_hop_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::batch_plan::HopWindowNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
22    ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Order, RequiredDist};
29use crate::utils::ColIndexMappingRewriteExt;
30
31/// `BatchHopWindow` implements [`super::LogicalHopWindow`] to evaluate specified expressions on
32/// input rows
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct BatchHopWindow {
35    pub base: PlanBase<Batch>,
36    core: generic::HopWindow<PlanRef>,
37    window_start_exprs: Vec<ExprImpl>,
38    window_end_exprs: Vec<ExprImpl>,
39}
40
41impl BatchHopWindow {
42    pub fn new(
43        core: generic::HopWindow<PlanRef>,
44        window_start_exprs: Vec<ExprImpl>,
45        window_end_exprs: Vec<ExprImpl>,
46    ) -> Self {
47        let distribution = core
48            .i2o_col_mapping()
49            .rewrite_provided_distribution(core.input.distribution());
50        let base =
51            PlanBase::new_batch_with_core(&core, distribution, core.get_out_column_index_order());
52        BatchHopWindow {
53            base,
54            core,
55            window_start_exprs,
56            window_end_exprs,
57        }
58    }
59}
60impl_distill_by_unit!(BatchHopWindow, core, "BatchHopWindow");
61
62impl PlanTreeNodeUnary<Batch> for BatchHopWindow {
63    fn input(&self) -> PlanRef {
64        self.core.input.clone()
65    }
66
67    fn clone_with_input(&self, input: PlanRef) -> Self {
68        let mut core = self.core.clone();
69        core.input = input;
70        Self::new(
71            core,
72            self.window_start_exprs.clone(),
73            self.window_end_exprs.clone(),
74        )
75    }
76}
77
78impl_plan_tree_node_for_unary! { Batch, BatchHopWindow }
79
80impl ToDistributedBatch for BatchHopWindow {
81    fn to_distributed(&self) -> Result<PlanRef> {
82        self.to_distributed_with_required(&Order::any(), &RequiredDist::Any)
83    }
84
85    fn to_distributed_with_required(
86        &self,
87        required_order: &Order,
88        required_dist: &RequiredDist,
89    ) -> Result<PlanRef> {
90        // The hop operator will generate a multiplication of its input rows,
91        // so shuffling its input instead of its output will reduce the shuffling data
92        // communication.
93        // We pass the required dist to its input.
94        let input_required = self
95            .core
96            .o2i_col_mapping()
97            .rewrite_required_distribution(required_dist);
98        let new_input = self
99            .input()
100            .to_distributed_with_required(required_order, &input_required)?;
101        let mut new_logical = self.core.clone();
102        new_logical.input = new_input;
103        let batch_plan = BatchHopWindow::new(
104            new_logical,
105            self.window_start_exprs.clone(),
106            self.window_end_exprs.clone(),
107        );
108        let batch_plan = required_order.enforce_if_not_satisfies(batch_plan.into())?;
109        required_dist.batch_enforce_if_not_satisfies(batch_plan, required_order)
110    }
111}
112
113impl ToBatchPb for BatchHopWindow {
114    fn to_batch_prost_body(&self) -> NodeBody {
115        NodeBody::HopWindow(HopWindowNode {
116            time_col: self.core.time_col.index() as _,
117            window_slide: Some(self.core.window_slide.into()),
118            window_size: Some(self.core.window_size.into()),
119            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
120            window_start_exprs: self
121                .window_start_exprs
122                .clone()
123                .iter()
124                .map(|x| x.to_expr_proto())
125                .collect(),
126            window_end_exprs: self
127                .window_end_exprs
128                .clone()
129                .iter()
130                .map(|x| x.to_expr_proto())
131                .collect(),
132        })
133    }
134}
135
136impl ToLocalBatch for BatchHopWindow {
137    fn to_local(&self) -> Result<PlanRef> {
138        let new_input = self.input().to_local()?;
139        Ok(self.clone_with_input(new_input).into())
140    }
141}
142
143impl ExprRewritable<Batch> for BatchHopWindow {
144    fn has_rewritable_expr(&self) -> bool {
145        true
146    }
147
148    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
149        Self::new(
150            self.core.clone(),
151            self.window_start_exprs
152                .clone()
153                .into_iter()
154                .map(|e| r.rewrite_expr(e))
155                .collect(),
156            self.window_end_exprs
157                .clone()
158                .into_iter()
159                .map(|e| r.rewrite_expr(e))
160                .collect(),
161        )
162        .into()
163    }
164}
165
166impl ExprVisitable for BatchHopWindow {
167    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
168        self.window_start_exprs.iter().for_each(|e| v.visit_expr(e));
169        self.window_end_exprs.iter().for_each(|e| v.visit_expr(e));
170    }
171}