risingwave_frontend/optimizer/plan_node/
stream_eowc_over_window.rs1use std::collections::HashSet;
16
17use risingwave_common::catalog::Field;
18use risingwave_common::types::DataType;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_expr::window_function::WindowFuncKind;
21use risingwave_pb::stream_plan::stream_node::PbNodeBody;
22
23use super::generic::{self, PlanWindowFunction};
24use super::stream::prelude::*;
25use super::utils::{TableCatalogBuilder, impl_distill_by_unit};
26use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef};
27use crate::TableCatalog;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::property::{MonotonicityMap, StreamKind, WatermarkColumns};
30use crate::stream_fragmenter::BuildFragmentGraphState;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct StreamEowcOverWindow {
34 pub base: PlanBase<Stream>,
35 core: generic::OverWindow<PlanRef>,
36}
37
38impl StreamEowcOverWindow {
39 pub fn new(core: generic::OverWindow<PlanRef>) -> Self {
40 assert!(core.funcs_have_same_partition_and_order());
41
42 let input = &core.input;
43 assert!(input.append_only());
44 assert!(input.emit_on_window_close());
45
46 let order_key = &core.window_functions[0].order_by;
48 assert_eq!(order_key.len(), 1);
49 assert_eq!(order_key[0].order_type, OrderType::ascending());
50 let order_key_idx = order_key[0].column_index;
51 let input_watermark_cols = core.input.watermark_columns();
52 assert!(input_watermark_cols.contains(order_key_idx));
53
54 let watermark_columns = WatermarkColumns::new();
57
58 let base = PlanBase::new_stream_with_core(
59 &core,
60 input.distribution().clone(),
61 StreamKind::AppendOnly,
62 true,
63 watermark_columns,
64 MonotonicityMap::new(),
66 );
67 StreamEowcOverWindow { base, core }
68 }
69
70 fn window_functions(&self) -> &[PlanWindowFunction] {
71 &self.core.window_functions
72 }
73
74 fn partition_key_indices(&self) -> Vec<usize> {
75 self.window_functions()[0]
76 .partition_by
77 .iter()
78 .map(|i| i.index())
79 .collect()
80 }
81
82 fn order_key_index(&self) -> usize {
83 self.window_functions()[0].order_by[0].column_index
84 }
85
86 fn infer_state_table(&self) -> TableCatalog {
87 let in_fields = self.core.input.schema().fields();
90 let mut tbl_builder = TableCatalogBuilder::default();
91 for field in in_fields {
92 tbl_builder.add_column(field);
93 }
94
95 let mut order_cols = HashSet::new();
96 let partition_key_indices = self.partition_key_indices();
97 for idx in &partition_key_indices {
98 if !order_cols.contains(idx) {
99 tbl_builder.add_order_column(*idx, OrderType::ascending());
100 order_cols.insert(*idx);
101 }
102 }
103 let read_prefix_len_hint = tbl_builder.get_current_pk_len();
104 let order_key_index = self.order_key_index();
105 if !order_cols.contains(&order_key_index) {
106 tbl_builder.add_order_column(order_key_index, OrderType::ascending());
107 order_cols.insert(order_key_index);
108 }
109 for idx in self.core.input.expect_stream_key() {
110 if !order_cols.contains(idx) {
111 tbl_builder.add_order_column(*idx, OrderType::ascending());
112 order_cols.insert(*idx);
113 }
114 }
115
116 let in_dist_key = self.core.input.distribution().dist_column_indices();
117 tbl_builder.build(in_dist_key.to_vec(), read_prefix_len_hint)
118 }
119
120 fn needs_intermediate_state_table(&self) -> bool {
123 self.window_functions().iter().any(|f| {
124 matches!(
125 f.kind,
126 WindowFuncKind::RowNumber | WindowFuncKind::Rank | WindowFuncKind::DenseRank
127 )
128 })
129 }
130
131 fn infer_intermediate_state_table(&self) -> TableCatalog {
136 let in_fields = self.core.input.schema().fields();
137 let mut tbl_builder = TableCatalogBuilder::default();
138
139 let partition_key_indices = self.partition_key_indices();
141 for &idx in &partition_key_indices {
142 tbl_builder.add_column(&in_fields[idx]);
143 }
144
145 let num_calls = self.window_functions().len();
147 for i in 0..num_calls {
148 tbl_builder.add_column(&Field::with_name(DataType::Bytea, format!("state_{}", i)));
149 }
150
151 for i in 0..partition_key_indices.len() {
153 tbl_builder.add_order_column(i, OrderType::ascending());
154 }
155 let read_prefix_len_hint = tbl_builder.get_current_pk_len();
156
157 let in_dist_key = self.core.input.distribution().dist_column_indices();
159 let dist_key: Vec<usize> = in_dist_key
162 .iter()
163 .filter_map(|&idx| {
164 partition_key_indices
165 .iter()
166 .position(|&pk_idx| pk_idx == idx)
167 })
168 .collect();
169
170 tbl_builder.build(dist_key, read_prefix_len_hint)
171 }
172}
173
174impl_distill_by_unit!(StreamEowcOverWindow, core, "StreamEowcOverWindow");
175
176impl PlanTreeNodeUnary<Stream> for StreamEowcOverWindow {
177 fn input(&self) -> PlanRef {
178 self.core.input.clone()
179 }
180
181 fn clone_with_input(&self, input: PlanRef) -> Self {
182 let mut core = self.core.clone();
183 core.input = input;
184 Self::new(core)
185 }
186}
187impl_plan_tree_node_for_unary! { Stream, StreamEowcOverWindow }
188
189impl StreamNode for StreamEowcOverWindow {
190 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
191 use risingwave_pb::stream_plan::*;
192
193 let calls = self
194 .window_functions()
195 .iter()
196 .map(PlanWindowFunction::to_protobuf)
197 .collect();
198 let partition_by = self.window_functions()[0]
199 .partition_by
200 .iter()
201 .map(|i| i.index() as _)
202 .collect();
203 let order_by = self.window_functions()[0]
204 .order_by
205 .iter()
206 .map(|o| o.to_protobuf())
207 .collect();
208 let state_table = self
209 .infer_state_table()
210 .with_id(state.gen_table_id_wrapped())
211 .to_internal_table_prost();
212
213 let intermediate_state_table = if self.needs_intermediate_state_table() {
215 Some(
216 self.infer_intermediate_state_table()
217 .with_id(state.gen_table_id_wrapped())
218 .to_internal_table_prost(),
219 )
220 } else {
221 None
222 };
223
224 PbNodeBody::EowcOverWindow(Box::new(EowcOverWindowNode {
225 calls,
226 partition_by,
227 order_by,
228 state_table: Some(state_table),
229 intermediate_state_table,
230 }))
231 }
232}
233
234impl ExprRewritable<Stream> for StreamEowcOverWindow {}
235
236impl ExprVisitable for StreamEowcOverWindow {}