risingwave_frontend/optimizer/plan_node/
stream_hop_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::HopWindowNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{
22    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
23};
24use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::MonotonicityMap;
27use crate::stream_fragmenter::BuildFragmentGraphState;
28use crate::utils::ColIndexMappingRewriteExt;
29
30/// [`StreamHopWindow`] represents a hop window table function.
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamHopWindow {
33    pub base: PlanBase<Stream>,
34    core: generic::HopWindow<PlanRef>,
35    window_start_exprs: Vec<ExprImpl>,
36    window_end_exprs: Vec<ExprImpl>,
37}
38
39impl StreamHopWindow {
40    pub fn new(
41        core: generic::HopWindow<PlanRef>,
42        window_start_exprs: Vec<ExprImpl>,
43        window_end_exprs: Vec<ExprImpl>,
44    ) -> Self {
45        let input = core.input.clone();
46        let dist = core
47            .i2o_col_mapping()
48            .rewrite_provided_distribution(input.distribution());
49
50        let input2internal = core.input2internal_col_mapping();
51        let internal2output = core.internal2output_col_mapping();
52
53        let mut internal_watermark_columns = input.watermark_columns().map_clone(&input2internal);
54        if let Some(wtmk_group) = input.watermark_columns().get_group(core.time_col.index) {
55            // Watermark on `time_col` indicates watermark on both `window_start` and `window_end`.
56            internal_watermark_columns.insert(core.internal_window_start_col_idx(), wtmk_group);
57            internal_watermark_columns.insert(core.internal_window_end_col_idx(), wtmk_group);
58        }
59
60        let base = PlanBase::new_stream_with_core(
61            &core,
62            dist,
63            input.stream_kind(),
64            input.emit_on_window_close(),
65            internal_watermark_columns.map_clone(&internal2output),
66            MonotonicityMap::new(), /* hop window start/end jumps, so monotonicity is not propagated */
67        );
68        Self {
69            base,
70            core,
71            window_start_exprs,
72            window_end_exprs,
73        }
74    }
75}
76
77impl Distill for StreamHopWindow {
78    fn distill<'a>(&self) -> XmlNode<'a> {
79        let mut vec = self.core.fields_pretty();
80        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
81            vec.push(("output_watermarks", ow));
82        }
83        childless_record("StreamHopWindow", vec)
84    }
85}
86
87impl PlanTreeNodeUnary<Stream> for StreamHopWindow {
88    fn input(&self) -> PlanRef {
89        self.core.input.clone()
90    }
91
92    fn clone_with_input(&self, input: PlanRef) -> Self {
93        let mut core = self.core.clone();
94        core.input = input;
95        Self::new(
96            core,
97            self.window_start_exprs.clone(),
98            self.window_end_exprs.clone(),
99        )
100    }
101}
102
103impl_plan_tree_node_for_unary! { Stream, StreamHopWindow}
104
105impl StreamNode for StreamHopWindow {
106    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
107        PbNodeBody::HopWindow(Box::new(HopWindowNode {
108            time_col: self.core.time_col.index() as _,
109            window_slide: Some(self.core.window_slide.into()),
110            window_size: Some(self.core.window_size.into()),
111            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
112            window_start_exprs: self
113                .window_start_exprs
114                .clone()
115                .iter()
116                .map(|x| x.to_expr_proto())
117                .collect(),
118            window_end_exprs: self
119                .window_end_exprs
120                .clone()
121                .iter()
122                .map(|x| x.to_expr_proto())
123                .collect(),
124        }))
125    }
126}
127
128impl ExprRewritable<Stream> for StreamHopWindow {
129    fn has_rewritable_expr(&self) -> bool {
130        true
131    }
132
133    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
134        Self::new(
135            self.core.clone(),
136            self.window_start_exprs
137                .clone()
138                .into_iter()
139                .map(|e| r.rewrite_expr(e))
140                .collect(),
141            self.window_end_exprs
142                .clone()
143                .into_iter()
144                .map(|e| r.rewrite_expr(e))
145                .collect(),
146        )
147        .into()
148    }
149}
150
151impl ExprVisitable for StreamHopWindow {
152    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
153        self.window_start_exprs.iter().for_each(|e| v.visit_expr(e));
154        self.window_end_exprs.iter().for_each(|e| v.visit_expr(e));
155    }
156}