risingwave_frontend/optimizer/plan_node/
batch_hop_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::batch_plan::HopWindowNode;
16use risingwave_pb::batch_plan::plan_node::NodeBody;
17
18use super::batch::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, generic,
22};
23use crate::error::Result;
24use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::ToLocalBatch;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Order, RequiredDist};
28use crate::utils::ColIndexMappingRewriteExt;
29
30/// `BatchHopWindow` implements [`super::LogicalHopWindow`] to evaluate specified expressions on
31/// input rows
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct BatchHopWindow {
34    pub base: PlanBase<Batch>,
35    core: generic::HopWindow<PlanRef>,
36    window_start_exprs: Vec<ExprImpl>,
37    window_end_exprs: Vec<ExprImpl>,
38}
39
40impl BatchHopWindow {
41    pub fn new(
42        core: generic::HopWindow<PlanRef>,
43        window_start_exprs: Vec<ExprImpl>,
44        window_end_exprs: Vec<ExprImpl>,
45    ) -> Self {
46        let distribution = core
47            .i2o_col_mapping()
48            .rewrite_provided_distribution(core.input.distribution());
49        let base =
50            PlanBase::new_batch_with_core(&core, distribution, core.get_out_column_index_order());
51        BatchHopWindow {
52            base,
53            core,
54            window_start_exprs,
55            window_end_exprs,
56        }
57    }
58}
59impl_distill_by_unit!(BatchHopWindow, core, "BatchHopWindow");
60
61impl PlanTreeNodeUnary for BatchHopWindow {
62    fn input(&self) -> PlanRef {
63        self.core.input.clone()
64    }
65
66    fn clone_with_input(&self, input: PlanRef) -> Self {
67        let mut core = self.core.clone();
68        core.input = input;
69        Self::new(
70            core,
71            self.window_start_exprs.clone(),
72            self.window_end_exprs.clone(),
73        )
74    }
75}
76
77impl_plan_tree_node_for_unary! { BatchHopWindow }
78
79impl ToDistributedBatch for BatchHopWindow {
80    fn to_distributed(&self) -> Result<PlanRef> {
81        self.to_distributed_with_required(&Order::any(), &RequiredDist::Any)
82    }
83
84    fn to_distributed_with_required(
85        &self,
86        required_order: &Order,
87        required_dist: &RequiredDist,
88    ) -> Result<PlanRef> {
89        // The hop operator will generate a multiplication of its input rows,
90        // so shuffling its input instead of its output will reduce the shuffling data
91        // communication.
92        // We pass the required dist to its input.
93        let input_required = self
94            .core
95            .o2i_col_mapping()
96            .rewrite_required_distribution(required_dist);
97        let new_input = self
98            .input()
99            .to_distributed_with_required(required_order, &input_required)?;
100        let mut new_logical = self.core.clone();
101        new_logical.input = new_input;
102        let batch_plan = BatchHopWindow::new(
103            new_logical,
104            self.window_start_exprs.clone(),
105            self.window_end_exprs.clone(),
106        );
107        let batch_plan = required_order.enforce_if_not_satisfies(batch_plan.into())?;
108        required_dist.enforce_if_not_satisfies(batch_plan, required_order)
109    }
110}
111
112impl ToBatchPb for BatchHopWindow {
113    fn to_batch_prost_body(&self) -> NodeBody {
114        NodeBody::HopWindow(HopWindowNode {
115            time_col: self.core.time_col.index() as _,
116            window_slide: Some(self.core.window_slide.into()),
117            window_size: Some(self.core.window_size.into()),
118            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
119            window_start_exprs: self
120                .window_start_exprs
121                .clone()
122                .iter()
123                .map(|x| x.to_expr_proto())
124                .collect(),
125            window_end_exprs: self
126                .window_end_exprs
127                .clone()
128                .iter()
129                .map(|x| x.to_expr_proto())
130                .collect(),
131        })
132    }
133}
134
135impl ToLocalBatch for BatchHopWindow {
136    fn to_local(&self) -> Result<PlanRef> {
137        let new_input = self.input().to_local()?;
138        Ok(self.clone_with_input(new_input).into())
139    }
140}
141
142impl ExprRewritable for BatchHopWindow {
143    fn has_rewritable_expr(&self) -> bool {
144        true
145    }
146
147    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
148        Self::new(
149            self.core.clone(),
150            self.window_start_exprs
151                .clone()
152                .into_iter()
153                .map(|e| r.rewrite_expr(e))
154                .collect(),
155            self.window_end_exprs
156                .clone()
157                .into_iter()
158                .map(|e| r.rewrite_expr(e))
159                .collect(),
160        )
161        .into()
162    }
163}
164
165impl ExprVisitable for BatchHopWindow {
166    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
167        self.window_start_exprs.iter().for_each(|e| v.visit_expr(e));
168        self.window_end_exprs.iter().for_each(|e| v.visit_expr(e));
169    }
170}