risingwave_frontend/optimizer/plan_node/logical_hop_window.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_common::types::Interval;
18
19use super::generic::{GenericPlanNode, GenericPlanRef};
20use super::utils::impl_distill_by_unit;
21use super::{
22    BatchHopWindow, ColPrunable, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef,
23    PlanTreeNodeUnary, PredicatePushdown, StreamHopWindow, ToBatch, ToStream,
24    gen_filter_and_pushdown, generic,
25};
26use crate::error::Result;
27use crate::expr::{ExprType, FunctionCall, InputRef};
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::{
30    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
31};
32use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
33
34/// `LogicalHopWindow` implements Hop Table Function.
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct LogicalHopWindow {
37    pub base: PlanBase<Logical>,
38    core: generic::HopWindow<PlanRef>,
39}
40
41impl LogicalHopWindow {
42    /// Hop windows will add `windows_start` and `windows_end` columns at the end.
43    /// Take care to modify the code referring it if above rule changes.
44    pub const ADDITION_COLUMN_LEN: usize = 2;
45
46    /// just used in optimizer and the function will not check if the `time_col`'s value is NULL
47    /// compared with `LogicalHopWindow::create`
48    fn new(
49        input: PlanRef,
50        time_col: InputRef,
51        window_slide: Interval,
52        window_size: Interval,
53        window_offset: Interval,
54        output_indices: Option<Vec<usize>>,
55    ) -> Self {
56        // if output_indices is not specified, use default output_indices
57        let output_indices =
58            output_indices.unwrap_or_else(|| (0..input.schema().len() + 2).collect_vec());
59        let core = generic::HopWindow {
60            input,
61            time_col,
62            window_slide,
63            window_size,
64            window_offset,
65            output_indices,
66        };
67
68        let ctx = core.ctx();
69
70        let base = PlanBase::new_logical(
71            ctx,
72            core.schema(),
73            core.stream_key(),
74            core.functional_dependency(),
75        );
76
77        LogicalHopWindow { base, core }
78    }
79
80    pub fn into_parts(self) -> (PlanRef, InputRef, Interval, Interval, Interval, Vec<usize>) {
81        self.core.into_parts()
82    }
83
84    pub fn output_indices_are_trivial(&self) -> bool {
85        self.output_indices() == &(0..self.core.internal_column_num()).collect_vec()
86    }
87
88    /// used for binder and planner. The function will add a filter operator to ignore records with
89    /// NULL time value. <https://github.com/risingwavelabs/risingwave/issues/8130>
90    pub fn create(
91        input: PlanRef,
92        time_col: InputRef,
93        window_slide: Interval,
94        window_size: Interval,
95        window_offset: Interval,
96    ) -> PlanRef {
97        let input = LogicalFilter::create_with_expr(
98            input,
99            FunctionCall::new(ExprType::IsNotNull, vec![time_col.clone().into()])
100                .unwrap()
101                .into(),
102        );
103        Self::new(
104            input,
105            time_col,
106            window_slide,
107            window_size,
108            window_offset,
109            None,
110        )
111        .into()
112    }
113
114    pub fn output_window_start_col_idx(&self) -> Option<usize> {
115        self.core.output_window_start_col_idx()
116    }
117
118    pub fn output_window_end_col_idx(&self) -> Option<usize> {
119        self.core.output_window_end_col_idx()
120    }
121
122    pub fn o2i_col_mapping(&self) -> ColIndexMapping {
123        self.core.o2i_col_mapping()
124    }
125
126    pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
127        self.core.output2internal_col_mapping()
128    }
129
130    pub fn clone_with_output_indices(&self, output_indices: Vec<usize>) -> Self {
131        Self::new(
132            self.input(),
133            self.core.time_col.clone(),
134            self.core.window_slide,
135            self.core.window_size,
136            self.core.window_offset,
137            Some(output_indices),
138        )
139    }
140
141    /// Get output indices
142    pub fn output_indices(&self) -> &Vec<usize> {
143        &self.core.output_indices
144    }
145}
146
147impl PlanTreeNodeUnary for LogicalHopWindow {
148    fn input(&self) -> PlanRef {
149        self.core.input.clone()
150    }
151
152    fn clone_with_input(&self, input: PlanRef) -> Self {
153        Self::new(
154            input,
155            self.core.time_col.clone(),
156            self.core.window_slide,
157            self.core.window_size,
158            self.core.window_offset,
159            Some(self.core.output_indices.clone()),
160        )
161    }
162
163    fn rewrite_with_input(
164        &self,
165        input: PlanRef,
166        input_col_change: ColIndexMapping,
167    ) -> (Self, ColIndexMapping) {
168        let mut time_col = self.core.time_col.clone();
169        time_col.index = input_col_change.map(time_col.index);
170        let mut columns_to_be_kept = Vec::new();
171        let new_output_indices = self
172            .core
173            .output_indices
174            .iter()
175            .enumerate()
176            .filter_map(|(i, &idx)| match input_col_change.try_map(idx) {
177                Some(new_idx) => {
178                    columns_to_be_kept.push(i);
179                    Some(new_idx)
180                }
181                None => {
182                    if idx == self.core.internal_window_start_col_idx() {
183                        columns_to_be_kept.push(i);
184                        Some(input.schema().len())
185                    } else if idx == self.core.internal_window_end_col_idx() {
186                        columns_to_be_kept.push(i);
187                        Some(input.schema().len() + 1)
188                    } else {
189                        None
190                    }
191                }
192            })
193            .collect_vec();
194        let new_hop = Self::new(
195            input,
196            time_col,
197            self.core.window_slide,
198            self.core.window_size,
199            self.core.window_offset,
200            Some(new_output_indices),
201        );
202        (
203            new_hop,
204            ColIndexMapping::with_remaining_columns(
205                &columns_to_be_kept,
206                self.core.output_indices.len(),
207            ),
208        )
209    }
210}
211
212impl_plan_tree_node_for_unary! {LogicalHopWindow}
213impl_distill_by_unit!(LogicalHopWindow, core, "LogicalHopWindow");
214
215impl ColPrunable for LogicalHopWindow {
216    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
217        let o2i = self.o2i_col_mapping();
218        let input_required_cols = {
219            let mut tmp = FixedBitSet::with_capacity(self.schema().len());
220            tmp.extend(required_cols.iter().copied());
221            tmp = o2i.rewrite_bitset(&tmp);
222            // LogicalHopWindow should keep all required cols from upstream,
223            // as well as its own time_col.
224            tmp.put(self.core.time_col.index());
225            tmp.ones().collect_vec()
226        };
227        let input = self.input().prune_col(&input_required_cols, ctx);
228        let input_change = ColIndexMapping::with_remaining_columns(
229            &input_required_cols,
230            self.input().schema().len(),
231        );
232        let (new_hop, _) = self.rewrite_with_input(input, input_change.clone());
233        let output_cols = {
234            // output cols = { cols required by upstream from input node } ∪ { additional window
235            // cols }
236            #[derive(Copy, Clone, Debug)]
237            enum IndexType {
238                Input(usize),
239                WindowStart,
240                WindowEnd,
241            }
242            let output2internal = self.output2internal_col_mapping();
243            // map the indices from output to input
244            let input_required_cols = required_cols
245                .iter()
246                .filter_map(|&idx| {
247                    if let Some(idx) = o2i.try_map(idx) {
248                        Some(IndexType::Input(idx))
249                    } else if let Some(idx) = output2internal.try_map(idx) {
250                        if idx == self.core.internal_window_start_col_idx() {
251                            Some(IndexType::WindowStart)
252                        } else if idx == self.core.internal_window_end_col_idx() {
253                            Some(IndexType::WindowEnd)
254                        } else {
255                            None
256                        }
257                    } else {
258                        None
259                    }
260                })
261                .collect_vec();
262            // this mapping will only keeps required columns
263            input_required_cols
264                .iter()
265                .filter_map(|&idx| match idx {
266                    IndexType::Input(x) => input_change.try_map(x),
267                    IndexType::WindowStart => Some(new_hop.core.internal_window_start_col_idx()),
268                    IndexType::WindowEnd => Some(new_hop.core.internal_window_end_col_idx()),
269                })
270                .collect_vec()
271        };
272        new_hop.clone_with_output_indices(output_cols).into()
273    }
274}
275
276impl ExprRewritable for LogicalHopWindow {}
277
278impl ExprVisitable for LogicalHopWindow {}
279
280impl PredicatePushdown for LogicalHopWindow {
281    /// Keep predicate on time window parameters (`window_start`, `window_end`),
282    /// the rest may be pushed-down.
283    fn predicate_pushdown(
284        &self,
285        predicate: Condition,
286        ctx: &mut PredicatePushdownContext,
287    ) -> PlanRef {
288        let mut window_columns = FixedBitSet::with_capacity(self.schema().len());
289
290        let window_start_idx = self.core.internal_window_start_col_idx();
291        let window_end_idx = self.core.internal_window_end_col_idx();
292        for (i, v) in self.output_indices().iter().enumerate() {
293            if *v == window_start_idx || *v == window_end_idx {
294                window_columns.insert(i);
295            }
296        }
297        let (time_window_pred, pushed_predicate) = predicate.split_disjoint(&window_columns);
298        let mut mapping = self.o2i_col_mapping();
299        let pushed_predicate = pushed_predicate.rewrite_expr(&mut mapping);
300        gen_filter_and_pushdown(self, time_window_pred, pushed_predicate, ctx)
301    }
302}
303
304impl ToBatch for LogicalHopWindow {
305    fn to_batch(&self) -> Result<PlanRef> {
306        let new_input = self.input().to_batch()?;
307        let mut new_logical = self.core.clone();
308        new_logical.input = new_input;
309        let (window_start_exprs, window_end_exprs) =
310            new_logical.derive_window_start_and_end_exprs()?;
311        Ok(BatchHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into())
312    }
313}
314
315impl ToStream for LogicalHopWindow {
316    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
317        let new_input = self.input().to_stream(ctx)?;
318        let mut new_logical = self.core.clone();
319        new_logical.input = new_input;
320        let (window_start_exprs, window_end_exprs) =
321            new_logical.derive_window_start_and_end_exprs()?;
322        Ok(StreamHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into())
323    }
324
325    fn logical_rewrite_for_stream(
326        &self,
327        ctx: &mut RewriteStreamContext,
328    ) -> Result<(PlanRef, ColIndexMapping)> {
329        let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
330        let (hop, out_col_change) = self.rewrite_with_input(input, input_col_change);
331        let (input, time_col, window_slide, window_size, window_offset, mut output_indices) =
332            hop.into_parts();
333        if !output_indices.contains(&input.schema().len())
334            && !output_indices.contains(&(input.schema().len() + 1))
335        // When both `window_start` and `window_end` are not in `output_indices`,
336        // we add `window_start` to ensure we can derive pk.
337        {
338            output_indices.push(input.schema().len());
339        }
340        let i2o = self.core.i2o_col_mapping();
341        output_indices.extend(
342            input
343                .expect_stream_key()
344                .iter()
345                .cloned()
346                .filter(|i| i2o.try_map(*i).is_none()),
347        );
348        let new_hop = Self::new(
349            input,
350            time_col,
351            window_slide,
352            window_size,
353            window_offset,
354            Some(output_indices),
355        );
356        Ok((new_hop.into(), out_col_change))
357    }
358}
359
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::*;
    use crate::Explain;
    use crate::optimizer::optimizer_context::OptimizerContext;
    use crate::optimizer::plan_node::LogicalValues;
    use crate::optimizer::property::FunctionalDependency;

    /// Pruning
    /// ```text
    /// HopWindow(time_col: $0 slide: 1 day size: 3 days)
    ///   Values(date, v1, v2)
    /// ```
    /// with required columns [4, 2, 3] (window_end, v2, window_start) will result in
    /// ```text
    ///   HopWindow(time_col: $0 slide: 1 day size: 3 days output_indices: [3, 1, 2])
    ///     Values(date, v2)
    /// ```
    #[tokio::test]
    async fn test_prune_hop_window_with_order_required() {
        let ctx = OptimizerContext::mock().await;
        let fields: Vec<Field> = vec![
            Field::with_name(DataType::Date, "date"),
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Int32, "v2"),
        ];
        let values = LogicalValues::new(
            vec![],
            Schema {
                fields: fields.clone(),
            },
            ctx,
        );
        let hop_window: PlanRef = LogicalHopWindow::new(
            values.into(),
            InputRef::new(0, DataType::Date),
            Interval::from_month_day_usec(0, 1, 0),
            Interval::from_month_day_usec(0, 3, 0),
            Interval::from_month_day_usec(0, 0, 0),
            None,
        )
        .into();
        // Perform the prune
        let required_cols = vec![4, 2, 3];
        let plan = hop_window.prune_col(
            &required_cols,
            &mut ColumnPruningContext::new(hop_window.clone()),
        );
        println!(
            "{}\n{}",
            hop_window.explain_to_string(),
            plan.explain_to_string()
        );
        // Check the result
        let hop_window = plan.as_logical_hop_window().unwrap();
        assert_eq!(hop_window.core.output_indices, vec![3, 1, 2]);
        assert_eq!(hop_window.schema().fields().len(), 3);

        // The input keeps `v2` (required) and `date` (the time column); `v1` is pruned.
        let values = hop_window.input();
        let values = values.as_logical_values().unwrap();
        assert_eq!(values.schema().fields().len(), 2);
        assert_eq!(values.schema().fields()[0], fields[0]);
        assert_eq!(values.schema().fields()[1], fields[2]);

        // Requiring every column of an already-pruned plan must not change its schema.
        let required_cols = (0..plan.schema().len()).collect_vec();
        let plan2 = plan.prune_col(&required_cols, &mut ColumnPruningContext::new(plan.clone()));
        assert_eq!(plan2.schema(), plan.schema());
    }

    #[tokio::test]
    async fn fd_derivation_hop_window() {
        // input: [date, v1, v2]
        // FD: { date, v1 } --> { v2 }
        // output: [date, v1, v2, window_start, window_end],
        // FD: { date, v1 } --> { v2 }
        //     window_start --> window_end
        //     window_end --> window_start
        let ctx = OptimizerContext::mock().await;
        let fields: Vec<Field> = vec![
            Field::with_name(DataType::Date, "date"),
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Int32, "v2"),
        ];
        let mut values = LogicalValues::new(vec![], Schema { fields }, ctx);
        // 0, 1 --> 2
        values
            .base
            .functional_dependency_mut()
            .add_functional_dependency_by_column_indices(&[0, 1], &[2]);
        let hop_window: PlanRef = LogicalHopWindow::new(
            values.into(),
            InputRef::new(0, DataType::Date),
            Interval::from_month_day_usec(0, 1, 0),
            Interval::from_month_day_usec(0, 3, 0),
            Interval::from_month_day_usec(0, 0, 0),
            None,
        )
        .into();
        let fd_set: HashSet<_> = hop_window
            .functional_dependency()
            .as_dependencies()
            .iter()
            .cloned()
            .collect();
        let expected_fd_set: HashSet<_> = [
            FunctionalDependency::with_indices(5, &[0, 1], &[2]),
            FunctionalDependency::with_indices(5, &[3], &[4]),
            FunctionalDependency::with_indices(5, &[4], &[3]),
        ]
        .into_iter()
        .collect();
        assert_eq!(fd_set, expected_fd_set);
    }
}