risingwave_frontend/optimizer/plan_node/
stream_hop_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::HopWindowNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
22use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::property::MonotonicityMap;
25use crate::stream_fragmenter::BuildFragmentGraphState;
26use crate::utils::ColIndexMappingRewriteExt;
27
28/// [`StreamHopWindow`] represents a hop window table function.
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct StreamHopWindow {
31    pub base: PlanBase<Stream>,
32    core: generic::HopWindow<PlanRef>,
33    window_start_exprs: Vec<ExprImpl>,
34    window_end_exprs: Vec<ExprImpl>,
35}
36
37impl StreamHopWindow {
38    pub fn new(
39        core: generic::HopWindow<PlanRef>,
40        window_start_exprs: Vec<ExprImpl>,
41        window_end_exprs: Vec<ExprImpl>,
42    ) -> Self {
43        let input = core.input.clone();
44        let dist = core
45            .i2o_col_mapping()
46            .rewrite_provided_distribution(input.distribution());
47
48        let input2internal = core.input2internal_col_mapping();
49        let internal2output = core.internal2output_col_mapping();
50
51        let mut internal_watermark_columns = input.watermark_columns().map_clone(&input2internal);
52        if let Some(wtmk_group) = input.watermark_columns().get_group(core.time_col.index) {
53            // Watermark on `time_col` indicates watermark on both `window_start` and `window_end`.
54            internal_watermark_columns.insert(core.internal_window_start_col_idx(), wtmk_group);
55            internal_watermark_columns.insert(core.internal_window_end_col_idx(), wtmk_group);
56        }
57
58        let base = PlanBase::new_stream_with_core(
59            &core,
60            dist,
61            input.append_only(),
62            input.emit_on_window_close(),
63            internal_watermark_columns.map_clone(&internal2output),
64            MonotonicityMap::new(), /* hop window start/end jumps, so monotonicity is not propagated */
65        );
66        Self {
67            base,
68            core,
69            window_start_exprs,
70            window_end_exprs,
71        }
72    }
73}
74
75impl Distill for StreamHopWindow {
76    fn distill<'a>(&self) -> XmlNode<'a> {
77        let mut vec = self.core.fields_pretty();
78        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
79            vec.push(("output_watermarks", ow));
80        }
81        childless_record("StreamHopWindow", vec)
82    }
83}
84
85impl PlanTreeNodeUnary for StreamHopWindow {
86    fn input(&self) -> PlanRef {
87        self.core.input.clone()
88    }
89
90    fn clone_with_input(&self, input: PlanRef) -> Self {
91        let mut core = self.core.clone();
92        core.input = input;
93        Self::new(
94            core,
95            self.window_start_exprs.clone(),
96            self.window_end_exprs.clone(),
97        )
98    }
99}
100
101impl_plan_tree_node_for_unary! {StreamHopWindow}
102
103impl StreamNode for StreamHopWindow {
104    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
105        PbNodeBody::HopWindow(Box::new(HopWindowNode {
106            time_col: self.core.time_col.index() as _,
107            window_slide: Some(self.core.window_slide.into()),
108            window_size: Some(self.core.window_size.into()),
109            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
110            window_start_exprs: self
111                .window_start_exprs
112                .clone()
113                .iter()
114                .map(|x| x.to_expr_proto())
115                .collect(),
116            window_end_exprs: self
117                .window_end_exprs
118                .clone()
119                .iter()
120                .map(|x| x.to_expr_proto())
121                .collect(),
122        }))
123    }
124}
125
126impl ExprRewritable for StreamHopWindow {
127    fn has_rewritable_expr(&self) -> bool {
128        true
129    }
130
131    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
132        Self::new(
133            self.core.clone(),
134            self.window_start_exprs
135                .clone()
136                .into_iter()
137                .map(|e| r.rewrite_expr(e))
138                .collect(),
139            self.window_end_exprs
140                .clone()
141                .into_iter()
142                .map(|e| r.rewrite_expr(e))
143                .collect(),
144        )
145        .into()
146    }
147}
148
149impl ExprVisitable for StreamHopWindow {
150    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
151        self.window_start_exprs.iter().for_each(|e| v.visit_expr(e));
152        self.window_end_exprs.iter().for_each(|e| v.visit_expr(e));
153    }
154}