risingwave_batch_executors/executor/
hop_window.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::num::NonZeroUsize;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::types::{DataType, Interval};
22use risingwave_expr::ExprError;
23use risingwave_expr::expr::{BoxedExpression, build_from_prost};
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25
26use crate::error::{BatchError, Result};
27use crate::executor::{
28    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
29};
30
31pub struct HopWindowExecutor {
32    child: BoxedExecutor,
33    identity: String,
34    schema: Schema,
35    window_slide: Interval,
36    window_size: Interval,
37    window_start_exprs: Vec<BoxedExpression>,
38    window_end_exprs: Vec<BoxedExpression>,
39    output_indices: Vec<usize>,
40}
41
42impl BoxedExecutorBuilder for HopWindowExecutor {
43    async fn new_boxed_executor(
44        source: &ExecutorBuilder<'_>,
45        inputs: Vec<BoxedExecutor>,
46    ) -> Result<BoxedExecutor> {
47        let [child]: [_; 1] = inputs.try_into().unwrap();
48
49        let hop_window_node = try_match_expand!(
50            source.plan_node().get_node_body().unwrap(),
51            NodeBody::HopWindow
52        )?;
53        let window_slide = hop_window_node.get_window_slide()?.into();
54        let window_size = hop_window_node.get_window_size()?.into();
55        let output_indices = hop_window_node
56            .get_output_indices()
57            .iter()
58            .cloned()
59            .map(|x| x as usize)
60            .collect_vec();
61
62        let window_start_exprs: Vec<_> = hop_window_node
63            .get_window_start_exprs()
64            .iter()
65            .map(build_from_prost)
66            .try_collect()?;
67        let window_end_exprs: Vec<_> = hop_window_node
68            .get_window_end_exprs()
69            .iter()
70            .map(build_from_prost)
71            .try_collect()?;
72        assert_eq!(window_start_exprs.len(), window_end_exprs.len());
73
74        let time_col = hop_window_node.get_time_col() as usize;
75        let time_col_data_type = child.schema().fields()[time_col].data_type();
76        let output_type = DataType::window_of(&time_col_data_type).unwrap();
77        let original_schema: Schema = child
78            .schema()
79            .clone()
80            .into_fields()
81            .into_iter()
82            .chain([
83                Field::with_name(output_type.clone(), "window_start"),
84                Field::with_name(output_type, "window_end"),
85            ])
86            .collect();
87        let output_indices_schema: Schema = output_indices
88            .iter()
89            .map(|&idx| original_schema[idx].clone())
90            .collect();
91        Ok(Box::new(HopWindowExecutor::new(
92            child,
93            output_indices_schema,
94            window_slide,
95            window_size,
96            source.plan_node().get_identity().clone(),
97            window_start_exprs,
98            window_end_exprs,
99            output_indices,
100        )))
101    }
102}
103
104impl HopWindowExecutor {
105    fn new(
106        child: BoxedExecutor,
107        schema: Schema,
108        window_slide: Interval,
109        window_size: Interval,
110        identity: String,
111        window_start_exprs: Vec<BoxedExpression>,
112        window_end_exprs: Vec<BoxedExpression>,
113        output_indices: Vec<usize>,
114    ) -> Self {
115        Self {
116            child,
117            identity,
118            schema,
119            window_slide,
120            window_size,
121            window_start_exprs,
122            window_end_exprs,
123            output_indices,
124        }
125    }
126}
127
128impl Executor for HopWindowExecutor {
129    fn schema(&self) -> &Schema {
130        &self.schema
131    }
132
133    fn identity(&self) -> &str {
134        &self.identity
135    }
136
137    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
138        self.do_execute()
139    }
140}
141
142impl HopWindowExecutor {
143    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
144    async fn do_execute(self: Box<Self>) {
145        let Self {
146            child,
147            window_slide,
148            window_size,
149            output_indices,
150            ..
151        } = *self;
152        let units = window_size
153            .exact_div(&window_slide)
154            .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
155            .ok_or_else(|| ExprError::InvalidParam {
156                name: "window",
157                reason: format!(
158                    "window_size {} cannot be divided by window_slide {}",
159                    window_size, window_slide
160                )
161                .into(),
162            })?
163            .get();
164
165        let window_start_col_index = child.schema().len();
166        let window_end_col_index = child.schema().len() + 1;
167        #[for_await]
168        for data_chunk in child.execute() {
169            let data_chunk = data_chunk?;
170            assert!(data_chunk.is_vis_compacted());
171            let len = data_chunk.cardinality();
172            for i in 0..units {
173                let window_start_col = if output_indices.contains(&window_start_col_index) {
174                    Some(self.window_start_exprs[i].eval(&data_chunk).await?)
175                } else {
176                    None
177                };
178                let window_end_col = if output_indices.contains(&window_end_col_index) {
179                    Some(self.window_end_exprs[i].eval(&data_chunk).await?)
180                } else {
181                    None
182                };
183                let new_cols = output_indices
184                    .iter()
185                    .filter_map(|&idx| {
186                        if idx < window_start_col_index {
187                            Some(data_chunk.column_at(idx).clone())
188                        } else if idx == window_start_col_index {
189                            Some(window_start_col.clone().unwrap())
190                        } else if idx == window_end_col_index {
191                            Some(window_end_col.clone().unwrap())
192                        } else {
193                            None
194                        }
195                    })
196                    .collect_vec();
197                yield DataChunk::new(new_cols, len);
198            }
199        }
200    }
201}
202
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;
    use risingwave_common::array::DataChunkTestExt;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_expr::expr::test_utils::make_hop_window_expression;

    use super::*;
    use crate::executor::test_utils::MockExecutor;

    /// Parses a pretty-printed chunk after expanding the `^` shorthand into
    /// the date prefix shared by every timestamp in these tests.
    fn chunk_from_pretty(pretty: &str) -> DataChunk {
        DataChunk::from_pretty(&pretty.replace('^', "2022-02-02T"))
    }

    /// Output indices selecting the three child columns plus both window columns.
    fn all_output_indices() -> Vec<usize> {
        (0..3 + 2).collect_vec()
    }

    /// Builds a `HopWindowExecutor` over a mock child that yields one
    /// eight-row chunk with two `Int64` columns and a `created_at` timestamp.
    fn create_executor(
        output_indices: Vec<usize>,
        window_slide: Interval,
        window_size: Interval,
        window_offset: Interval,
    ) -> Box<HopWindowExecutor> {
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
            Field::with_name(DataType::Timestamp, "created_at"),
        ]);

        let mut child = MockExecutor::new(schema.clone());
        child.add(chunk_from_pretty(
            "I I TS
            1 1 ^10:00:00
            2 3 ^10:05:00
            3 2 ^10:14:00
            4 1 ^10:22:00
            5 3 ^10:33:00
            6 2 ^10:42:00
            7 1 ^10:51:00
            8 3 ^11:02:00",
        ));

        let (window_start_exprs, window_end_exprs) = make_hop_window_expression(
            DataType::Timestamp,
            2,
            window_size,
            window_slide,
            window_offset,
        )
        .unwrap();

        let executor = HopWindowExecutor::new(
            Box::new(child),
            schema,
            window_slide,
            window_size,
            "test".to_owned(),
            window_start_exprs,
            window_end_exprs,
            output_indices,
        );
        Box::new(executor)
    }

    /// Runs a 15-minute-slide / 30-minute-size executor with the given offset
    /// and returns the first chunk it produces.
    async fn first_chunk_with_offset(window_offset: Interval) -> DataChunk {
        let executor = create_executor(
            all_output_indices(),
            Interval::from_minutes(15),
            Interval::from_minutes(30),
            window_offset,
        );
        let mut stream = executor.execute();
        stream.next().await.unwrap().unwrap()
    }

    #[tokio::test]
    async fn test_window_offset() {
        let window_size = 30;

        // Non-positive multiples: one more window size backwards gives the same chunk.
        for offset in 0..window_size {
            for coefficient in -5..0 {
                let current = coefficient * window_size + offset;
                let shifted = (coefficient - 1) * window_size + offset;
                assert_eq!(
                    first_chunk_with_offset(Interval::from_minutes(current)).await,
                    first_chunk_with_offset(Interval::from_minutes(shifted)).await
                );
            }
        }

        // Non-negative multiples: one more window size forwards gives the same chunk.
        for offset in 0..window_size {
            for coefficient in 0..5 {
                let current = coefficient * window_size + offset;
                let shifted = (coefficient + 1) * window_size + offset;
                assert_eq!(
                    first_chunk_with_offset(Interval::from_minutes(current)).await,
                    first_chunk_with_offset(Interval::from_minutes(shifted)).await
                );
            }
        }

        // Offsets that differ by two window sizes give the same chunk.
        for offset in -window_size..window_size {
            assert_eq!(
                first_chunk_with_offset(Interval::from_minutes(window_size + offset)).await,
                first_chunk_with_offset(Interval::from_minutes(-window_size + offset)).await
            );
        }

        // Offsets -31 and 29 must both produce exactly this chunk.
        let expected = chunk_from_pretty(
            "I I TS        TS        TS
                1 1 ^10:00:00 ^09:44:00 ^10:14:00
                2 3 ^10:05:00 ^09:44:00 ^10:14:00
                3 2 ^10:14:00 ^09:59:00 ^10:29:00
                4 1 ^10:22:00 ^09:59:00 ^10:29:00
                5 3 ^10:33:00 ^10:14:00 ^10:44:00
                6 2 ^10:42:00 ^10:14:00 ^10:44:00
                7 1 ^10:51:00 ^10:29:00 ^10:59:00
                8 3 ^11:02:00 ^10:44:00 ^11:14:00",
        );
        for minutes in [-31, 29] {
            assert_eq!(
                first_chunk_with_offset(Interval::from_minutes(minutes)).await,
                expected
            );
        }
    }

    #[tokio::test]
    async fn test_execute() {
        let executor = create_executor(
            all_output_indices(),
            Interval::from_minutes(15),
            Interval::from_minutes(30),
            Interval::from_minutes(0),
        );
        let mut stream = executor.execute();

        // One expected chunk per hop unit, in emission order.
        let expected_chunks = [
            chunk_from_pretty(
                "I I TS        TS        TS
                1 1 ^10:00:00 ^09:45:00 ^10:15:00
                2 3 ^10:05:00 ^09:45:00 ^10:15:00
                3 2 ^10:14:00 ^09:45:00 ^10:15:00
                4 1 ^10:22:00 ^10:00:00 ^10:30:00
                5 3 ^10:33:00 ^10:15:00 ^10:45:00
                6 2 ^10:42:00 ^10:15:00 ^10:45:00
                7 1 ^10:51:00 ^10:30:00 ^11:00:00
                8 3 ^11:02:00 ^10:45:00 ^11:15:00",
            ),
            chunk_from_pretty(
                "I I TS        TS        TS
                1 1 ^10:00:00 ^10:00:00 ^10:30:00
                2 3 ^10:05:00 ^10:00:00 ^10:30:00
                3 2 ^10:14:00 ^10:00:00 ^10:30:00
                4 1 ^10:22:00 ^10:15:00 ^10:45:00
                5 3 ^10:33:00 ^10:30:00 ^11:00:00
                6 2 ^10:42:00 ^10:30:00 ^11:00:00
                7 1 ^10:51:00 ^10:45:00 ^11:15:00
                8 3 ^11:02:00 ^11:00:00 ^11:30:00",
            ),
        ];
        for expected in expected_chunks {
            let chunk = stream.next().await.unwrap().unwrap();
            assert_eq!(chunk, expected);
        }
    }

    #[tokio::test]
    async fn test_output_indices() {
        let executor = create_executor(
            vec![1, 3, 4, 2],
            Interval::from_minutes(15),
            Interval::from_minutes(30),
            Interval::from_minutes(0),
        );
        let mut stream = executor.execute();

        // Columns appear in the order requested: col 1, window_start,
        // window_end, then the original timestamp column.
        let expected_chunks = [
            chunk_from_pretty(
                " I TS        TS        TS
                   1 ^09:45:00 ^10:15:00 ^10:00:00
                   3 ^09:45:00 ^10:15:00 ^10:05:00
                   2 ^09:45:00 ^10:15:00 ^10:14:00
                   1 ^10:00:00 ^10:30:00 ^10:22:00
                   3 ^10:15:00 ^10:45:00 ^10:33:00
                   2 ^10:15:00 ^10:45:00 ^10:42:00
                   1 ^10:30:00 ^11:00:00 ^10:51:00
                   3 ^10:45:00 ^11:15:00 ^11:02:00",
            ),
            chunk_from_pretty(
                "I TS        TS        TS
                  1 ^10:00:00 ^10:30:00 ^10:00:00
                  3 ^10:00:00 ^10:30:00 ^10:05:00
                  2 ^10:00:00 ^10:30:00 ^10:14:00
                  1 ^10:15:00 ^10:45:00 ^10:22:00
                  3 ^10:30:00 ^11:00:00 ^10:33:00
                  2 ^10:30:00 ^11:00:00 ^10:42:00
                  1 ^10:45:00 ^11:15:00 ^10:51:00
                  3 ^11:00:00 ^11:30:00 ^11:02:00",
            ),
        ];
        for expected in expected_chunks {
            let chunk = stream.next().await.unwrap().unwrap();
            assert_eq!(chunk, expected);
        }
    }
}