// risingwave_batch_executors/executor/hop_window.rs
use std::num::NonZeroUsize;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::types::{DataType, Interval};
22use risingwave_expr::ExprError;
23use risingwave_expr::expr::{BoxedExpression, build_from_prost};
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25
26use crate::error::{BatchError, Result};
27use crate::executor::{
28 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
29};
30
31pub struct HopWindowExecutor {
32 child: BoxedExecutor,
33 identity: String,
34 schema: Schema,
35 window_slide: Interval,
36 window_size: Interval,
37 window_start_exprs: Vec<BoxedExpression>,
38 window_end_exprs: Vec<BoxedExpression>,
39 output_indices: Vec<usize>,
40}
41
42impl BoxedExecutorBuilder for HopWindowExecutor {
43 async fn new_boxed_executor(
44 source: &ExecutorBuilder<'_>,
45 inputs: Vec<BoxedExecutor>,
46 ) -> Result<BoxedExecutor> {
47 let [child]: [_; 1] = inputs.try_into().unwrap();
48
49 let hop_window_node = try_match_expand!(
50 source.plan_node().get_node_body().unwrap(),
51 NodeBody::HopWindow
52 )?;
53 let window_slide = hop_window_node.get_window_slide()?.into();
54 let window_size = hop_window_node.get_window_size()?.into();
55 let output_indices = hop_window_node
56 .get_output_indices()
57 .iter()
58 .cloned()
59 .map(|x| x as usize)
60 .collect_vec();
61
62 let window_start_exprs: Vec<_> = hop_window_node
63 .get_window_start_exprs()
64 .iter()
65 .map(build_from_prost)
66 .try_collect()?;
67 let window_end_exprs: Vec<_> = hop_window_node
68 .get_window_end_exprs()
69 .iter()
70 .map(build_from_prost)
71 .try_collect()?;
72 assert_eq!(window_start_exprs.len(), window_end_exprs.len());
73
74 let time_col = hop_window_node.get_time_col() as usize;
75 let time_col_data_type = child.schema().fields()[time_col].data_type();
76 let output_type = DataType::window_of(&time_col_data_type).unwrap();
77 let original_schema: Schema = child
78 .schema()
79 .clone()
80 .into_fields()
81 .into_iter()
82 .chain([
83 Field::with_name(output_type.clone(), "window_start"),
84 Field::with_name(output_type, "window_end"),
85 ])
86 .collect();
87 let output_indices_schema: Schema = output_indices
88 .iter()
89 .map(|&idx| original_schema[idx].clone())
90 .collect();
91 Ok(Box::new(HopWindowExecutor::new(
92 child,
93 output_indices_schema,
94 window_slide,
95 window_size,
96 source.plan_node().get_identity().clone(),
97 window_start_exprs,
98 window_end_exprs,
99 output_indices,
100 )))
101 }
102}
103
104impl HopWindowExecutor {
105 fn new(
106 child: BoxedExecutor,
107 schema: Schema,
108 window_slide: Interval,
109 window_size: Interval,
110 identity: String,
111 window_start_exprs: Vec<BoxedExpression>,
112 window_end_exprs: Vec<BoxedExpression>,
113 output_indices: Vec<usize>,
114 ) -> Self {
115 Self {
116 child,
117 identity,
118 schema,
119 window_slide,
120 window_size,
121 window_start_exprs,
122 window_end_exprs,
123 output_indices,
124 }
125 }
126}
127
128impl Executor for HopWindowExecutor {
129 fn schema(&self) -> &Schema {
130 &self.schema
131 }
132
133 fn identity(&self) -> &str {
134 &self.identity
135 }
136
137 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
138 self.do_execute()
139 }
140}
141
142impl HopWindowExecutor {
143 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
144 async fn do_execute(self: Box<Self>) {
145 let Self {
146 child,
147 window_slide,
148 window_size,
149 output_indices,
150 ..
151 } = *self;
152 let units = window_size
153 .exact_div(&window_slide)
154 .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
155 .ok_or_else(|| ExprError::InvalidParam {
156 name: "window",
157 reason: format!(
158 "window_size {} cannot be divided by window_slide {}",
159 window_size, window_slide
160 )
161 .into(),
162 })?
163 .get();
164
165 let window_start_col_index = child.schema().len();
166 let window_end_col_index = child.schema().len() + 1;
167 #[for_await]
168 for data_chunk in child.execute() {
169 let data_chunk = data_chunk?;
170 assert!(data_chunk.is_vis_compacted());
171 let len = data_chunk.cardinality();
172 for i in 0..units {
173 let window_start_col = if output_indices.contains(&window_start_col_index) {
174 Some(self.window_start_exprs[i].eval(&data_chunk).await?)
175 } else {
176 None
177 };
178 let window_end_col = if output_indices.contains(&window_end_col_index) {
179 Some(self.window_end_exprs[i].eval(&data_chunk).await?)
180 } else {
181 None
182 };
183 let new_cols = output_indices
184 .iter()
185 .filter_map(|&idx| {
186 if idx < window_start_col_index {
187 Some(data_chunk.column_at(idx).clone())
188 } else if idx == window_start_col_index {
189 Some(window_start_col.clone().unwrap())
190 } else if idx == window_end_col_index {
191 Some(window_end_col.clone().unwrap())
192 } else {
193 None
194 }
195 })
196 .collect_vec();
197 yield DataChunk::new(new_cols, len);
198 }
199 }
200 }
201}
202
203#[cfg(test)]
204mod tests {
205 use futures::stream::StreamExt;
206 use risingwave_common::array::DataChunkTestExt;
207 use risingwave_common::types::test_utils::IntervalTestExt;
208 use risingwave_expr::expr::test_utils::make_hop_window_expression;
209
210 use super::*;
211 use crate::executor::test_utils::MockExecutor;
212
213 fn create_executor(
214 output_indices: Vec<usize>,
215 window_slide: Interval,
216 window_size: Interval,
217 window_offset: Interval,
218 ) -> Box<HopWindowExecutor> {
219 let field1 = Field::unnamed(DataType::Int64);
220 let field2 = Field::unnamed(DataType::Int64);
221 let field3 = Field::with_name(DataType::Timestamp, "created_at");
222 let schema = Schema::new(vec![field1, field2, field3]);
223
224 let chunk = DataChunk::from_pretty(
225 &"I I TS
226 1 1 ^10:00:00
227 2 3 ^10:05:00
228 3 2 ^10:14:00
229 4 1 ^10:22:00
230 5 3 ^10:33:00
231 6 2 ^10:42:00
232 7 1 ^10:51:00
233 8 3 ^11:02:00"
234 .replace('^', "2022-02-02T"),
235 );
236 let mut mock_executor = MockExecutor::new(schema.clone());
237 mock_executor.add(chunk);
238
239 let (window_start_exprs, window_end_exprs) = make_hop_window_expression(
240 DataType::Timestamp,
241 2,
242 window_size,
243 window_slide,
244 window_offset,
245 )
246 .unwrap();
247
248 Box::new(HopWindowExecutor::new(
249 Box::new(mock_executor),
250 schema,
251 window_slide,
252 window_size,
253 "test".to_owned(),
254 window_start_exprs,
255 window_end_exprs,
256 output_indices,
257 ))
258 }
259
260 #[tokio::test]
261 async fn test_window_offset() {
262 async fn test_window_offset_helper(window_offset: Interval) -> DataChunk {
263 let default_indices = (0..3 + 2).collect_vec();
264 let window_slide = Interval::from_minutes(15);
265 let window_size = Interval::from_minutes(30);
266 let executor =
267 create_executor(default_indices, window_slide, window_size, window_offset);
268 let mut stream = executor.execute();
269 stream.next().await.unwrap().unwrap()
270 }
271
272 let window_size = 30;
273 for offset in 0..window_size {
274 for coefficient in -5..0 {
275 assert_eq!(
276 test_window_offset_helper(Interval::from_minutes(
277 coefficient * window_size + offset
278 ))
279 .await,
280 test_window_offset_helper(Interval::from_minutes(
281 (coefficient - 1) * window_size + offset
282 ))
283 .await
284 );
285 }
286 }
287 for offset in 0..window_size {
288 for coefficient in 0..5 {
289 assert_eq!(
290 test_window_offset_helper(Interval::from_minutes(
291 coefficient * window_size + offset
292 ))
293 .await,
294 test_window_offset_helper(Interval::from_minutes(
295 (coefficient + 1) * window_size + offset
296 ))
297 .await
298 );
299 }
300 }
301 for offset in -window_size..window_size {
302 assert_eq!(
303 test_window_offset_helper(Interval::from_minutes(window_size + offset)).await,
304 test_window_offset_helper(Interval::from_minutes(-window_size + offset)).await
305 );
306 }
307
308 assert_eq!(
309 test_window_offset_helper(Interval::from_minutes(-31)).await,
310 DataChunk::from_pretty(
311 &"I I TS TS TS
312 1 1 ^10:00:00 ^09:44:00 ^10:14:00
313 2 3 ^10:05:00 ^09:44:00 ^10:14:00
314 3 2 ^10:14:00 ^09:59:00 ^10:29:00
315 4 1 ^10:22:00 ^09:59:00 ^10:29:00
316 5 3 ^10:33:00 ^10:14:00 ^10:44:00
317 6 2 ^10:42:00 ^10:14:00 ^10:44:00
318 7 1 ^10:51:00 ^10:29:00 ^10:59:00
319 8 3 ^11:02:00 ^10:44:00 ^11:14:00"
320 .replace('^', "2022-02-02T"),
321 )
322 );
323 assert_eq!(
324 test_window_offset_helper(Interval::from_minutes(29)).await,
325 DataChunk::from_pretty(
326 &"I I TS TS TS
327 1 1 ^10:00:00 ^09:44:00 ^10:14:00
328 2 3 ^10:05:00 ^09:44:00 ^10:14:00
329 3 2 ^10:14:00 ^09:59:00 ^10:29:00
330 4 1 ^10:22:00 ^09:59:00 ^10:29:00
331 5 3 ^10:33:00 ^10:14:00 ^10:44:00
332 6 2 ^10:42:00 ^10:14:00 ^10:44:00
333 7 1 ^10:51:00 ^10:29:00 ^10:59:00
334 8 3 ^11:02:00 ^10:44:00 ^11:14:00"
335 .replace('^', "2022-02-02T"),
336 )
337 );
338 }
339
340 #[tokio::test]
341 async fn test_execute() {
342 let default_indices = (0..3 + 2).collect_vec();
343
344 let window_slide = Interval::from_minutes(15);
345 let window_size = Interval::from_minutes(30);
346 let window_offset = Interval::from_minutes(0);
347 let executor = create_executor(default_indices, window_slide, window_size, window_offset);
348
349 let mut stream = executor.execute();
350 let chunk = stream.next().await.unwrap().unwrap();
353 assert_eq!(
354 chunk,
355 DataChunk::from_pretty(
356 &"I I TS TS TS
357 1 1 ^10:00:00 ^09:45:00 ^10:15:00
358 2 3 ^10:05:00 ^09:45:00 ^10:15:00
359 3 2 ^10:14:00 ^09:45:00 ^10:15:00
360 4 1 ^10:22:00 ^10:00:00 ^10:30:00
361 5 3 ^10:33:00 ^10:15:00 ^10:45:00
362 6 2 ^10:42:00 ^10:15:00 ^10:45:00
363 7 1 ^10:51:00 ^10:30:00 ^11:00:00
364 8 3 ^11:02:00 ^10:45:00 ^11:15:00"
365 .replace('^', "2022-02-02T"),
366 )
367 );
368
369 let chunk = stream.next().await.unwrap().unwrap();
370 assert_eq!(
371 chunk,
372 DataChunk::from_pretty(
373 &"I I TS TS TS
374 1 1 ^10:00:00 ^10:00:00 ^10:30:00
375 2 3 ^10:05:00 ^10:00:00 ^10:30:00
376 3 2 ^10:14:00 ^10:00:00 ^10:30:00
377 4 1 ^10:22:00 ^10:15:00 ^10:45:00
378 5 3 ^10:33:00 ^10:30:00 ^11:00:00
379 6 2 ^10:42:00 ^10:30:00 ^11:00:00
380 7 1 ^10:51:00 ^10:45:00 ^11:15:00
381 8 3 ^11:02:00 ^11:00:00 ^11:30:00"
382 .replace('^', "2022-02-02T"),
383 )
384 );
385 }
386 #[tokio::test]
387 async fn test_output_indices() {
388 let window_slide = Interval::from_minutes(15);
389 let window_size = Interval::from_minutes(30);
390 let window_offset = Interval::from_minutes(0);
391 let executor = create_executor(vec![1, 3, 4, 2], window_slide, window_size, window_offset);
392
393 let mut stream = executor.execute();
394 let chunk = stream.next().await.unwrap().unwrap();
397 assert_eq!(
398 chunk,
399 DataChunk::from_pretty(
400 &" I TS TS TS
401 1 ^09:45:00 ^10:15:00 ^10:00:00
402 3 ^09:45:00 ^10:15:00 ^10:05:00
403 2 ^09:45:00 ^10:15:00 ^10:14:00
404 1 ^10:00:00 ^10:30:00 ^10:22:00
405 3 ^10:15:00 ^10:45:00 ^10:33:00
406 2 ^10:15:00 ^10:45:00 ^10:42:00
407 1 ^10:30:00 ^11:00:00 ^10:51:00
408 3 ^10:45:00 ^11:15:00 ^11:02:00"
409 .replace('^', "2022-02-02T"),
410 )
411 );
412
413 let chunk = stream.next().await.unwrap().unwrap();
414 assert_eq!(
415 chunk,
416 DataChunk::from_pretty(
417 &"I TS TS TS
418 1 ^10:00:00 ^10:30:00 ^10:00:00
419 3 ^10:00:00 ^10:30:00 ^10:05:00
420 2 ^10:00:00 ^10:30:00 ^10:14:00
421 1 ^10:15:00 ^10:45:00 ^10:22:00
422 3 ^10:30:00 ^11:00:00 ^10:33:00
423 2 ^10:30:00 ^11:00:00 ^10:42:00
424 1 ^10:45:00 ^11:15:00 ^10:51:00
425 3 ^11:00:00 ^11:30:00 ^11:02:00"
426 .replace('^', "2022-02-02T"),
427 )
428 );
429 }
430}