risingwave_frontend/optimizer/plan_node/
stream_hop_window.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::HopWindowNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
22use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::property::MonotonicityMap;
25use crate::stream_fragmenter::BuildFragmentGraphState;
26use crate::utils::ColIndexMappingRewriteExt;
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct StreamHopWindow {
31 pub base: PlanBase<Stream>,
32 core: generic::HopWindow<PlanRef>,
33 window_start_exprs: Vec<ExprImpl>,
34 window_end_exprs: Vec<ExprImpl>,
35}
36
37impl StreamHopWindow {
38 pub fn new(
39 core: generic::HopWindow<PlanRef>,
40 window_start_exprs: Vec<ExprImpl>,
41 window_end_exprs: Vec<ExprImpl>,
42 ) -> Self {
43 let input = core.input.clone();
44 let dist = core
45 .i2o_col_mapping()
46 .rewrite_provided_distribution(input.distribution());
47
48 let input2internal = core.input2internal_col_mapping();
49 let internal2output = core.internal2output_col_mapping();
50
51 let mut internal_watermark_columns = input.watermark_columns().map_clone(&input2internal);
52 if let Some(wtmk_group) = input.watermark_columns().get_group(core.time_col.index) {
53 internal_watermark_columns.insert(core.internal_window_start_col_idx(), wtmk_group);
55 internal_watermark_columns.insert(core.internal_window_end_col_idx(), wtmk_group);
56 }
57
58 let base = PlanBase::new_stream_with_core(
59 &core,
60 dist,
61 input.append_only(),
62 input.emit_on_window_close(),
63 internal_watermark_columns.map_clone(&internal2output),
64 MonotonicityMap::new(), );
66 Self {
67 base,
68 core,
69 window_start_exprs,
70 window_end_exprs,
71 }
72 }
73}
74
75impl Distill for StreamHopWindow {
76 fn distill<'a>(&self) -> XmlNode<'a> {
77 let mut vec = self.core.fields_pretty();
78 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
79 vec.push(("output_watermarks", ow));
80 }
81 childless_record("StreamHopWindow", vec)
82 }
83}
84
85impl PlanTreeNodeUnary for StreamHopWindow {
86 fn input(&self) -> PlanRef {
87 self.core.input.clone()
88 }
89
90 fn clone_with_input(&self, input: PlanRef) -> Self {
91 let mut core = self.core.clone();
92 core.input = input;
93 Self::new(
94 core,
95 self.window_start_exprs.clone(),
96 self.window_end_exprs.clone(),
97 )
98 }
99}
100
101impl_plan_tree_node_for_unary! {StreamHopWindow}
102
103impl StreamNode for StreamHopWindow {
104 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
105 PbNodeBody::HopWindow(Box::new(HopWindowNode {
106 time_col: self.core.time_col.index() as _,
107 window_slide: Some(self.core.window_slide.into()),
108 window_size: Some(self.core.window_size.into()),
109 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
110 window_start_exprs: self
111 .window_start_exprs
112 .clone()
113 .iter()
114 .map(|x| x.to_expr_proto())
115 .collect(),
116 window_end_exprs: self
117 .window_end_exprs
118 .clone()
119 .iter()
120 .map(|x| x.to_expr_proto())
121 .collect(),
122 }))
123 }
124}
125
126impl ExprRewritable for StreamHopWindow {
127 fn has_rewritable_expr(&self) -> bool {
128 true
129 }
130
131 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
132 Self::new(
133 self.core.clone(),
134 self.window_start_exprs
135 .clone()
136 .into_iter()
137 .map(|e| r.rewrite_expr(e))
138 .collect(),
139 self.window_end_exprs
140 .clone()
141 .into_iter()
142 .map(|e| r.rewrite_expr(e))
143 .collect(),
144 )
145 .into()
146 }
147}
148
149impl ExprVisitable for StreamHopWindow {
150 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
151 self.window_start_exprs.iter().for_each(|e| v.visit_expr(e));
152 self.window_end_exprs.iter().for_each(|e| v.visit_expr(e));
153 }
154}