risingwave_frontend/optimizer/plan_node/
logical_hop_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_common::types::Interval;
18
19use super::generic::{GenericPlanNode, GenericPlanRef};
20use super::utils::impl_distill_by_unit;
21use super::{
22    BatchHopWindow, BatchPlanRef, ColPrunable, ExprRewritable, Logical, LogicalFilter,
23    LogicalPlanRef as PlanRef, PlanBase, PlanTreeNodeUnary, PredicatePushdown, StreamHopWindow,
24    StreamPlanRef, ToBatch, ToStream, gen_filter_and_pushdown, generic,
25};
26use crate::error::Result;
27use crate::expr::{ExprType, FunctionCall, InputRef};
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::{
30    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
31};
32use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
33
/// `LogicalHopWindow` implements Hop Table Function.
///
/// The node is a thin logical wrapper around [`generic::HopWindow`]: all window
/// parameters live in `core`, while `base` caches the plan properties derived from it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalHopWindow {
    /// Logical plan base. `LogicalHopWindow::new` builds it from the core's context,
    /// schema, stream key and functional dependencies.
    pub base: PlanBase<Logical>,
    /// Generic hop-window description: input plan, time column, slide/size/offset
    /// intervals and the output indices.
    core: generic::HopWindow<PlanRef>,
}
40
41impl LogicalHopWindow {
42    /// Hop windows will add `windows_start` and `windows_end` columns at the end.
43    /// Take care to modify the code referring it if above rule changes.
44    pub const ADDITION_COLUMN_LEN: usize = 2;
45
46    /// just used in optimizer and the function will not check if the `time_col`'s value is NULL
47    /// compared with `LogicalHopWindow::create`
48    fn new(
49        input: PlanRef,
50        time_col: InputRef,
51        window_slide: Interval,
52        window_size: Interval,
53        window_offset: Interval,
54        output_indices: Option<Vec<usize>>,
55    ) -> Self {
56        // if output_indices is not specified, use default output_indices
57        let output_indices =
58            output_indices.unwrap_or_else(|| (0..input.schema().len() + 2).collect_vec());
59        let core = generic::HopWindow {
60            input,
61            time_col,
62            window_slide,
63            window_size,
64            window_offset,
65            output_indices,
66        };
67
68        let ctx = core.ctx();
69
70        let base = PlanBase::new_logical(
71            ctx,
72            core.schema(),
73            core.stream_key(),
74            core.functional_dependency(),
75        );
76
77        LogicalHopWindow { base, core }
78    }
79
80    pub fn into_parts(self) -> (PlanRef, InputRef, Interval, Interval, Interval, Vec<usize>) {
81        self.core.into_parts()
82    }
83
84    pub fn output_indices_are_trivial(&self) -> bool {
85        self.output_indices() == &(0..self.core.internal_column_num()).collect_vec()
86    }
87
88    /// used for binder and planner. The function will add a filter operator to ignore records with
89    /// NULL time value. <https://github.com/risingwavelabs/risingwave/issues/8130>
90    pub fn create(
91        input: PlanRef,
92        time_col: InputRef,
93        window_slide: Interval,
94        window_size: Interval,
95        window_offset: Interval,
96    ) -> PlanRef {
97        let input = LogicalFilter::create_with_expr(
98            input,
99            FunctionCall::new(ExprType::IsNotNull, vec![time_col.clone().into()])
100                .unwrap()
101                .into(),
102        );
103        Self::new(
104            input,
105            time_col,
106            window_slide,
107            window_size,
108            window_offset,
109            None,
110        )
111        .into()
112    }
113
114    pub fn output_window_start_col_idx(&self) -> Option<usize> {
115        self.core.output_window_start_col_idx()
116    }
117
118    pub fn output_window_end_col_idx(&self) -> Option<usize> {
119        self.core.output_window_end_col_idx()
120    }
121
122    pub fn o2i_col_mapping(&self) -> ColIndexMapping {
123        self.core.o2i_col_mapping()
124    }
125
126    pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
127        self.core.output2internal_col_mapping()
128    }
129
130    pub fn clone_with_output_indices(&self, output_indices: Vec<usize>) -> Self {
131        Self::new(
132            self.input(),
133            self.core.time_col.clone(),
134            self.core.window_slide,
135            self.core.window_size,
136            self.core.window_offset,
137            Some(output_indices),
138        )
139    }
140
141    /// Get output indices
142    pub fn output_indices(&self) -> &Vec<usize> {
143        &self.core.output_indices
144    }
145}
146
147impl PlanTreeNodeUnary<Logical> for LogicalHopWindow {
148    fn input(&self) -> PlanRef {
149        self.core.input.clone()
150    }
151
152    fn clone_with_input(&self, input: PlanRef) -> Self {
153        Self::new(
154            input,
155            self.core.time_col.clone(),
156            self.core.window_slide,
157            self.core.window_size,
158            self.core.window_offset,
159            Some(self.core.output_indices.clone()),
160        )
161    }
162
163    fn rewrite_with_input(
164        &self,
165        input: PlanRef,
166        input_col_change: ColIndexMapping,
167    ) -> (Self, ColIndexMapping) {
168        let mut time_col = self.core.time_col.clone();
169        time_col.index = input_col_change.map(time_col.index);
170        let mut columns_to_be_kept = Vec::new();
171        let new_output_indices = self
172            .core
173            .output_indices
174            .iter()
175            .enumerate()
176            .filter_map(|(i, &idx)| match input_col_change.try_map(idx) {
177                Some(new_idx) => {
178                    columns_to_be_kept.push(i);
179                    Some(new_idx)
180                }
181                None => {
182                    if idx == self.core.internal_window_start_col_idx() {
183                        columns_to_be_kept.push(i);
184                        Some(input.schema().len())
185                    } else if idx == self.core.internal_window_end_col_idx() {
186                        columns_to_be_kept.push(i);
187                        Some(input.schema().len() + 1)
188                    } else {
189                        None
190                    }
191                }
192            })
193            .collect_vec();
194        let new_hop = Self::new(
195            input,
196            time_col,
197            self.core.window_slide,
198            self.core.window_size,
199            self.core.window_offset,
200            Some(new_output_indices),
201        );
202        (
203            new_hop,
204            ColIndexMapping::with_remaining_columns(
205                &columns_to_be_kept,
206                self.core.output_indices.len(),
207            ),
208        )
209    }
210}
211
// NOTE(review): presumably generates the generic plan-tree-node plumbing for this
// single-input node from its `PlanTreeNodeUnary` impl — confirm against the macro definition.
impl_plan_tree_node_for_unary! { Logical, LogicalHopWindow}
// NOTE(review): presumably implements the explain/distill output by delegating to the
// `core` field under the display name "LogicalHopWindow" — confirm against the macro definition.
impl_distill_by_unit!(LogicalHopWindow, core, "LogicalHopWindow");
214
215impl ColPrunable for LogicalHopWindow {
216    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
217        let o2i = self.o2i_col_mapping();
218        let input_required_cols = {
219            let mut tmp = FixedBitSet::with_capacity(self.schema().len());
220            tmp.extend(required_cols.iter().copied());
221            tmp = o2i.rewrite_bitset(&tmp);
222            // LogicalHopWindow should keep all required cols from upstream,
223            // as well as its own time_col.
224            tmp.put(self.core.time_col.index());
225            tmp.ones().collect_vec()
226        };
227        let input = self.input().prune_col(&input_required_cols, ctx);
228        let input_change = ColIndexMapping::with_remaining_columns(
229            &input_required_cols,
230            self.input().schema().len(),
231        );
232        let (new_hop, _) = self.rewrite_with_input(input, input_change.clone());
233        let output_cols = {
234            // output cols = { cols required by upstream from input node } ∪ { additional window
235            // cols }
236            #[derive(Copy, Clone, Debug)]
237            enum IndexType {
238                Input(usize),
239                WindowStart,
240                WindowEnd,
241            }
242            let output2internal = self.output2internal_col_mapping();
243            // map the indices from output to input
244            let input_required_cols = required_cols
245                .iter()
246                .filter_map(|&idx| {
247                    if let Some(idx) = o2i.try_map(idx) {
248                        Some(IndexType::Input(idx))
249                    } else if let Some(idx) = output2internal.try_map(idx) {
250                        if idx == self.core.internal_window_start_col_idx() {
251                            Some(IndexType::WindowStart)
252                        } else if idx == self.core.internal_window_end_col_idx() {
253                            Some(IndexType::WindowEnd)
254                        } else {
255                            None
256                        }
257                    } else {
258                        None
259                    }
260                })
261                .collect_vec();
262            // this mapping will only keeps required columns
263            input_required_cols
264                .iter()
265                .filter_map(|&idx| match idx {
266                    IndexType::Input(x) => input_change.try_map(x),
267                    IndexType::WindowStart => Some(new_hop.core.internal_window_start_col_idx()),
268                    IndexType::WindowEnd => Some(new_hop.core.internal_window_end_col_idx()),
269                })
270                .collect_vec()
271        };
272        new_hop.clone_with_output_indices(output_cols).into()
273    }
274}
275
// No overrides: the trait's default methods are used as-is for this node.
impl ExprRewritable<Logical> for LogicalHopWindow {}
277
// No overrides: the trait's default methods are used as-is for this node.
impl ExprVisitable for LogicalHopWindow {}
279
280impl PredicatePushdown for LogicalHopWindow {
281    /// Keep predicate on time window parameters (`window_start`, `window_end`),
282    /// the rest may be pushed-down.
283    fn predicate_pushdown(
284        &self,
285        predicate: Condition,
286        ctx: &mut PredicatePushdownContext,
287    ) -> PlanRef {
288        let mut window_columns = FixedBitSet::with_capacity(self.schema().len());
289
290        let window_start_idx = self.core.internal_window_start_col_idx();
291        let window_end_idx = self.core.internal_window_end_col_idx();
292        for (i, v) in self.output_indices().iter().enumerate() {
293            if *v == window_start_idx || *v == window_end_idx {
294                window_columns.insert(i);
295            }
296        }
297        let (time_window_pred, pushed_predicate) = predicate.split_disjoint(&window_columns);
298        let mut mapping = self.o2i_col_mapping();
299        let pushed_predicate = pushed_predicate.rewrite_expr(&mut mapping);
300        gen_filter_and_pushdown(self, time_window_pred, pushed_predicate, ctx)
301    }
302}
303
304impl ToBatch for LogicalHopWindow {
305    fn to_batch(&self) -> Result<BatchPlanRef> {
306        let new_input = self.input().to_batch()?;
307        let window = self.core.clone_with_input(new_input);
308        let (window_start_exprs, window_end_exprs) = window.derive_window_start_and_end_exprs()?;
309        Ok(BatchHopWindow::new(window, window_start_exprs, window_end_exprs).into())
310    }
311}
312
313impl ToStream for LogicalHopWindow {
314    fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
315        let new_input = self.input().to_stream(ctx)?;
316        let window = self.core.clone_with_input(new_input);
317        let (window_start_exprs, window_end_exprs) = window.derive_window_start_and_end_exprs()?;
318        Ok(StreamHopWindow::new(window, window_start_exprs, window_end_exprs).into())
319    }
320
321    fn logical_rewrite_for_stream(
322        &self,
323        ctx: &mut RewriteStreamContext,
324    ) -> Result<(PlanRef, ColIndexMapping)> {
325        let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
326        let (hop, out_col_change) = self.rewrite_with_input(input, input_col_change);
327        let (input, time_col, window_slide, window_size, window_offset, mut output_indices) =
328            hop.into_parts();
329        if !output_indices.contains(&input.schema().len())
330            && !output_indices.contains(&(input.schema().len() + 1))
331        // When both `window_start` and `window_end` are not in `output_indices`,
332        // we add `window_start` to ensure we can derive pk.
333        {
334            output_indices.push(input.schema().len());
335        }
336        let i2o = self.core.i2o_col_mapping();
337        output_indices.extend(
338            input
339                .expect_stream_key()
340                .iter()
341                .cloned()
342                .filter(|i| i2o.try_map(*i).is_none()),
343        );
344        let new_hop = Self::new(
345            input,
346            time_col,
347            window_slide,
348            window_size,
349            window_offset,
350            Some(output_indices),
351        );
352        Ok((new_hop.into(), out_col_change))
353    }
354}
355
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::*;
    use crate::Explain;
    use crate::optimizer::optimizer_context::OptimizerContext;
    use crate::optimizer::plan_node::LogicalValues;
    use crate::optimizer::property::FunctionalDependency;

    /// Fields of the test input relation: `[date, v1, v2]`.
    fn date_v1_v2_fields() -> Vec<Field> {
        vec![
            Field::with_name(DataType::Date, "date"),
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Int32, "v2"),
        ]
    }

    /// Hop window over `input` on column 0 with slide 1 day, size 3 days, zero offset
    /// and the default output indices.
    fn hop_window_over(input: PlanRef) -> PlanRef {
        LogicalHopWindow::new(
            input,
            InputRef::new(0, DataType::Date),
            Interval::from_month_day_usec(0, 1, 0),
            Interval::from_month_day_usec(0, 3, 0),
            Interval::from_month_day_usec(0, 0, 0),
            None,
        )
        .into()
    }

    /// Pruning
    /// ```text
    /// HopWindow(time_col: $0 slide: 1 day size: 3 days)
    ///   TableScan(date, v1, v2)
    /// ```
    /// with required columns [4, 2, 3] will result in
    /// ```text
    ///   HopWindow(time_col: $0 slide: 1 day size: 3 days output_indices: [3, 1, 2])
    ///     TableScan(date, v2)
    /// ```
    #[tokio::test]
    async fn test_prune_hop_window_with_order_required() {
        let ctx = OptimizerContext::mock().await;
        let fields = date_v1_v2_fields();
        let values = LogicalValues::new(
            vec![],
            Schema {
                fields: fields.clone(),
            },
            ctx,
        );
        let original = hop_window_over(values.into());

        // Perform the prune
        let required_cols = vec![4, 2, 3];
        let mut prune_ctx = ColumnPruningContext::new(original.clone());
        let pruned = original.prune_col(&required_cols, &mut prune_ctx);
        println!(
            "{}\n{}",
            original.explain_to_string(),
            pruned.explain_to_string()
        );

        // Check the result
        let pruned_hop = pruned.as_logical_hop_window().unwrap();
        assert_eq!(pruned_hop.core.output_indices, vec![3, 1, 2]);
        assert_eq!(pruned_hop.schema().fields().len(), 3);

        let pruned_input = pruned_hop.input();
        let pruned_values = pruned_input.as_logical_values().unwrap();
        let kept_fields = pruned_values.schema().fields();
        assert_eq!(kept_fields.len(), 2);
        assert_eq!(kept_fields[0], fields[0]);
        assert_eq!(kept_fields[1], fields[2]);

        // Pruning again while requiring every column must leave the schema unchanged.
        let all_cols = (0..pruned.schema().len()).collect_vec();
        let mut reprune_ctx = ColumnPruningContext::new(pruned.clone());
        let repruned = pruned.prune_col(&all_cols, &mut reprune_ctx);
        assert_eq!(repruned.schema(), pruned.schema());
    }

    /// Functional dependency derivation
    /// ```text
    /// input:  [date, v1, v2]
    ///   FD: { date, v1 } --> { v2 }
    /// output: [date, v1, v2, window_start, window_end]
    ///   FD: { date, v1 } --> { v2 }
    ///       window_start --> window_end
    ///       window_end --> window_start
    /// ```
    #[tokio::test]
    async fn fd_derivation_hop_window() {
        let ctx = OptimizerContext::mock().await;
        let schema = Schema {
            fields: date_v1_v2_fields(),
        };
        let mut values = LogicalValues::new(vec![], schema, ctx);
        // 0, 1 --> 2
        values
            .base
            .functional_dependency_mut()
            .add_functional_dependency_by_column_indices(&[0, 1], &[2]);
        let hop_window = hop_window_over(values.into());

        let actual: HashSet<_> = hop_window
            .functional_dependency()
            .as_dependencies()
            .iter()
            .cloned()
            .collect();
        let expected: HashSet<_> = HashSet::from_iter([
            FunctionalDependency::with_indices(5, &[0, 1], &[2]),
            FunctionalDependency::with_indices(5, &[3], &[4]),
            FunctionalDependency::with_indices(5, &[4], &[3]),
        ]);
        assert_eq!(actual, expected);
    }
}