risingwave_frontend/optimizer/plan_node/generic/
hop_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZeroUsize;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, StrAssocArr};
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::types::{DataType, Interval};
21use risingwave_common::util::column_index_mapping::ColIndexMapping;
22use risingwave_expr::ExprError;
23
24use super::super::utils::IndicesDisplay;
25use super::{GenericPlanNode, GenericPlanRef, impl_distill_unit_from_fields};
26use crate::error::Result;
27use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal};
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::plan_node::batch::BatchPlanRef;
30use crate::optimizer::property::{FunctionalDependencySet, Order};
31use crate::utils::ColIndexMappingRewriteExt;
32
33/// [`HopWindow`] implements Hop Table Function.
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct HopWindow<PlanRef> {
36    pub input: PlanRef,
37    pub time_col: InputRef,
38    pub window_slide: Interval,
39    pub window_size: Interval,
40    pub window_offset: Interval,
41    /// Provides mapping from input schema, `window_start`, `window_end` to output schema.
42    /// For example, if we had:
43    /// input schema: | 0: `trip_time` | 1: `trip_name` |
44    /// `window_start`: 2
45    /// `window_end`: 3
46    /// output schema: | `trip_name` | `window_start` |
47    /// Then, `output_indices`: [1, 2]
48    pub output_indices: Vec<usize>,
49}
50
51impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
52    fn schema(&self) -> Schema {
53        let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
54        let mut original_schema = self.input.schema().clone();
55        original_schema.fields.reserve_exact(2);
56        let window_start = Field::with_name(output_type.clone(), "window_start");
57        let window_end = Field::with_name(output_type, "window_end");
58        original_schema.fields.push(window_start);
59        original_schema.fields.push(window_end);
60        self.output_indices
61            .iter()
62            .map(|&idx| original_schema[idx].clone())
63            .collect()
64    }
65
66    fn stream_key(&self) -> Option<Vec<usize>> {
67        let window_start_index = self
68            .output_indices
69            .iter()
70            .position(|&idx| idx == self.input.schema().len());
71        let window_end_index = self
72            .output_indices
73            .iter()
74            .position(|&idx| idx == self.input.schema().len() + 1);
75        if window_start_index.is_none() && window_end_index.is_none() {
76            None
77        } else {
78            let mut pk = self
79                .input
80                .stream_key()?
81                .iter()
82                .filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx))
83                .collect_vec();
84            if let Some(start_idx) = window_start_index {
85                pk.push(start_idx);
86            };
87            if let Some(end_idx) = window_end_index {
88                pk.push(end_idx);
89            };
90            Some(pk)
91        }
92    }
93
94    fn ctx(&self) -> OptimizerContextRef {
95        self.input.ctx()
96    }
97
98    fn functional_dependency(&self) -> FunctionalDependencySet {
99        let mut fd_set = self
100            .i2o_col_mapping()
101            .rewrite_functional_dependency_set(self.input.functional_dependency().clone());
102        let (start_idx_in_output, end_idx_in_output) = {
103            let internal2output = self.internal2output_col_mapping();
104            (
105                internal2output.try_map(self.internal_window_start_col_idx()),
106                internal2output.try_map(self.internal_window_end_col_idx()),
107            )
108        };
109        if let Some(start_idx) = start_idx_in_output
110            && let Some(end_idx) = end_idx_in_output
111        {
112            fd_set.add_functional_dependency_by_column_indices(&[start_idx], &[end_idx]);
113            fd_set.add_functional_dependency_by_column_indices(&[end_idx], &[start_idx]);
114        }
115        fd_set
116    }
117}
118
119impl<PlanRef: BatchPlanRef> HopWindow<PlanRef> {
120    pub fn get_out_column_index_order(&self) -> Order {
121        self.i2o_col_mapping()
122            .rewrite_provided_order(self.input.order())
123    }
124}
125
126impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
127    pub fn output_window_start_col_idx(&self) -> Option<usize> {
128        self.internal2output_col_mapping()
129            .try_map(self.internal_window_start_col_idx())
130    }
131
132    pub fn output_window_end_col_idx(&self) -> Option<usize> {
133        self.internal2output_col_mapping()
134            .try_map(self.internal_window_end_col_idx())
135    }
136
137    pub fn into_parts(self) -> (PlanRef, InputRef, Interval, Interval, Interval, Vec<usize>) {
138        (
139            self.input,
140            self.time_col,
141            self.window_slide,
142            self.window_size,
143            self.window_offset,
144            self.output_indices,
145        )
146    }
147
148    pub fn internal_window_start_col_idx(&self) -> usize {
149        self.input.schema().len()
150    }
151
152    pub fn internal_window_end_col_idx(&self) -> usize {
153        self.input.schema().len() + 1
154    }
155
156    pub fn o2i_col_mapping(&self) -> ColIndexMapping {
157        self.output2internal_col_mapping()
158            .composite(&self.internal2input_col_mapping())
159    }
160
161    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
162        self.input2internal_col_mapping()
163            .composite(&self.internal2output_col_mapping())
164    }
165
166    pub fn internal_column_num(&self) -> usize {
167        self.input.schema().len() + 2
168    }
169
170    pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
171        self.internal2output_col_mapping()
172            .inverse()
173            .expect("must be invertible")
174    }
175
176    pub fn internal2output_col_mapping(&self) -> ColIndexMapping {
177        ColIndexMapping::with_remaining_columns(&self.output_indices, self.internal_column_num())
178    }
179
180    pub fn input2internal_col_mapping(&self) -> ColIndexMapping {
181        ColIndexMapping::identity_or_none(self.input.schema().len(), self.internal_column_num())
182    }
183
184    pub fn internal2input_col_mapping(&self) -> ColIndexMapping {
185        ColIndexMapping::identity_or_none(self.internal_column_num(), self.input.schema().len())
186    }
187
188    pub fn derive_window_start_and_end_exprs(&self) -> Result<(Vec<ExprImpl>, Vec<ExprImpl>)> {
189        let Self {
190            window_size,
191            window_slide,
192            window_offset,
193            time_col,
194            ..
195        } = &self;
196        let units = window_size
197            .exact_div(window_slide)
198            .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
199            .ok_or_else(|| ExprError::InvalidParam {
200                name: "window",
201                reason: format!(
202                    "window_size {} cannot be divided by window_slide {}",
203                    window_size, window_slide
204                )
205                .into(),
206            })?
207            .get();
208        let window_size_expr: ExprImpl =
209            Literal::new(Some((*window_size).into()), DataType::Interval).into();
210        let window_slide_expr: ExprImpl =
211            Literal::new(Some((*window_slide).into()), DataType::Interval).into();
212        let window_offset_expr: ExprImpl =
213            Literal::new(Some((*window_offset).into()), DataType::Interval).into();
214
215        let window_size_sub_slide = FunctionCall::new(
216            ExprType::Subtract,
217            vec![window_size_expr, window_slide_expr.clone()],
218        )?
219        .into();
220
221        let time_col_shifted = FunctionCall::new(
222            ExprType::Subtract,
223            vec![
224                ExprImpl::InputRef(Box::new(time_col.clone())),
225                window_size_sub_slide,
226            ],
227        )?
228        .into();
229
230        let hop_start: ExprImpl = FunctionCall::new(
231            ExprType::TumbleStart,
232            vec![time_col_shifted, window_slide_expr, window_offset_expr],
233        )?
234        .into();
235
236        let mut window_start_exprs = Vec::with_capacity(units);
237        let mut window_end_exprs = Vec::with_capacity(units);
238        for i in 0..units {
239            {
240                let window_start_offset =
241                    window_slide
242                        .checked_mul_int(i)
243                        .ok_or_else(|| ExprError::InvalidParam {
244                            name: "window",
245                            reason: format!(
246                                "window_slide {} cannot be multiplied by {}",
247                                window_slide, i
248                            )
249                            .into(),
250                        })?;
251                let window_start_offset_expr =
252                    Literal::new(Some(window_start_offset.into()), DataType::Interval).into();
253                let window_start_expr = FunctionCall::new(
254                    ExprType::Add,
255                    vec![hop_start.clone(), window_start_offset_expr],
256                )?
257                .into();
258                window_start_exprs.push(window_start_expr);
259            }
260            {
261                let window_end_offset =
262                    window_slide.checked_mul_int(i + units).ok_or_else(|| {
263                        ExprError::InvalidParam {
264                            name: "window",
265                            reason: format!(
266                                "window_slide {} cannot be multiplied by {}",
267                                window_slide,
268                                i + units
269                            )
270                            .into(),
271                        }
272                    })?;
273                let window_end_offset_expr =
274                    Literal::new(Some(window_end_offset.into()), DataType::Interval).into();
275                let window_end_expr = FunctionCall::new(
276                    ExprType::Add,
277                    vec![hop_start.clone(), window_end_offset_expr],
278                )?
279                .into();
280                window_end_exprs.push(window_end_expr);
281            }
282        }
283        assert_eq!(window_start_exprs.len(), window_end_exprs.len());
284        Ok((window_start_exprs, window_end_exprs))
285    }
286
287    pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
288        let mut out = Vec::with_capacity(5);
289        let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
290        out.push((
291            "time_col",
292            Pretty::display(&InputRefDisplay {
293                input_ref: &self.time_col,
294                input_schema: self.input.schema(),
295            }),
296        ));
297        out.push(("slide", Pretty::display(&self.window_slide)));
298        out.push(("size", Pretty::display(&self.window_size)));
299        if self
300            .output_indices
301            .iter()
302            .copied()
303            // Behavior is the same as `LogicalHopWindow::internal_column_num`
304            .eq(0..(self.input.schema().len() + 2))
305        {
306            out.push(("output", Pretty::from("all")));
307        } else {
308            let original_schema: Schema = self
309                .input
310                .schema()
311                .clone()
312                .into_fields()
313                .into_iter()
314                .chain([
315                    Field::with_name(output_type.clone(), "window_start"),
316                    Field::with_name(output_type, "window_end"),
317                ])
318                .collect();
319            let id = IndicesDisplay {
320                indices: &self.output_indices,
321                schema: &original_schema,
322            };
323            out.push(("output", id.distill()));
324        }
325        out
326    }
327}
328
// Generates the explain/"distill" output for `HopWindow` (bounded by `GenericPlanRef`).
// NOTE(review): presumably built from `fields_pretty` above — confirm against the macro
// definition.
impl_distill_unit_from_fields!(HopWindow, GenericPlanRef);