risingwave_frontend/optimizer/plan_node/generic/
hop_window.rs1use std::num::NonZeroUsize;
16
17use itertools::Itertools;
18use pretty_xmlish::{Pretty, StrAssocArr};
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::types::{DataType, Interval};
21use risingwave_common::util::column_index_mapping::ColIndexMapping;
22use risingwave_expr::ExprError;
23
24use super::super::utils::IndicesDisplay;
25use super::{GenericPlanNode, GenericPlanRef, impl_distill_unit_from_fields};
26use crate::error::Result;
27use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal};
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::plan_node::batch::BatchPlanNodeMetadata;
30use crate::optimizer::property::{FunctionalDependencySet, Order};
31use crate::utils::ColIndexMappingRewriteExt;
32
/// Hop (sliding) window over `input`, driven by the time column `time_col`.
///
/// The node conceptually works on an *internal* schema consisting of every input column
/// followed by two extra columns, `window_start` (at index `input.schema().len()`) and
/// `window_end` (right after it). `output_indices` projects that internal schema into the
/// node's actual output schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HopWindow<PlanRef> {
    /// The child plan whose rows are assigned to windows.
    pub input: PlanRef,
    /// Reference to the input column used to place rows into windows. Its data type
    /// determines the type of `window_start` / `window_end` via `DataType::window_of`.
    pub time_col: InputRef,
    /// Distance between the starts of two consecutive windows.
    pub window_slide: Interval,
    /// Length of every window. Expected to be a positive exact multiple of `window_slide`;
    /// expression derivation fails otherwise.
    pub window_size: Interval,
    /// Offset handed to `TumbleStart` when aligning window starts.
    pub window_offset: Interval,
    /// For each output column, its index in the internal schema
    /// (input columns, then `window_start`, then `window_end`).
    pub output_indices: Vec<usize>,
}
50
51impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
52 fn schema(&self) -> Schema {
53 let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
54 let mut original_schema = self.input.schema().clone();
55 original_schema.fields.reserve_exact(2);
56 let window_start = Field::with_name(output_type.clone(), "window_start");
57 let window_end = Field::with_name(output_type, "window_end");
58 original_schema.fields.push(window_start);
59 original_schema.fields.push(window_end);
60 self.output_indices
61 .iter()
62 .map(|&idx| original_schema[idx].clone())
63 .collect()
64 }
65
66 fn stream_key(&self) -> Option<Vec<usize>> {
67 let window_start_index = self
68 .output_indices
69 .iter()
70 .position(|&idx| idx == self.input.schema().len());
71 let window_end_index = self
72 .output_indices
73 .iter()
74 .position(|&idx| idx == self.input.schema().len() + 1);
75 if window_start_index.is_none() && window_end_index.is_none() {
76 None
77 } else {
78 let mut pk = self
79 .input
80 .stream_key()?
81 .iter()
82 .filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx))
83 .collect_vec();
84 if let Some(start_idx) = window_start_index {
85 pk.push(start_idx);
86 };
87 if let Some(end_idx) = window_end_index {
88 pk.push(end_idx);
89 };
90 Some(pk)
91 }
92 }
93
94 fn ctx(&self) -> OptimizerContextRef {
95 self.input.ctx()
96 }
97
98 fn functional_dependency(&self) -> FunctionalDependencySet {
99 let mut fd_set = self
100 .i2o_col_mapping()
101 .rewrite_functional_dependency_set(self.input.functional_dependency().clone());
102 let (start_idx_in_output, end_idx_in_output) = {
103 let internal2output = self.internal2output_col_mapping();
104 (
105 internal2output.try_map(self.internal_window_start_col_idx()),
106 internal2output.try_map(self.internal_window_end_col_idx()),
107 )
108 };
109 if let Some(start_idx) = start_idx_in_output
110 && let Some(end_idx) = end_idx_in_output
111 {
112 fd_set.add_functional_dependency_by_column_indices(&[start_idx], &[end_idx]);
113 fd_set.add_functional_dependency_by_column_indices(&[end_idx], &[start_idx]);
114 }
115 fd_set
116 }
117}
118
119impl<PlanRef: BatchPlanNodeMetadata> HopWindow<PlanRef> {
120 pub fn get_out_column_index_order(&self) -> Order {
121 self.i2o_col_mapping()
122 .rewrite_provided_order(self.input.order())
123 }
124}
125
126impl<PlanRef: GenericPlanRef> HopWindow<PlanRef> {
127 pub fn clone_with_input<OtherPlanRef>(&self, input: OtherPlanRef) -> HopWindow<OtherPlanRef> {
128 HopWindow {
129 input,
130 time_col: self.time_col.clone(),
131 window_slide: self.window_slide,
132 window_size: self.window_size,
133 window_offset: self.window_offset,
134 output_indices: self.output_indices.clone(),
135 }
136 }
137
138 pub fn output_window_start_col_idx(&self) -> Option<usize> {
139 self.internal2output_col_mapping()
140 .try_map(self.internal_window_start_col_idx())
141 }
142
143 pub fn output_window_end_col_idx(&self) -> Option<usize> {
144 self.internal2output_col_mapping()
145 .try_map(self.internal_window_end_col_idx())
146 }
147
148 pub fn into_parts(self) -> (PlanRef, InputRef, Interval, Interval, Interval, Vec<usize>) {
149 (
150 self.input,
151 self.time_col,
152 self.window_slide,
153 self.window_size,
154 self.window_offset,
155 self.output_indices,
156 )
157 }
158
159 pub fn internal_window_start_col_idx(&self) -> usize {
160 self.input.schema().len()
161 }
162
163 pub fn internal_window_end_col_idx(&self) -> usize {
164 self.input.schema().len() + 1
165 }
166
167 pub fn o2i_col_mapping(&self) -> ColIndexMapping {
168 self.output2internal_col_mapping()
169 .composite(&self.internal2input_col_mapping())
170 }
171
172 pub fn i2o_col_mapping(&self) -> ColIndexMapping {
173 self.input2internal_col_mapping()
174 .composite(&self.internal2output_col_mapping())
175 }
176
177 pub fn internal_column_num(&self) -> usize {
178 self.input.schema().len() + 2
179 }
180
181 pub fn output2internal_col_mapping(&self) -> ColIndexMapping {
182 self.internal2output_col_mapping()
183 .inverse()
184 .expect("must be invertible")
185 }
186
187 pub fn internal2output_col_mapping(&self) -> ColIndexMapping {
188 ColIndexMapping::with_remaining_columns(&self.output_indices, self.internal_column_num())
189 }
190
191 pub fn input2internal_col_mapping(&self) -> ColIndexMapping {
192 ColIndexMapping::identity_or_none(self.input.schema().len(), self.internal_column_num())
193 }
194
195 pub fn internal2input_col_mapping(&self) -> ColIndexMapping {
196 ColIndexMapping::identity_or_none(self.internal_column_num(), self.input.schema().len())
197 }
198
199 pub fn derive_window_start_and_end_exprs(&self) -> Result<(Vec<ExprImpl>, Vec<ExprImpl>)> {
200 let Self {
201 window_size,
202 window_slide,
203 window_offset,
204 time_col,
205 ..
206 } = &self;
207 let units = window_size
208 .exact_div(window_slide)
209 .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
210 .ok_or_else(|| ExprError::InvalidParam {
211 name: "window",
212 reason: format!(
213 "window_size {} cannot be divided by window_slide {}",
214 window_size, window_slide
215 )
216 .into(),
217 })?
218 .get();
219 let window_size_expr: ExprImpl =
220 Literal::new(Some((*window_size).into()), DataType::Interval).into();
221 let window_slide_expr: ExprImpl =
222 Literal::new(Some((*window_slide).into()), DataType::Interval).into();
223 let window_offset_expr: ExprImpl =
224 Literal::new(Some((*window_offset).into()), DataType::Interval).into();
225
226 let window_size_sub_slide = FunctionCall::new(
227 ExprType::Subtract,
228 vec![window_size_expr, window_slide_expr.clone()],
229 )?
230 .into();
231
232 let time_col_shifted = FunctionCall::new(
233 ExprType::Subtract,
234 vec![
235 ExprImpl::InputRef(Box::new(time_col.clone())),
236 window_size_sub_slide,
237 ],
238 )?
239 .into();
240
241 let hop_start: ExprImpl = FunctionCall::new(
242 ExprType::TumbleStart,
243 vec![time_col_shifted, window_slide_expr, window_offset_expr],
244 )?
245 .into();
246
247 let mut window_start_exprs = Vec::with_capacity(units);
248 let mut window_end_exprs = Vec::with_capacity(units);
249 for i in 0..units {
250 {
251 let window_start_offset =
252 window_slide
253 .checked_mul_int(i)
254 .ok_or_else(|| ExprError::InvalidParam {
255 name: "window",
256 reason: format!(
257 "window_slide {} cannot be multiplied by {}",
258 window_slide, i
259 )
260 .into(),
261 })?;
262 let window_start_offset_expr =
263 Literal::new(Some(window_start_offset.into()), DataType::Interval).into();
264 let window_start_expr = FunctionCall::new(
265 ExprType::Add,
266 vec![hop_start.clone(), window_start_offset_expr],
267 )?
268 .into();
269 window_start_exprs.push(window_start_expr);
270 }
271 {
272 let window_end_offset =
273 window_slide.checked_mul_int(i + units).ok_or_else(|| {
274 ExprError::InvalidParam {
275 name: "window",
276 reason: format!(
277 "window_slide {} cannot be multiplied by {}",
278 window_slide,
279 i + units
280 )
281 .into(),
282 }
283 })?;
284 let window_end_offset_expr =
285 Literal::new(Some(window_end_offset.into()), DataType::Interval).into();
286 let window_end_expr = FunctionCall::new(
287 ExprType::Add,
288 vec![hop_start.clone(), window_end_offset_expr],
289 )?
290 .into();
291 window_end_exprs.push(window_end_expr);
292 }
293 }
294 assert_eq!(window_start_exprs.len(), window_end_exprs.len());
295 Ok((window_start_exprs, window_end_exprs))
296 }
297
298 pub fn fields_pretty<'a>(&self) -> StrAssocArr<'a> {
299 let mut out = Vec::with_capacity(5);
300 let output_type = DataType::window_of(&self.time_col.data_type).unwrap();
301 out.push((
302 "time_col",
303 Pretty::display(&InputRefDisplay {
304 input_ref: &self.time_col,
305 input_schema: self.input.schema(),
306 }),
307 ));
308 out.push(("slide", Pretty::display(&self.window_slide)));
309 out.push(("size", Pretty::display(&self.window_size)));
310 if self
311 .output_indices
312 .iter()
313 .copied()
314 .eq(0..(self.input.schema().len() + 2))
316 {
317 out.push(("output", Pretty::from("all")));
318 } else {
319 let original_schema: Schema = self
320 .input
321 .schema()
322 .clone()
323 .into_fields()
324 .into_iter()
325 .chain([
326 Field::with_name(output_type.clone(), "window_start"),
327 Field::with_name(output_type, "window_end"),
328 ])
329 .collect();
330 let id = IndicesDisplay {
331 indices: &self.output_indices,
332 schema: &original_schema,
333 };
334 out.push(("output", id.distill()));
335 }
336 out
337 }
338}
339
// Presumably generates the `Distill` implementation for `HopWindow` from its
// `fields_pretty` method (macro is defined in the parent module — confirm there).
impl_distill_unit_from_fields!(HopWindow, GenericPlanRef);