risingwave_frontend/optimizer/plan_node/
stream_hop_window.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::HopWindowNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::stream::prelude::*;
20use super::utils::{Distill, childless_record, watermark_pretty};
21use super::{
22 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, TryToStreamPb, generic,
23};
24use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::MonotonicityMap;
27use crate::scheduler::SchedulerResult;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29use crate::utils::ColIndexMappingRewriteExt;
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamHopWindow {
34 pub base: PlanBase<Stream>,
35 core: generic::HopWindow<PlanRef>,
36 window_start_exprs: Vec<ExprImpl>,
37 window_end_exprs: Vec<ExprImpl>,
38}
39
40impl StreamHopWindow {
41 pub fn new(
42 core: generic::HopWindow<PlanRef>,
43 window_start_exprs: Vec<ExprImpl>,
44 window_end_exprs: Vec<ExprImpl>,
45 ) -> Self {
46 let input = core.input.clone();
47 let dist = core
48 .i2o_col_mapping()
49 .rewrite_provided_distribution(input.distribution());
50
51 let input2internal = core.input2internal_col_mapping();
52 let internal2output = core.internal2output_col_mapping();
53
54 let mut internal_watermark_columns = input.watermark_columns().map_clone(&input2internal);
55 if let Some(wtmk_group) = input.watermark_columns().get_group(core.time_col.index) {
56 internal_watermark_columns.insert(core.internal_window_start_col_idx(), wtmk_group);
58 internal_watermark_columns.insert(core.internal_window_end_col_idx(), wtmk_group);
59 }
60
61 let base = PlanBase::new_stream_with_core(
62 &core,
63 dist,
64 input.stream_kind(),
65 input.emit_on_window_close(),
66 internal_watermark_columns.map_clone(&internal2output),
67 MonotonicityMap::new(), );
69 Self {
70 base,
71 core,
72 window_start_exprs,
73 window_end_exprs,
74 }
75 }
76}
77
78impl Distill for StreamHopWindow {
79 fn distill<'a>(&self) -> XmlNode<'a> {
80 let mut vec = self.core.fields_pretty();
81 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
82 vec.push(("output_watermarks", ow));
83 }
84 childless_record("StreamHopWindow", vec)
85 }
86}
87
88impl PlanTreeNodeUnary<Stream> for StreamHopWindow {
89 fn input(&self) -> PlanRef {
90 self.core.input.clone()
91 }
92
93 fn clone_with_input(&self, input: PlanRef) -> Self {
94 let mut core = self.core.clone();
95 core.input = input;
96 Self::new(
97 core,
98 self.window_start_exprs.clone(),
99 self.window_end_exprs.clone(),
100 )
101 }
102}
103
104impl_plan_tree_node_for_unary! { Stream, StreamHopWindow}
105
106impl TryToStreamPb for StreamHopWindow {
107 fn try_to_stream_prost_body(
108 &self,
109 _state: &mut BuildFragmentGraphState,
110 ) -> SchedulerResult<PbNodeBody> {
111 let retract = self.input().stream_kind().is_retract();
112 let window_start_exprs = self
113 .window_start_exprs
114 .clone()
115 .iter()
116 .map(|expr| expr.to_expr_proto_checked_pure(retract, "HOP window start"))
117 .collect::<crate::error::Result<Vec<_>>>()?;
118 let window_end_exprs = self
119 .window_end_exprs
120 .clone()
121 .iter()
122 .map(|expr| expr.to_expr_proto_checked_pure(retract, "HOP window end"))
123 .collect::<crate::error::Result<Vec<_>>>()?;
124 Ok(PbNodeBody::HopWindow(Box::new(HopWindowNode {
125 time_col: self.core.time_col.index() as _,
126 window_slide: Some(self.core.window_slide.into()),
127 window_size: Some(self.core.window_size.into()),
128 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
129 window_start_exprs,
130 window_end_exprs,
131 })))
132 }
133}
134
135impl ExprRewritable<Stream> for StreamHopWindow {
136 fn has_rewritable_expr(&self) -> bool {
137 true
138 }
139
140 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
141 Self::new(
142 self.core.clone(),
143 self.window_start_exprs
144 .clone()
145 .into_iter()
146 .map(|e| r.rewrite_expr(e))
147 .collect(),
148 self.window_end_exprs
149 .clone()
150 .into_iter()
151 .map(|e| r.rewrite_expr(e))
152 .collect(),
153 )
154 .into()
155 }
156}
157
158impl ExprVisitable for StreamHopWindow {
159 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
160 self.window_start_exprs.iter().for_each(|e| v.visit_expr(e));
161 self.window_end_exprs.iter().for_each(|e| v.visit_expr(e));
162 }
163}