risingwave_frontend/optimizer/plan_node/generic/hop_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZeroUsize;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, StrAssocArr};
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::types::{DataType, Interval};
21use risingwave_common::util::column_index_mapping::ColIndexMapping;
22use risingwave_expr::ExprError;
23
24use super::super::utils::IndicesDisplay;
25use super::{GenericPlanNode, GenericPlanRef, impl_distill_unit_from_fields};
26use crate::error::Result;
27use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal};
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::plan_node::batch::BatchPlanNodeMetadata;
30use crate::optimizer::property::{FunctionalDependencySet, Order};
31use crate::utils::ColIndexMappingRewriteExt;
32
/// [`HopWindow`] implements Hop Table Function.
///
/// The node conceptually appends two columns, `window_start` and `window_end`, to the
/// input schema (this widened schema is called the "internal" schema by the helper
/// methods) and then projects the internal schema through `output_indices`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HopWindow<PlanRef> {
    /// Input plan node.
    pub input: PlanRef,
    /// Reference to the input column from which window boundaries are computed.
    pub time_col: InputRef,
    /// Distance between the starts of two consecutive windows.
    pub window_slide: Interval,
    /// Length of every window; must be a positive integer multiple of `window_slide`
    /// (enforced when the window expressions are derived).
    pub window_size: Interval,
    /// Offset passed to `TumbleStart` when aligning the first window boundary.
    pub window_offset: Interval,
    /// Provides mapping from input schema, `window_start`, `window_end` to output schema.
    /// For example, if we had:
    /// - input schema: | 0: `trip_time` | 1: `trip_name` |
    /// - `window_start`: 2
    /// - `window_end`: 3
    /// - output schema: | `trip_name` | `window_start` |
    ///
    /// Then, `output_indices`: [1, 2]
    pub output_indices: Vec<usize>,
}
50
51impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
52    fn schema(&self) -> Schema {
53        let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
54        let mut original_schema = self.input.schema().clone();
55        original_schema.fields.reserve_exact(2);
56        let window_start = Field::with_name(output_type.clone(), "window_start");
57        let window_end = Field::with_name(output_type, "window_end");
58        original_schema.fields.push(window_start);
59        original_schema.fields.push(window_end);
60        self.output_indices
61            .iter()
62            .map(|&idx| original_schema[idx].clone())
63            .collect()
64    }
65
66    fn stream_key(&self) -> Option<Vec<usize>> {
67        let window_start_index = self
68            .output_indices
69            .iter()
70            .position(|&idx| idx == self.input.schema().len());
71        let window_end_index = self
72            .output_indices
73            .iter()
74            .position(|&idx| idx == self.input.schema().len() + 1);
75        if window_start_index.is_none() && window_end_index.is_none() {
76            None
77        } else {
78            let mut pk = self
79                .input
80                .stream_key()?
81                .iter()
82                .filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx))
83                .collect_vec();
84            if let Some(start_idx) = window_start_index {
85                pk.push(start_idx);
86            };
87            if let Some(end_idx) = window_end_index {
88                pk.push(end_idx);
89            };
90            Some(pk)
91        }
92    }
93
94    fn ctx(&self) -> OptimizerContextRef {
95        self.input.ctx()
96    }
97
98    fn functional_dependency(&self) -> FunctionalDependencySet {
99        let mut fd_set = self
100            .i2o_col_mapping()
101            .rewrite_functional_dependency_set(self.input.functional_dependency().clone());
102        let (start_idx_in_output, end_idx_in_output) = {
103            let internal2output = self.internal2output_col_mapping();
104            (
105                internal2output.try_map(self.internal_window_start_col_idx()),
106                internal2output.try_map(self.internal_window_end_col_idx()),
107            )
108        };
109        if let Some(start_idx) = start_idx_in_output
110            && let Some(end_idx) = end_idx_in_output
111        {
112            fd_set.add_functional_dependency_by_column_indices(&[start_idx], &[end_idx]);
113            fd_set.add_functional_dependency_by_column_indices(&[end_idx], &[start_idx]);
114        }
115        fd_set
116    }
117}
118
119impl<PlanRef: BatchPlanNodeMetadata> HopWindow<PlanRef> {
120    pub fn get_out_column_index_order(&self) -> Order {
121        self.i2o_col_mapping()
122            .rewrite_provided_order(self.input.order())
123    }
124}
125
126impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
127    pub fn clone_with_input<OtherPlanRef>(&self, input: OtherPlanRef) -> HopWindow<OtherPlanRef> {
128        HopWindow {
129            input,
130            time_col: self.time_col.clone(),
131            window_slide: self.window_slide,
132            window_size: self.window_size,
133            window_offset: self.window_offset,
134            output_indices: self.output_indices.clone(),
135        }
136    }
137
138    pub fn output_window_start_col_idx(&self) -> Option<usize> {
139        self.internal2output_col_mapping()
140            .try_map(self.internal_window_start_col_idx())
141    }
142
143    pub fn output_window_end_col_idx(&self) -> Option<usize> {
144        self.internal2output_col_mapping()
145            .try_map(self.internal_window_end_col_idx())
146    }
147
148    pub fn into_parts(self) -> (PlanRef, InputRef, Interval, Interval, Interval, Vec<usize>) {
149        (
150            self.input,
151            self.time_col,
152            self.window_slide,
153            self.window_size,
154            self.window_offset,
155            self.output_indices,
156        )
157    }
158
159    pub fn internal_window_start_col_idx(&self) -> usize {
160        self.input.schema().len()
161    }
162
163    pub fn internal_window_end_col_idx(&self) -> usize {
164        self.input.schema().len() + 1
165    }
166
167    pub fn o2i_col_mapping(&self) -> ColIndexMapping {
168        self.output2internal_col_mapping()
169            .composite(&self.internal2input_col_mapping())
170    }
171
172    pub fn i2o_col_mapping(&self) -> ColIndexMapping {
173        self.input2internal_col_mapping()
174            .composite(&self.internal2output_col_mapping())
175    }
176
177    pub fn internal_column_num(&self) -> usize {
178        self.input.schema().len() + 2
179    }
180
181    pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
182        self.internal2output_col_mapping()
183            .inverse()
184            .expect("must be invertible")
185    }
186
187    pub fn internal2output_col_mapping(&self) -> ColIndexMapping {
188        ColIndexMapping::with_remaining_columns(&self.output_indices, self.internal_column_num())
189    }
190
191    pub fn input2internal_col_mapping(&self) -> ColIndexMapping {
192        ColIndexMapping::identity_or_none(self.input.schema().len(), self.internal_column_num())
193    }
194
195    pub fn internal2input_col_mapping(&self) -> ColIndexMapping {
196        ColIndexMapping::identity_or_none(self.internal_column_num(), self.input.schema().len())
197    }
198
199    pub fn derive_window_start_and_end_exprs(&self) -> Result<(Vec<ExprImpl>, Vec<ExprImpl>)> {
200        let Self {
201            window_size,
202            window_slide,
203            window_offset,
204            time_col,
205            ..
206        } = &self;
207        let units = window_size
208            .exact_div(window_slide)
209            .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
210            .ok_or_else(|| ExprError::InvalidParam {
211                name: "window",
212                reason: format!(
213                    "window_size {} cannot be divided by window_slide {}",
214                    window_size, window_slide
215                )
216                .into(),
217            })?
218            .get();
219        let window_size_expr: ExprImpl =
220            Literal::new(Some((*window_size).into()), DataType::Interval).into();
221        let window_slide_expr: ExprImpl =
222            Literal::new(Some((*window_slide).into()), DataType::Interval).into();
223        let window_offset_expr: ExprImpl =
224            Literal::new(Some((*window_offset).into()), DataType::Interval).into();
225
226        let window_size_sub_slide = FunctionCall::new(
227            ExprType::Subtract,
228            vec![window_size_expr, window_slide_expr.clone()],
229        )?
230        .into();
231
232        let time_col_shifted = FunctionCall::new(
233            ExprType::Subtract,
234            vec![
235                ExprImpl::InputRef(Box::new(time_col.clone())),
236                window_size_sub_slide,
237            ],
238        )?
239        .into();
240
241        let hop_start: ExprImpl = FunctionCall::new(
242            ExprType::TumbleStart,
243            vec![time_col_shifted, window_slide_expr, window_offset_expr],
244        )?
245        .into();
246
247        let mut window_start_exprs = Vec::with_capacity(units);
248        let mut window_end_exprs = Vec::with_capacity(units);
249        for i in 0..units {
250            {
251                let window_start_offset =
252                    window_slide
253                        .checked_mul_int(i)
254                        .ok_or_else(|| ExprError::InvalidParam {
255                            name: "window",
256                            reason: format!(
257                                "window_slide {} cannot be multiplied by {}",
258                                window_slide, i
259                            )
260                            .into(),
261                        })?;
262                let window_start_offset_expr =
263                    Literal::new(Some(window_start_offset.into()), DataType::Interval).into();
264                let window_start_expr = FunctionCall::new(
265                    ExprType::Add,
266                    vec![hop_start.clone(), window_start_offset_expr],
267                )?
268                .into();
269                window_start_exprs.push(window_start_expr);
270            }
271            {
272                let window_end_offset =
273                    window_slide.checked_mul_int(i + units).ok_or_else(|| {
274                        ExprError::InvalidParam {
275                            name: "window",
276                            reason: format!(
277                                "window_slide {} cannot be multiplied by {}",
278                                window_slide,
279                                i + units
280                            )
281                            .into(),
282                        }
283                    })?;
284                let window_end_offset_expr =
285                    Literal::new(Some(window_end_offset.into()), DataType::Interval).into();
286                let window_end_expr = FunctionCall::new(
287                    ExprType::Add,
288                    vec![hop_start.clone(), window_end_offset_expr],
289                )?
290                .into();
291                window_end_exprs.push(window_end_expr);
292            }
293        }
294        assert_eq!(window_start_exprs.len(), window_end_exprs.len());
295        Ok((window_start_exprs, window_end_exprs))
296    }
297
298    pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
299        let mut out = Vec::with_capacity(5);
300        let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
301        out.push((
302            "time_col",
303            Pretty::display(&InputRefDisplay {
304                input_ref: &self.time_col,
305                input_schema: self.input.schema(),
306            }),
307        ));
308        out.push(("slide", Pretty::display(&self.window_slide)));
309        out.push(("size", Pretty::display(&self.window_size)));
310        if self
311            .output_indices
312            .iter()
313            .copied()
314            // Behavior is the same as `LogicalHopWindow::internal_column_num`
315            .eq(0..(self.input.schema().len() + 2))
316        {
317            out.push(("output", Pretty::from("all")));
318        } else {
319            let original_schema: Schema = self
320                .input
321                .schema()
322                .clone()
323                .into_fields()
324                .into_iter()
325                .chain([
326                    Field::with_name(output_type.clone(), "window_start"),
327                    Field::with_name(output_type, "window_end"),
328                ])
329                .collect();
330            let id = IndicesDisplay {
331                indices: &self.output_indices,
332                schema: &original_schema,
333            };
334            out.push(("output", id.distill()));
335        }
336        out
337    }
338}
339
// NOTE(review): macro comes from the parent `generic` module (see the `use super::` import).
// It presumably implements the plan-display ("distill") trait for `HopWindow` on top of
// `fields_pretty` — confirm against the macro definition.
impl_distill_unit_from_fields!(HopWindow, GenericPlanRef);