risingwave_frontend/optimizer/plan_node/
stream_hop_window.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::HopWindowNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{
22    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, TryToStreamPb, generic,
23};
24use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::MonotonicityMap;
27use crate::scheduler::SchedulerResult;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29use crate::utils::ColIndexMappingRewriteExt;
30
31/// [`StreamHopWindow`] represents a hop window table function.
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamHopWindow {
34    pub base: PlanBase<Stream>,
35    core: generic::HopWindow<PlanRef>,
36    window_start_exprs: Vec<ExprImpl>,
37    window_end_exprs: Vec<ExprImpl>,
38}
39
40impl StreamHopWindow {
41    pub fn new(
42        core: generic::HopWindow<PlanRef>,
43        window_start_exprs: Vec<ExprImpl>,
44        window_end_exprs: Vec<ExprImpl>,
45    ) -> Self {
46        let input = core.input.clone();
47        let dist = core
48            .i2o_col_mapping()
49            .rewrite_provided_distribution(input.distribution());
50
51        let input2internal = core.input2internal_col_mapping();
52        let internal2output = core.internal2output_col_mapping();
53
54        let mut internal_watermark_columns = input.watermark_columns().map_clone(&input2internal);
55        if let Some(wtmk_group) = input.watermark_columns().get_group(core.time_col.index) {
56            // Watermark on `time_col` indicates watermark on both `window_start` and `window_end`.
57            internal_watermark_columns.insert(core.internal_window_start_col_idx(), wtmk_group);
58            internal_watermark_columns.insert(core.internal_window_end_col_idx(), wtmk_group);
59        }
60
61        let base = PlanBase::new_stream_with_core(
62            &core,
63            dist,
64            input.stream_kind(),
65            input.emit_on_window_close(),
66            internal_watermark_columns.map_clone(&internal2output),
67            MonotonicityMap::new(), /* hop window start/end jumps, so monotonicity is not propagated */
68        );
69        Self {
70            base,
71            core,
72            window_start_exprs,
73            window_end_exprs,
74        }
75    }
76}
77
78impl Distill for StreamHopWindow {
79    fn distill<'a>(&self) -> XmlNode<'a> {
80        let mut vec = self.core.fields_pretty();
81        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
82            vec.push(("output_watermarks", ow));
83        }
84        childless_record("StreamHopWindow", vec)
85    }
86}
87
88impl PlanTreeNodeUnary<Stream> for StreamHopWindow {
89    fn input(&self) -> PlanRef {
90        self.core.input.clone()
91    }
92
93    fn clone_with_input(&self, input: PlanRef) -> Self {
94        let mut core = self.core.clone();
95        core.input = input;
96        Self::new(
97            core,
98            self.window_start_exprs.clone(),
99            self.window_end_exprs.clone(),
100        )
101    }
102}
103
104impl_plan_tree_node_for_unary! { Stream, StreamHopWindow}
105
106impl TryToStreamPb for StreamHopWindow {
107    fn try_to_stream_prost_body(
108        &self,
109        _state: &mut BuildFragmentGraphState,
110    ) -> SchedulerResult<PbNodeBody> {
111        let retract = self.input().stream_kind().is_retract();
112        let window_start_exprs = self
113            .window_start_exprs
114            .clone()
115            .iter()
116            .map(|expr| expr.to_expr_proto_checked_pure(retract, "HOP window start"))
117            .collect::<crate::error::Result<Vec<_>>>()?;
118        let window_end_exprs = self
119            .window_end_exprs
120            .clone()
121            .iter()
122            .map(|expr| expr.to_expr_proto_checked_pure(retract, "HOP window end"))
123            .collect::<crate::error::Result<Vec<_>>>()?;
124        Ok(PbNodeBody::HopWindow(Box::new(HopWindowNode {
125            time_col: self.core.time_col.index() as _,
126            window_slide: Some(self.core.window_slide.into()),
127            window_size: Some(self.core.window_size.into()),
128            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
129            window_start_exprs,
130            window_end_exprs,
131        })))
132    }
133}
134
135impl ExprRewritable<Stream> for StreamHopWindow {
136    fn has_rewritable_expr(&self) -> bool {
137        true
138    }
139
140    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
141        Self::new(
142            self.core.clone(),
143            self.window_start_exprs
144                .clone()
145                .into_iter()
146                .map(|e| r.rewrite_expr(e))
147                .collect(),
148            self.window_end_exprs
149                .clone()
150                .into_iter()
151                .map(|e| r.rewrite_expr(e))
152                .collect(),
153        )
154        .into()
155    }
156}
157
158impl ExprVisitable for StreamHopWindow {
159    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
160        self.window_start_exprs.iter().for_each(|e| v.visit_expr(e));
161        self.window_end_exprs.iter().for_each(|e| v.visit_expr(e));
162    }
163}