risingwave_batch_executors/executor/
hop_window.rs1use std::num::NonZeroUsize;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::types::{DataType, Interval};
22use risingwave_expr::ExprError;
23use risingwave_expr::expr::{BoxedExpression, build_from_prost};
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25
26use crate::error::{BatchError, Result};
27use crate::executor::{
28 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
29};
30
31pub struct HopWindowExecutor {
32 child: BoxedExecutor,
33 identity: String,
34 schema: Schema,
35 window_slide: Interval,
36 window_size: Interval,
37 window_start_exprs: Vec<BoxedExpression>,
38 window_end_exprs: Vec<BoxedExpression>,
39 output_indices: Vec<usize>,
40}
41
42impl BoxedExecutorBuilder for HopWindowExecutor {
43 async fn new_boxed_executor(
44 source: &ExecutorBuilder<'_>,
45 inputs: Vec<BoxedExecutor>,
46 ) -> Result<BoxedExecutor> {
47 let [child]: [_; 1] = inputs.try_into().unwrap();
48
49 let hop_window_node = try_match_expand!(
50 source.plan_node().get_node_body().unwrap(),
51 NodeBody::HopWindow
52 )?;
53 let window_slide = hop_window_node.get_window_slide()?.into();
54 let window_size = hop_window_node.get_window_size()?.into();
55 let output_indices = hop_window_node
56 .get_output_indices()
57 .iter()
58 .cloned()
59 .map(|x| x as usize)
60 .collect_vec();
61
62 let window_start_exprs: Vec<_> = hop_window_node
63 .get_window_start_exprs()
64 .iter()
65 .map(build_from_prost)
66 .try_collect()?;
67 let window_end_exprs: Vec<_> = hop_window_node
68 .get_window_end_exprs()
69 .iter()
70 .map(build_from_prost)
71 .try_collect()?;
72 assert_eq!(window_start_exprs.len(), window_end_exprs.len());
73
74 let time_col = hop_window_node.get_time_col() as usize;
75 let time_col_data_type = child.schema().fields()[time_col].data_type();
76 let output_type = DataType::window_of(&time_col_data_type).unwrap();
77 let original_schema: Schema = child
78 .schema()
79 .clone()
80 .into_fields()
81 .into_iter()
82 .chain([
83 Field::with_name(output_type.clone(), "window_start"),
84 Field::with_name(output_type, "window_end"),
85 ])
86 .collect();
87 let output_indices_schema: Schema = output_indices
88 .iter()
89 .map(|&idx| original_schema[idx].clone())
90 .collect();
91 Ok(Box::new(HopWindowExecutor::new(
92 child,
93 output_indices_schema,
94 window_slide,
95 window_size,
96 source.plan_node().get_identity().clone(),
97 window_start_exprs,
98 window_end_exprs,
99 output_indices,
100 )))
101 }
102}
103
104impl HopWindowExecutor {
105 #[allow(clippy::too_many_arguments)]
106 fn new(
107 child: BoxedExecutor,
108 schema: Schema,
109 window_slide: Interval,
110 window_size: Interval,
111 identity: String,
112 window_start_exprs: Vec<BoxedExpression>,
113 window_end_exprs: Vec<BoxedExpression>,
114 output_indices: Vec<usize>,
115 ) -> Self {
116 Self {
117 child,
118 identity,
119 schema,
120 window_slide,
121 window_size,
122 window_start_exprs,
123 window_end_exprs,
124 output_indices,
125 }
126 }
127}
128
129impl Executor for HopWindowExecutor {
130 fn schema(&self) -> &Schema {
131 &self.schema
132 }
133
134 fn identity(&self) -> &str {
135 &self.identity
136 }
137
138 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
139 self.do_execute()
140 }
141}
142
143impl HopWindowExecutor {
144 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
145 async fn do_execute(self: Box<Self>) {
146 let Self {
147 child,
148 window_slide,
149 window_size,
150 output_indices,
151 ..
152 } = *self;
153 let units = window_size
154 .exact_div(&window_slide)
155 .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
156 .ok_or_else(|| ExprError::InvalidParam {
157 name: "window",
158 reason: format!(
159 "window_size {} cannot be divided by window_slide {}",
160 window_size, window_slide
161 )
162 .into(),
163 })?
164 .get();
165
166 let window_start_col_index = child.schema().len();
167 let window_end_col_index = child.schema().len() + 1;
168 #[for_await]
169 for data_chunk in child.execute() {
170 let data_chunk = data_chunk?;
171 assert!(data_chunk.is_compacted());
172 let len = data_chunk.cardinality();
173 for i in 0..units {
174 let window_start_col = if output_indices.contains(&window_start_col_index) {
175 Some(self.window_start_exprs[i].eval(&data_chunk).await?)
176 } else {
177 None
178 };
179 let window_end_col = if output_indices.contains(&window_end_col_index) {
180 Some(self.window_end_exprs[i].eval(&data_chunk).await?)
181 } else {
182 None
183 };
184 let new_cols = output_indices
185 .iter()
186 .filter_map(|&idx| {
187 if idx < window_start_col_index {
188 Some(data_chunk.column_at(idx).clone())
189 } else if idx == window_start_col_index {
190 Some(window_start_col.clone().unwrap())
191 } else if idx == window_end_col_index {
192 Some(window_end_col.clone().unwrap())
193 } else {
194 None
195 }
196 })
197 .collect_vec();
198 yield DataChunk::new(new_cols, len);
199 }
200 }
201 }
202}
203
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;
    use risingwave_common::array::DataChunkTestExt;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_expr::expr::test_utils::make_hop_window_expression;

    use super::*;
    use crate::executor::test_utils::MockExecutor;

    /// Builds a `HopWindowExecutor` over a `MockExecutor` that yields one 8-row chunk with
    /// columns `(Int64, Int64, Timestamp "created_at")`.
    ///
    /// Window expressions come from `make_hop_window_expression`. The `2` argument is presumably
    /// the time column index (`created_at`) — TODO confirm against the helper. The executor's
    /// `schema` here is the child's 3-column schema rather than the projected output schema;
    /// these tests only inspect the produced chunks.
    fn create_executor(
        output_indices: Vec<usize>,
        window_slide: Interval,
        window_size: Interval,
        window_offset: Interval,
    ) -> Box<HopWindowExecutor> {
        let field1 = Field::unnamed(DataType::Int64);
        let field2 = Field::unnamed(DataType::Int64);
        let field3 = Field::with_name(DataType::Timestamp, "created_at");
        let schema = Schema::new(vec![field1, field2, field3]);

        // `^` is shorthand for the shared date prefix, expanded by `replace` below.
        let chunk = DataChunk::from_pretty(
            &"I I TS
            1 1 ^10:00:00
            2 3 ^10:05:00
            3 2 ^10:14:00
            4 1 ^10:22:00
            5 3 ^10:33:00
            6 2 ^10:42:00
            7 1 ^10:51:00
            8 3 ^11:02:00"
                .replace('^', "2022-02-02T"),
        );
        let mut mock_executor = MockExecutor::new(schema.clone());
        mock_executor.add(chunk);

        let (window_start_exprs, window_end_exprs) = make_hop_window_expression(
            DataType::Timestamp,
            2,
            window_size,
            window_slide,
            window_offset,
        )
        .unwrap();

        Box::new(HopWindowExecutor::new(
            Box::new(mock_executor),
            schema,
            window_slide,
            window_size,
            "test".to_owned(),
            window_start_exprs,
            window_end_exprs,
            output_indices,
        ))
    }

    /// Offsets that differ by a whole multiple of `window_size` (30 minutes) must produce the
    /// same first output chunk; two concrete offsets are also checked against expected data.
    #[tokio::test]
    async fn test_window_offset() {
        // Runs a 15-minute-slide / 30-minute-size executor with `window_offset` and returns
        // only its first output chunk (each row's earliest window).
        async fn test_window_offset_helper(window_offset: Interval) -> DataChunk {
            let default_indices = (0..3 + 2).collect_vec();
            let window_slide = Interval::from_minutes(15);
            let window_size = Interval::from_minutes(30);
            let executor =
                create_executor(default_indices, window_slide, window_size, window_offset);
            let mut stream = executor.execute();
            stream.next().await.unwrap().unwrap()
        }

        let window_size = 30;
        // Negative offsets: moving down by one more window size changes nothing.
        for offset in 0..window_size {
            for coefficient in -5..0 {
                assert_eq!(
                    test_window_offset_helper(Interval::from_minutes(
                        coefficient * window_size + offset
                    ))
                    .await,
                    test_window_offset_helper(Interval::from_minutes(
                        (coefficient - 1) * window_size + offset
                    ))
                    .await
                );
            }
        }
        // Non-negative offsets: moving up by one window size changes nothing.
        for offset in 0..window_size {
            for coefficient in 0..5 {
                assert_eq!(
                    test_window_offset_helper(Interval::from_minutes(
                        coefficient * window_size + offset
                    ))
                    .await,
                    test_window_offset_helper(Interval::from_minutes(
                        (coefficient + 1) * window_size + offset
                    ))
                    .await
                );
            }
        }
        // Offsets on either side of zero that differ by two window sizes.
        for offset in -window_size..window_size {
            assert_eq!(
                test_window_offset_helper(Interval::from_minutes(window_size + offset)).await,
                test_window_offset_helper(Interval::from_minutes(-window_size + offset)).await
            );
        }

        // -31 and 29 minutes differ by 60 minutes; both align window starts at :14, :29, :44, :59.
        assert_eq!(
            test_window_offset_helper(Interval::from_minutes(-31)).await,
            DataChunk::from_pretty(
                &"I I TS TS TS
                1 1 ^10:00:00 ^09:44:00 ^10:14:00
                2 3 ^10:05:00 ^09:44:00 ^10:14:00
                3 2 ^10:14:00 ^09:59:00 ^10:29:00
                4 1 ^10:22:00 ^09:59:00 ^10:29:00
                5 3 ^10:33:00 ^10:14:00 ^10:44:00
                6 2 ^10:42:00 ^10:14:00 ^10:44:00
                7 1 ^10:51:00 ^10:29:00 ^10:59:00
                8 3 ^11:02:00 ^10:44:00 ^11:14:00"
                    .replace('^', "2022-02-02T"),
            )
        );
        assert_eq!(
            test_window_offset_helper(Interval::from_minutes(29)).await,
            DataChunk::from_pretty(
                &"I I TS TS TS
                1 1 ^10:00:00 ^09:44:00 ^10:14:00
                2 3 ^10:05:00 ^09:44:00 ^10:14:00
                3 2 ^10:14:00 ^09:59:00 ^10:29:00
                4 1 ^10:22:00 ^09:59:00 ^10:29:00
                5 3 ^10:33:00 ^10:14:00 ^10:44:00
                6 2 ^10:42:00 ^10:14:00 ^10:44:00
                7 1 ^10:51:00 ^10:29:00 ^10:59:00
                8 3 ^11:02:00 ^10:44:00 ^11:14:00"
                    .replace('^', "2022-02-02T"),
            )
        );
    }

    /// With a 15-minute slide and 30-minute size, every row falls into two windows. The single
    /// input chunk therefore yields two output chunks: the first holds each row's earlier
    /// window, the second its later window.
    #[tokio::test]
    async fn test_execute() {
        let default_indices = (0..3 + 2).collect_vec();

        let window_slide = Interval::from_minutes(15);
        let window_size = Interval::from_minutes(30);
        let window_offset = Interval::from_minutes(0);
        let executor = create_executor(default_indices, window_slide, window_size, window_offset);

        let mut stream = executor.execute();
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(
            chunk,
            DataChunk::from_pretty(
                &"I I TS TS TS
                1 1 ^10:00:00 ^09:45:00 ^10:15:00
                2 3 ^10:05:00 ^09:45:00 ^10:15:00
                3 2 ^10:14:00 ^09:45:00 ^10:15:00
                4 1 ^10:22:00 ^10:00:00 ^10:30:00
                5 3 ^10:33:00 ^10:15:00 ^10:45:00
                6 2 ^10:42:00 ^10:15:00 ^10:45:00
                7 1 ^10:51:00 ^10:30:00 ^11:00:00
                8 3 ^11:02:00 ^10:45:00 ^11:15:00"
                    .replace('^', "2022-02-02T"),
            )
        );

        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(
            chunk,
            DataChunk::from_pretty(
                &"I I TS TS TS
                1 1 ^10:00:00 ^10:00:00 ^10:30:00
                2 3 ^10:05:00 ^10:00:00 ^10:30:00
                3 2 ^10:14:00 ^10:00:00 ^10:30:00
                4 1 ^10:22:00 ^10:15:00 ^10:45:00
                5 3 ^10:33:00 ^10:30:00 ^11:00:00
                6 2 ^10:42:00 ^10:30:00 ^11:00:00
                7 1 ^10:51:00 ^10:45:00 ^11:15:00
                8 3 ^11:02:00 ^11:00:00 ^11:30:00"
                    .replace('^', "2022-02-02T"),
            )
        );
    }

    /// Projects `[1, 3, 4, 2]`: the second input column, `window_start`, `window_end`, and then
    /// `created_at`, checking that output columns follow `output_indices` order.
    #[tokio::test]
    async fn test_output_indices() {
        let window_slide = Interval::from_minutes(15);
        let window_size = Interval::from_minutes(30);
        let window_offset = Interval::from_minutes(0);
        let executor = create_executor(vec![1, 3, 4, 2], window_slide, window_size, window_offset);

        let mut stream = executor.execute();
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(
            chunk,
            DataChunk::from_pretty(
                &" I TS TS TS
                1 ^09:45:00 ^10:15:00 ^10:00:00
                3 ^09:45:00 ^10:15:00 ^10:05:00
                2 ^09:45:00 ^10:15:00 ^10:14:00
                1 ^10:00:00 ^10:30:00 ^10:22:00
                3 ^10:15:00 ^10:45:00 ^10:33:00
                2 ^10:15:00 ^10:45:00 ^10:42:00
                1 ^10:30:00 ^11:00:00 ^10:51:00
                3 ^10:45:00 ^11:15:00 ^11:02:00"
                    .replace('^', "2022-02-02T"),
            )
        );

        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(
            chunk,
            DataChunk::from_pretty(
                &"I TS TS TS
                1 ^10:00:00 ^10:30:00 ^10:00:00
                3 ^10:00:00 ^10:30:00 ^10:05:00
                2 ^10:00:00 ^10:30:00 ^10:14:00
                1 ^10:15:00 ^10:45:00 ^10:22:00
                3 ^10:30:00 ^11:00:00 ^10:33:00
                2 ^10:30:00 ^11:00:00 ^10:42:00
                1 ^10:45:00 ^11:15:00 ^10:51:00
                3 ^11:00:00 ^11:30:00 ^11:02:00"
                    .replace('^', "2022-02-02T"),
            )
        );
    }
}