risingwave_frontend/optimizer/plan_node/generic/
hop_window.rs1use std::num::NonZeroUsize;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, StrAssocArr};
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::types::{DataType, Interval};
21use risingwave_common::util::column_index_mapping::ColIndexMapping;
22use risingwave_expr::ExprError;
23
24use super::super::utils::IndicesDisplay;
25use super::{GenericPlanNode, GenericPlanRef, impl_distill_unit_from_fields};
26use crate::error::Result;
27use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal};
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::plan_node::batch::BatchPlanRef;
30use crate::optimizer::property::{FunctionalDependencySet, Order};
31use crate::utils::ColIndexMappingRewriteExt;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct HopWindow<PlanRef> {
36 pub input: PlanRef,
37 pub time_col: InputRef,
38 pub window_slide: Interval,
39 pub window_size: Interval,
40 pub window_offset: Interval,
41 pub output_indices: Vec<usize>,
49}
50
51impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
52 fn schema(&self) -> Schema {
53 let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
54 let mut original_schema = self.input.schema().clone();
55 original_schema.fields.reserve_exact(2);
56 let window_start = Field::with_name(output_type.clone(), "window_start");
57 let window_end = Field::with_name(output_type, "window_end");
58 original_schema.fields.push(window_start);
59 original_schema.fields.push(window_end);
60 self.output_indices
61 .iter()
62 .map(|&idx| original_schema[idx].clone())
63 .collect()
64 }
65
66 fn stream_key(&self) -> Option<Vec<usize>> {
67 let window_start_index = self
68 .output_indices
69 .iter()
70 .position(|&idx| idx == self.input.schema().len());
71 let window_end_index = self
72 .output_indices
73 .iter()
74 .position(|&idx| idx == self.input.schema().len() + 1);
75 if window_start_index.is_none() && window_end_index.is_none() {
76 None
77 } else {
78 let mut pk = self
79 .input
80 .stream_key()?
81 .iter()
82 .filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx))
83 .collect_vec();
84 if let Some(start_idx) = window_start_index {
85 pk.push(start_idx);
86 };
87 if let Some(end_idx) = window_end_index {
88 pk.push(end_idx);
89 };
90 Some(pk)
91 }
92 }
93
94 fn ctx(&self) -> OptimizerContextRef {
95 self.input.ctx()
96 }
97
98 fn functional_dependency(&self) -> FunctionalDependencySet {
99 let mut fd_set = self
100 .i2o_col_mapping()
101 .rewrite_functional_dependency_set(self.input.functional_dependency().clone());
102 let (start_idx_in_output, end_idx_in_output) = {
103 let internal2output = self.internal2output_col_mapping();
104 (
105 internal2output.try_map(self.internal_window_start_col_idx()),
106 internal2output.try_map(self.internal_window_end_col_idx()),
107 )
108 };
109 if let Some(start_idx) = start_idx_in_output
110 && let Some(end_idx) = end_idx_in_output
111 {
112 fd_set.add_functional_dependency_by_column_indices(&[start_idx], &[end_idx]);
113 fd_set.add_functional_dependency_by_column_indices(&[end_idx], &[start_idx]);
114 }
115 fd_set
116 }
117}
118
119impl<PlanRef: BatchPlanRef> HopWindow<PlanRef> {
120 pub fn get_out_column_index_order(&self) -> Order {
121 self.i2o_col_mapping()
122 .rewrite_provided_order(self.input.order())
123 }
124}
125
126impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
127 pub fn output_window_start_col_idx(&self) -> Option<usize> {
128 self.internal2output_col_mapping()
129 .try_map(self.internal_window_start_col_idx())
130 }
131
132 pub fn output_window_end_col_idx(&self) -> Option<usize> {
133 self.internal2output_col_mapping()
134 .try_map(self.internal_window_end_col_idx())
135 }
136
137 pub fn into_parts(self) -> (PlanRef, InputRef, Interval, Interval, Interval, Vec<usize>) {
138 (
139 self.input,
140 self.time_col,
141 self.window_slide,
142 self.window_size,
143 self.window_offset,
144 self.output_indices,
145 )
146 }
147
148 pub fn internal_window_start_col_idx(&self) -> usize {
149 self.input.schema().len()
150 }
151
152 pub fn internal_window_end_col_idx(&self) -> usize {
153 self.input.schema().len() + 1
154 }
155
156 pub fn o2i_col_mapping(&self) -> ColIndexMapping {
157 self.output2internal_col_mapping()
158 .composite(&self.internal2input_col_mapping())
159 }
160
161 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
162 self.input2internal_col_mapping()
163 .composite(&self.internal2output_col_mapping())
164 }
165
166 pub fn internal_column_num(&self) -> usize {
167 self.input.schema().len() + 2
168 }
169
170 pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
171 self.internal2output_col_mapping()
172 .inverse()
173 .expect("must be invertible")
174 }
175
176 pub fn internal2output_col_mapping(&self) -> ColIndexMapping {
177 ColIndexMapping::with_remaining_columns(&self.output_indices, self.internal_column_num())
178 }
179
180 pub fn input2internal_col_mapping(&self) -> ColIndexMapping {
181 ColIndexMapping::identity_or_none(self.input.schema().len(), self.internal_column_num())
182 }
183
184 pub fn internal2input_col_mapping(&self) -> ColIndexMapping {
185 ColIndexMapping::identity_or_none(self.internal_column_num(), self.input.schema().len())
186 }
187
188 pub fn derive_window_start_and_end_exprs(&self) -> Result<(Vec<ExprImpl>, Vec<ExprImpl>)> {
189 let Self {
190 window_size,
191 window_slide,
192 window_offset,
193 time_col,
194 ..
195 } = &self;
196 let units = window_size
197 .exact_div(window_slide)
198 .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
199 .ok_or_else(|| ExprError::InvalidParam {
200 name: "window",
201 reason: format!(
202 "window_size {} cannot be divided by window_slide {}",
203 window_size, window_slide
204 )
205 .into(),
206 })?
207 .get();
208 let window_size_expr: ExprImpl =
209 Literal::new(Some((*window_size).into()), DataType::Interval).into();
210 let window_slide_expr: ExprImpl =
211 Literal::new(Some((*window_slide).into()), DataType::Interval).into();
212 let window_offset_expr: ExprImpl =
213 Literal::new(Some((*window_offset).into()), DataType::Interval).into();
214
215 let window_size_sub_slide = FunctionCall::new(
216 ExprType::Subtract,
217 vec![window_size_expr, window_slide_expr.clone()],
218 )?
219 .into();
220
221 let time_col_shifted = FunctionCall::new(
222 ExprType::Subtract,
223 vec![
224 ExprImpl::InputRef(Box::new(time_col.clone())),
225 window_size_sub_slide,
226 ],
227 )?
228 .into();
229
230 let hop_start: ExprImpl = FunctionCall::new(
231 ExprType::TumbleStart,
232 vec![time_col_shifted, window_slide_expr, window_offset_expr],
233 )?
234 .into();
235
236 let mut window_start_exprs = Vec::with_capacity(units);
237 let mut window_end_exprs = Vec::with_capacity(units);
238 for i in 0..units {
239 {
240 let window_start_offset =
241 window_slide
242 .checked_mul_int(i)
243 .ok_or_else(|| ExprError::InvalidParam {
244 name: "window",
245 reason: format!(
246 "window_slide {} cannot be multiplied by {}",
247 window_slide, i
248 )
249 .into(),
250 })?;
251 let window_start_offset_expr =
252 Literal::new(Some(window_start_offset.into()), DataType::Interval).into();
253 let window_start_expr = FunctionCall::new(
254 ExprType::Add,
255 vec![hop_start.clone(), window_start_offset_expr],
256 )?
257 .into();
258 window_start_exprs.push(window_start_expr);
259 }
260 {
261 let window_end_offset =
262 window_slide.checked_mul_int(i + units).ok_or_else(|| {
263 ExprError::InvalidParam {
264 name: "window",
265 reason: format!(
266 "window_slide {} cannot be multiplied by {}",
267 window_slide,
268 i + units
269 )
270 .into(),
271 }
272 })?;
273 let window_end_offset_expr =
274 Literal::new(Some(window_end_offset.into()), DataType::Interval).into();
275 let window_end_expr = FunctionCall::new(
276 ExprType::Add,
277 vec![hop_start.clone(), window_end_offset_expr],
278 )?
279 .into();
280 window_end_exprs.push(window_end_expr);
281 }
282 }
283 assert_eq!(window_start_exprs.len(), window_end_exprs.len());
284 Ok((window_start_exprs, window_end_exprs))
285 }
286
287 pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
288 let mut out = Vec::with_capacity(5);
289 let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
290 out.push((
291 "time_col",
292 Pretty::display(&InputRefDisplay {
293 input_ref: &self.time_col,
294 input_schema: self.input.schema(),
295 }),
296 ));
297 out.push(("slide", Pretty::display(&self.window_slide)));
298 out.push(("size", Pretty::display(&self.window_size)));
299 if self
300 .output_indices
301 .iter()
302 .copied()
303 .eq(0..(self.input.schema().len() + 2))
305 {
306 out.push(("output", Pretty::from("all")));
307 } else {
308 let original_schema: Schema = self
309 .input
310 .schema()
311 .clone()
312 .into_fields()
313 .into_iter()
314 .chain([
315 Field::with_name(output_type.clone(), "window_start"),
316 Field::with_name(output_type, "window_end"),
317 ])
318 .collect();
319 let id = IndicesDisplay {
320 indices: &self.output_indices,
321 schema: &original_schema,
322 };
323 out.push(("output", id.distill()));
324 }
325 out
326 }
327}
328
329impl_distill_unit_from_fields!(HopWindow, GenericPlanRef);