risingwave_batch_executors/executor/
hop_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZeroUsize;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::types::{DataType, Interval};
22use risingwave_expr::ExprError;
23use risingwave_expr::expr::{BoxedExpression, build_from_prost};
24use risingwave_pb::batch_plan::plan_node::NodeBody;
25
26use crate::error::{BatchError, Result};
27use crate::executor::{
28    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
29};
30
/// Batch executor that expands its input into hop (sliding) windows.
///
/// For every chunk produced by `child`, one output chunk is emitted per window
/// index. Each output chunk carries the columns selected by `output_indices`,
/// where the `window_start` / `window_end` columns come from evaluating the
/// expression for that window index on the input chunk.
pub struct HopWindowExecutor {
    /// Upstream executor whose chunks are expanded.
    child: BoxedExecutor,
    /// Name returned by `Executor::identity`.
    identity: String,
    /// Schema returned by `Executor::schema`.
    schema: Schema,
    /// Slide of the window. `window_size` has to be an exact, non-zero multiple
    /// of it, otherwise execution fails with an invalid-parameter error.
    window_slide: Interval,
    /// Size of each window.
    window_size: Interval,
    /// Expression producing the `window_start` column; entry `i` belongs to the
    /// `i`-th window.
    window_start_exprs: Vec<BoxedExpression>,
    /// Expression producing the `window_end` column; entry `i` belongs to the
    /// `i`-th window.
    window_end_exprs: Vec<BoxedExpression>,
    /// Columns to emit, in output order. Indices below the child's column count
    /// select child columns; the next two indices select `window_start` and
    /// `window_end` respectively.
    output_indices: Vec<usize>,
}
41
42impl BoxedExecutorBuilder for HopWindowExecutor {
43    async fn new_boxed_executor(
44        source: &ExecutorBuilder<'_>,
45        inputs: Vec<BoxedExecutor>,
46    ) -> Result<BoxedExecutor> {
47        let [child]: [_; 1] = inputs.try_into().unwrap();
48
49        let hop_window_node = try_match_expand!(
50            source.plan_node().get_node_body().unwrap(),
51            NodeBody::HopWindow
52        )?;
53        let window_slide = hop_window_node.get_window_slide()?.into();
54        let window_size = hop_window_node.get_window_size()?.into();
55        let output_indices = hop_window_node
56            .get_output_indices()
57            .iter()
58            .cloned()
59            .map(|x| x as usize)
60            .collect_vec();
61
62        let window_start_exprs: Vec<_> = hop_window_node
63            .get_window_start_exprs()
64            .iter()
65            .map(build_from_prost)
66            .try_collect()?;
67        let window_end_exprs: Vec<_> = hop_window_node
68            .get_window_end_exprs()
69            .iter()
70            .map(build_from_prost)
71            .try_collect()?;
72        assert_eq!(window_start_exprs.len(), window_end_exprs.len());
73
74        let time_col = hop_window_node.get_time_col() as usize;
75        let time_col_data_type = child.schema().fields()[time_col].data_type();
76        let output_type = DataType::window_of(&time_col_data_type).unwrap();
77        let original_schema: Schema = child
78            .schema()
79            .clone()
80            .into_fields()
81            .into_iter()
82            .chain([
83                Field::with_name(output_type.clone(), "window_start"),
84                Field::with_name(output_type, "window_end"),
85            ])
86            .collect();
87        let output_indices_schema: Schema = output_indices
88            .iter()
89            .map(|&idx| original_schema[idx].clone())
90            .collect();
91        Ok(Box::new(HopWindowExecutor::new(
92            child,
93            output_indices_schema,
94            window_slide,
95            window_size,
96            source.plan_node().get_identity().clone(),
97            window_start_exprs,
98            window_end_exprs,
99            output_indices,
100        )))
101    }
102}
103
104impl HopWindowExecutor {
105    #[allow(clippy::too_many_arguments)]
106    fn new(
107        child: BoxedExecutor,
108        schema: Schema,
109        window_slide: Interval,
110        window_size: Interval,
111        identity: String,
112        window_start_exprs: Vec<BoxedExpression>,
113        window_end_exprs: Vec<BoxedExpression>,
114        output_indices: Vec<usize>,
115    ) -> Self {
116        Self {
117            child,
118            identity,
119            schema,
120            window_slide,
121            window_size,
122            window_start_exprs,
123            window_end_exprs,
124            output_indices,
125        }
126    }
127}
128
129impl Executor for HopWindowExecutor {
130    fn schema(&self) -> &Schema {
131        &self.schema
132    }
133
134    fn identity(&self) -> &str {
135        &self.identity
136    }
137
138    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
139        self.do_execute()
140    }
141}
142
143impl HopWindowExecutor {
144    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
145    async fn do_execute(self: Box<Self>) {
146        let Self {
147            child,
148            window_slide,
149            window_size,
150            output_indices,
151            ..
152        } = *self;
153        let units = window_size
154            .exact_div(&window_slide)
155            .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
156            .ok_or_else(|| ExprError::InvalidParam {
157                name: "window",
158                reason: format!(
159                    "window_size {} cannot be divided by window_slide {}",
160                    window_size, window_slide
161                )
162                .into(),
163            })?
164            .get();
165
166        let window_start_col_index = child.schema().len();
167        let window_end_col_index = child.schema().len() + 1;
168        #[for_await]
169        for data_chunk in child.execute() {
170            let data_chunk = data_chunk?;
171            assert!(data_chunk.is_compacted());
172            let len = data_chunk.cardinality();
173            for i in 0..units {
174                let window_start_col = if output_indices.contains(&window_start_col_index) {
175                    Some(self.window_start_exprs[i].eval(&data_chunk).await?)
176                } else {
177                    None
178                };
179                let window_end_col = if output_indices.contains(&window_end_col_index) {
180                    Some(self.window_end_exprs[i].eval(&data_chunk).await?)
181                } else {
182                    None
183                };
184                let new_cols = output_indices
185                    .iter()
186                    .filter_map(|&idx| {
187                        if idx < window_start_col_index {
188                            Some(data_chunk.column_at(idx).clone())
189                        } else if idx == window_start_col_index {
190                            Some(window_start_col.clone().unwrap())
191                        } else if idx == window_end_col_index {
192                            Some(window_end_col.clone().unwrap())
193                        } else {
194                            None
195                        }
196                    })
197                    .collect_vec();
198                yield DataChunk::new(new_cols, len);
199            }
200        }
201    }
202}
203
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;
    use risingwave_common::array::DataChunkTestExt;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_expr::expr::test_utils::make_hop_window_expression;

    use super::*;
    use crate::executor::test_utils::MockExecutor;

    /// Parses a pretty-printed chunk in which `^` stands for the date prefix
    /// `2022-02-02T`.
    fn chunk_on_test_day(pretty: &str) -> DataChunk {
        DataChunk::from_pretty(&pretty.replace('^', "2022-02-02T"))
    }

    /// Builds a hop-window executor over a mock child that yields one chunk of
    /// eight rows with columns `(Int64, Int64, Timestamp "created_at")`.
    fn create_executor(
        output_indices: Vec<usize>,
        window_slide: Interval,
        window_size: Interval,
        window_offset: Interval,
    ) -> Box<HopWindowExecutor> {
        let input_schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
            Field::with_name(DataType::Timestamp, "created_at"),
        ]);

        let input_chunk = chunk_on_test_day(
            "I I TS
            1 1 ^10:00:00
            2 3 ^10:05:00
            3 2 ^10:14:00
            4 1 ^10:22:00
            5 3 ^10:33:00
            6 2 ^10:42:00
            7 1 ^10:51:00
            8 3 ^11:02:00",
        );
        let mut input = MockExecutor::new(input_schema.clone());
        input.add(input_chunk);

        // Column 2 is the timestamp the windows are computed from.
        let (window_start_exprs, window_end_exprs) = make_hop_window_expression(
            DataType::Timestamp,
            2,
            window_size,
            window_slide,
            window_offset,
        )
        .unwrap();

        let executor = HopWindowExecutor::new(
            Box::new(input),
            input_schema,
            window_slide,
            window_size,
            "test".to_owned(),
            window_start_exprs,
            window_end_exprs,
            output_indices,
        );
        Box::new(executor)
    }

    #[tokio::test]
    async fn test_window_offset() {
        /// Runs a 30-minute window sliding by 15 minutes with the given offset
        /// and returns the first output chunk.
        async fn first_chunk_with_offset(window_offset: Interval) -> DataChunk {
            let all_columns = (0..3 + 2).collect_vec();
            let executor = create_executor(
                all_columns,
                Interval::from_minutes(15),
                Interval::from_minutes(30),
                window_offset,
            );
            executor.execute().next().await.unwrap().unwrap()
        }

        let window_size = 30;

        // Moving a negative offset one further window size down changes nothing.
        for offset in 0..window_size {
            for coefficient in -5..0 {
                let current =
                    first_chunk_with_offset(Interval::from_minutes(coefficient * window_size + offset))
                        .await;
                let shifted = first_chunk_with_offset(Interval::from_minutes(
                    (coefficient - 1) * window_size + offset,
                ))
                .await;
                assert_eq!(current, shifted);
            }
        }

        // Moving a non-negative offset one further window size up changes nothing.
        for offset in 0..window_size {
            for coefficient in 0..5 {
                let current =
                    first_chunk_with_offset(Interval::from_minutes(coefficient * window_size + offset))
                        .await;
                let shifted = first_chunk_with_offset(Interval::from_minutes(
                    (coefficient + 1) * window_size + offset,
                ))
                .await;
                assert_eq!(current, shifted);
            }
        }

        // Offsets two window sizes apart agree across the sign boundary.
        for offset in -window_size..window_size {
            let above = first_chunk_with_offset(Interval::from_minutes(window_size + offset)).await;
            let below =
                first_chunk_with_offset(Interval::from_minutes(-window_size + offset)).await;
            assert_eq!(above, below);
        }

        // Offsets of -31 and 29 minutes both produce this chunk.
        let expected = chunk_on_test_day(
            "I I TS        TS        TS
                1 1 ^10:00:00 ^09:44:00 ^10:14:00
                2 3 ^10:05:00 ^09:44:00 ^10:14:00
                3 2 ^10:14:00 ^09:59:00 ^10:29:00
                4 1 ^10:22:00 ^09:59:00 ^10:29:00
                5 3 ^10:33:00 ^10:14:00 ^10:44:00
                6 2 ^10:42:00 ^10:14:00 ^10:44:00
                7 1 ^10:51:00 ^10:29:00 ^10:59:00
                8 3 ^11:02:00 ^10:44:00 ^11:14:00",
        );
        for minutes in [-31, 29] {
            assert_eq!(
                first_chunk_with_offset(Interval::from_minutes(minutes)).await,
                expected
            );
        }
    }

    #[tokio::test]
    async fn test_execute() {
        let all_columns = (0..3 + 2).collect_vec();
        let executor = create_executor(
            all_columns,
            Interval::from_minutes(15),
            Interval::from_minutes(30),
            Interval::from_minutes(0),
        );
        let mut stream = executor.execute();

        // One expected chunk per window, in emission order.
        let expected_chunks = [
            "I I TS        TS        TS
                1 1 ^10:00:00 ^09:45:00 ^10:15:00
                2 3 ^10:05:00 ^09:45:00 ^10:15:00
                3 2 ^10:14:00 ^09:45:00 ^10:15:00
                4 1 ^10:22:00 ^10:00:00 ^10:30:00
                5 3 ^10:33:00 ^10:15:00 ^10:45:00
                6 2 ^10:42:00 ^10:15:00 ^10:45:00
                7 1 ^10:51:00 ^10:30:00 ^11:00:00
                8 3 ^11:02:00 ^10:45:00 ^11:15:00",
            "I I TS        TS        TS
                1 1 ^10:00:00 ^10:00:00 ^10:30:00
                2 3 ^10:05:00 ^10:00:00 ^10:30:00
                3 2 ^10:14:00 ^10:00:00 ^10:30:00
                4 1 ^10:22:00 ^10:15:00 ^10:45:00
                5 3 ^10:33:00 ^10:30:00 ^11:00:00
                6 2 ^10:42:00 ^10:30:00 ^11:00:00
                7 1 ^10:51:00 ^10:45:00 ^11:15:00
                8 3 ^11:02:00 ^11:00:00 ^11:30:00",
        ];
        for expected in expected_chunks {
            let chunk = stream.next().await.unwrap().unwrap();
            assert_eq!(chunk, chunk_on_test_day(expected));
        }
    }

    #[tokio::test]
    async fn test_output_indices() {
        // Project to (second int column, window_start, window_end, created_at).
        let executor = create_executor(
            vec![1, 3, 4, 2],
            Interval::from_minutes(15),
            Interval::from_minutes(30),
            Interval::from_minutes(0),
        );
        let mut stream = executor.execute();

        // One expected chunk per window, in emission order.
        let expected_chunks = [
            " I TS        TS        TS
                   1 ^09:45:00 ^10:15:00 ^10:00:00
                   3 ^09:45:00 ^10:15:00 ^10:05:00
                   2 ^09:45:00 ^10:15:00 ^10:14:00
                   1 ^10:00:00 ^10:30:00 ^10:22:00
                   3 ^10:15:00 ^10:45:00 ^10:33:00
                   2 ^10:15:00 ^10:45:00 ^10:42:00
                   1 ^10:30:00 ^11:00:00 ^10:51:00
                   3 ^10:45:00 ^11:15:00 ^11:02:00",
            "I TS        TS        TS
                  1 ^10:00:00 ^10:30:00 ^10:00:00
                  3 ^10:00:00 ^10:30:00 ^10:05:00
                  2 ^10:00:00 ^10:30:00 ^10:14:00
                  1 ^10:15:00 ^10:45:00 ^10:22:00
                  3 ^10:30:00 ^11:00:00 ^10:33:00
                  2 ^10:30:00 ^11:00:00 ^10:42:00
                  1 ^10:45:00 ^11:15:00 ^10:51:00
                  3 ^11:00:00 ^11:30:00 ^11:02:00",
        ];
        for expected in expected_chunks {
            let chunk = stream.next().await.unwrap().unwrap();
            assert_eq!(chunk, chunk_on_test_day(expected));
        }
    }
}