risingwave_frontend/optimizer/plan_node/
stream_hop_window.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::HopWindowNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{
22 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
23};
24use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::MonotonicityMap;
27use crate::stream_fragmenter::BuildFragmentGraphState;
28use crate::utils::ColIndexMappingRewriteExt;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamHopWindow {
33 pub base: PlanBase<Stream>,
34 core: generic::HopWindow<PlanRef>,
35 window_start_exprs: Vec<ExprImpl>,
36 window_end_exprs: Vec<ExprImpl>,
37}
38
39impl StreamHopWindow {
40 pub fn new(
41 core: generic::HopWindow<PlanRef>,
42 window_start_exprs: Vec<ExprImpl>,
43 window_end_exprs: Vec<ExprImpl>,
44 ) -> Self {
45 let input = core.input.clone();
46 let dist = core
47 .i2o_col_mapping()
48 .rewrite_provided_distribution(input.distribution());
49
50 let input2internal = core.input2internal_col_mapping();
51 let internal2output = core.internal2output_col_mapping();
52
53 let mut internal_watermark_columns = input.watermark_columns().map_clone(&input2internal);
54 if let Some(wtmk_group) = input.watermark_columns().get_group(core.time_col.index) {
55 internal_watermark_columns.insert(core.internal_window_start_col_idx(), wtmk_group);
57 internal_watermark_columns.insert(core.internal_window_end_col_idx(), wtmk_group);
58 }
59
60 let base = PlanBase::new_stream_with_core(
61 &core,
62 dist,
63 input.stream_kind(),
64 input.emit_on_window_close(),
65 internal_watermark_columns.map_clone(&internal2output),
66 MonotonicityMap::new(), );
68 Self {
69 base,
70 core,
71 window_start_exprs,
72 window_end_exprs,
73 }
74 }
75}
76
77impl Distill for StreamHopWindow {
78 fn distill<'a>(&self) -> XmlNode<'a> {
79 let mut vec = self.core.fields_pretty();
80 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
81 vec.push(("output_watermarks", ow));
82 }
83 childless_record("StreamHopWindow", vec)
84 }
85}
86
87impl PlanTreeNodeUnary<Stream> for StreamHopWindow {
88 fn input(&self) -> PlanRef {
89 self.core.input.clone()
90 }
91
92 fn clone_with_input(&self, input: PlanRef) -> Self {
93 let mut core = self.core.clone();
94 core.input = input;
95 Self::new(
96 core,
97 self.window_start_exprs.clone(),
98 self.window_end_exprs.clone(),
99 )
100 }
101}
102
103impl_plan_tree_node_for_unary! { Stream, StreamHopWindow}
104
105impl StreamNode for StreamHopWindow {
106 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
107 PbNodeBody::HopWindow(Box::new(HopWindowNode {
108 time_col: self.core.time_col.index() as _,
109 window_slide: Some(self.core.window_slide.into()),
110 window_size: Some(self.core.window_size.into()),
111 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
112 window_start_exprs: self
113 .window_start_exprs
114 .clone()
115 .iter()
116 .map(|x| x.to_expr_proto())
117 .collect(),
118 window_end_exprs: self
119 .window_end_exprs
120 .clone()
121 .iter()
122 .map(|x| x.to_expr_proto())
123 .collect(),
124 }))
125 }
126}
127
128impl ExprRewritable<Stream> for StreamHopWindow {
129 fn has_rewritable_expr(&self) -> bool {
130 true
131 }
132
133 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
134 Self::new(
135 self.core.clone(),
136 self.window_start_exprs
137 .clone()
138 .into_iter()
139 .map(|e| r.rewrite_expr(e))
140 .collect(),
141 self.window_end_exprs
142 .clone()
143 .into_iter()
144 .map(|e| r.rewrite_expr(e))
145 .collect(),
146 )
147 .into()
148 }
149}
150
151impl ExprVisitable for StreamHopWindow {
152 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
153 self.window_start_exprs.iter().for_each(|e| v.visit_expr(e));
154 self.window_end_exprs.iter().for_each(|e| v.visit_expr(e));
155 }
156}