// risingwave_stream/executor/hop_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZeroUsize;
16
17use itertools::Itertools;
18use risingwave_common::array::{DataChunk, Op};
19use risingwave_common::types::Interval;
20use risingwave_expr::ExprError;
21use risingwave_expr::expr::NonStrictExpression;
22
23use crate::executor::prelude::*;
24
/// Executor that expands each input row into one output row per hop window covering it,
/// appending window-start / window-end timestamp columns computed by the given expressions.
pub struct HopWindowExecutor {
    _ctx: ActorContextRef,
    /// Upstream executor providing the input stream.
    pub input: Executor,
    /// Index of the time column (in the input schema) used for windowing.
    pub time_col_idx: usize,
    /// Hop (slide) interval between consecutive windows.
    pub window_slide: Interval,
    /// Total window length; expected to be an exact multiple of `window_slide`.
    pub window_size: Interval,
    // One expression per hop unit producing the window-start column.
    window_start_exprs: Vec<NonStrictExpression>,
    // One expression per hop unit producing the window-end column.
    window_end_exprs: Vec<NonStrictExpression>,
    /// Output projection: each entry is a logical column index
    /// (input columns first, then window_start, then window_end).
    pub output_indices: Vec<usize>,
    /// Maximum number of rows per emitted stream chunk.
    chunk_size: usize,
}
36
37impl HopWindowExecutor {
38    #[allow(clippy::too_many_arguments)]
39    pub fn new(
40        ctx: ActorContextRef,
41        input: Executor,
42        time_col_idx: usize,
43        window_slide: Interval,
44        window_size: Interval,
45        window_start_exprs: Vec<NonStrictExpression>,
46        window_end_exprs: Vec<NonStrictExpression>,
47        output_indices: Vec<usize>,
48        chunk_size: usize,
49    ) -> Self {
50        HopWindowExecutor {
51            _ctx: ctx,
52            input,
53            time_col_idx,
54            window_slide,
55            window_size,
56            window_start_exprs,
57            window_end_exprs,
58            output_indices,
59            chunk_size,
60        }
61    }
62}
63
impl Execute for HopWindowExecutor {
    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
        // Delegate to the `#[try_stream]`-generated stream implementation.
        self.execute_inner().boxed()
    }
}
69
70impl HopWindowExecutor {
71    #[try_stream(ok = Message, error = StreamExecutorError)]
72    async fn execute_inner(self: Box<Self>) {
73        let Self {
74            input,
75
76            window_slide,
77            window_size,
78            output_indices,
79            time_col_idx,
80
81            chunk_size,
82            ..
83        } = *self;
84        let units = window_size
85            .exact_div(&window_slide)
86            .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
87            .ok_or_else(|| ExprError::InvalidParam {
88                name: "window",
89                reason: format!(
90                    "window_size {} cannot be divided by window_slide {}",
91                    window_size, window_slide
92                )
93                .into(),
94            })?
95            .get();
96
97        // The following indices are the output indices as if the downstream needs all input + hop
98        // window columns.
99        let logical_window_start_col_idx = input.schema().len();
100        let logical_window_end_col_idx = input.schema().len() + 1;
101
102        // The following indices are the real output column indices. `None` means we don't need to
103        // output that column.
104        let out_window_start_col_idx = output_indices
105            .iter()
106            .position(|&idx| idx == logical_window_start_col_idx);
107        let out_window_end_col_idx = output_indices
108            .iter()
109            .position(|&idx| idx == logical_window_end_col_idx);
110
111        #[for_await]
112        for msg in input.execute() {
113            let msg = msg?;
114            match msg {
115                Message::Chunk(chunk) => {
116                    if units == 0 {
117                        continue;
118                    }
119
120                    // TODO: compact may be not necessary here.
121                    let chunk = chunk.compact();
122                    let (data_chunk, ops) = chunk.into_parts();
123                    // SAFETY: Already compacted.
124                    assert!(data_chunk.is_compacted());
125                    let len = data_chunk.cardinality();
126
127                    // Collect each window's data into a chunk.
128                    let mut chunks = Vec::with_capacity(units);
129
130                    for i in 0..units {
131                        let window_start_col = if out_window_start_col_idx.is_some() {
132                            Some(
133                                self.window_start_exprs[i]
134                                    .eval_infallible(&data_chunk)
135                                    .await,
136                            )
137                        } else {
138                            None
139                        };
140                        let window_end_col = if out_window_end_col_idx.is_some() {
141                            Some(self.window_end_exprs[i].eval_infallible(&data_chunk).await)
142                        } else {
143                            None
144                        };
145                        let new_cols = output_indices
146                            .iter()
147                            .filter_map(|&idx| {
148                                if idx < logical_window_start_col_idx {
149                                    Some(data_chunk.column_at(idx).clone())
150                                } else if idx == logical_window_start_col_idx {
151                                    Some(window_start_col.clone().unwrap())
152                                } else if idx == logical_window_end_col_idx {
153                                    Some(window_end_col.clone().unwrap())
154                                } else {
155                                    None
156                                }
157                            })
158                            .collect();
159
160                        chunks.push(DataChunk::new(new_cols, len));
161                    }
162
163                    // Reorganize the output rows from the same input row together.
164                    let mut row_iters = chunks.iter().map(|c| c.rows()).collect_vec();
165
166                    let data_types = chunks[0].data_types();
167                    let mut chunk_builder = StreamChunkBuilder::new(chunk_size, data_types);
168
169                    for &op in &*ops {
170                        // Since there could be multiple rows for the same input row, we need to
171                        // transform the `U-`/`U+` into `-`/`+` and then duplicate it.
172                        let op = match op {
173                            Op::Insert | Op::UpdateInsert => Op::Insert,
174                            Op::Delete | Op::UpdateDelete => Op::Delete,
175                        };
176                        for row_iter in &mut row_iters {
177                            if let Some(chunk) =
178                                chunk_builder.append_row(op, row_iter.next().unwrap())
179                            {
180                                yield Message::Chunk(chunk);
181                            }
182                        }
183                    }
184
185                    if let Some(chunk) = chunk_builder.take() {
186                        yield Message::Chunk(chunk);
187                    }
188
189                    // Rows should be exhausted.
190                    debug_assert!(row_iters.into_iter().all(|mut it| it.next().is_none()));
191                }
192                Message::Barrier(b) => {
193                    yield Message::Barrier(b);
194                }
195                Message::Watermark(w) => {
196                    if w.col_idx == time_col_idx {
197                        if let (Some(out_start_idx), Some(start_expr)) =
198                            (out_window_start_col_idx, self.window_start_exprs.first())
199                        {
200                            let w = w
201                                .clone()
202                                .transform_with_expr(start_expr, out_start_idx)
203                                .await;
204                            if let Some(w) = w {
205                                yield Message::Watermark(w);
206                            }
207                        }
208                        if let (Some(out_end_idx), Some(end_expr)) =
209                            (out_window_end_col_idx, self.window_end_exprs.first())
210                        {
211                            let w = w.transform_with_expr(end_expr, out_end_idx).await;
212                            if let Some(w) = w {
213                                yield Message::Watermark(w);
214                            }
215                        }
216                    } else if let Some(out_idx) =
217                        output_indices.iter().position(|&idx| idx == w.col_idx)
218                    {
219                        yield Message::Watermark(w.with_idx(out_idx));
220                    }
221                }
222            };
223        }
224    }
225}
226
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_expr::expr::test_utils::make_hop_window_expression;

    use super::*;
    use crate::executor::test_utils::MockSource;

    // Large enough that each test's whole output fits in a single chunk.
    const CHUNK_SIZE: usize = 256;

    /// Builds a boxed `HopWindowExecutor` over a fixed single-chunk mock source with
    /// schema `(i64, i64, timestamp "created_at")`, a 15-minute slide and a 30-minute
    /// window (so every row falls into exactly 2 windows), projecting `output_indices`.
    fn create_executor(output_indices: Vec<usize>) -> Box<dyn Execute> {
        let field1 = Field::unnamed(DataType::Int64);
        let field2 = Field::unnamed(DataType::Int64);
        let field3 = Field::with_name(DataType::Timestamp, "created_at");
        let schema = Schema::new(vec![field1, field2, field3]);
        let pk_indices = vec![0];

        // `^` is a shorthand for the common date prefix, expanded by the `replace` below.
        let chunk = StreamChunk::from_pretty(
            &"I I TS
            + 1 1 ^10:00:00
            + 2 3 ^10:05:00
            - 3 2 ^10:14:00
            + 4 1 ^10:22:00
           U- 5 2 ^10:33:00
           U+ 6 2 ^10:42:00
            - 7 1 ^10:51:00
            + 8 3 ^11:02:00"
                .replace('^', "2022-02-02T"),
        );
        let input =
            MockSource::with_chunks(vec![chunk]).into_executor(schema.clone(), pk_indices.clone());
        let window_slide = Interval::from_minutes(15);
        let window_size = Interval::from_minutes(30);
        let window_offset = Interval::from_minutes(0);
        // Time column is at index 2; this generates one (start, end) expression pair
        // per hop unit.
        let (window_start_exprs, window_end_exprs) = make_hop_window_expression(
            DataType::Timestamp,
            2,
            window_size,
            window_slide,
            window_offset,
        )
        .unwrap();

        HopWindowExecutor::new(
            ActorContext::for_test(123),
            input,
            2,
            window_slide,
            window_size,
            window_start_exprs
                .into_iter()
                .map(NonStrictExpression::for_test)
                .collect(),
            window_end_exprs
                .into_iter()
                .map(NonStrictExpression::for_test)
                .collect(),
            output_indices,
            CHUNK_SIZE,
        )
        .boxed()
    }

    /// With the identity projection (all 3 input columns + window_start + window_end),
    /// every input row is duplicated into its 2 covering windows, and `U-`/`U+` ops are
    /// downgraded to `-`/`+`.
    #[tokio::test]
    async fn test_execute() {
        let default_indices: Vec<_> = (0..5).collect();
        let executor = create_executor(default_indices);

        let mut stream = executor.execute();
        // TODO: add more test infra to reduce the duplicated codes below.

        let chunk = stream.next().await.unwrap().unwrap().into_chunk().unwrap();
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                &"I I TS        TS        TS
                + 1 1 ^10:00:00 ^09:45:00 ^10:15:00
                + 1 1 ^10:00:00 ^10:00:00 ^10:30:00
                + 2 3 ^10:05:00 ^09:45:00 ^10:15:00
                + 2 3 ^10:05:00 ^10:00:00 ^10:30:00
                - 3 2 ^10:14:00 ^09:45:00 ^10:15:00
                - 3 2 ^10:14:00 ^10:00:00 ^10:30:00
                + 4 1 ^10:22:00 ^10:00:00 ^10:30:00
                + 4 1 ^10:22:00 ^10:15:00 ^10:45:00
                - 5 2 ^10:33:00 ^10:15:00 ^10:45:00
                - 5 2 ^10:33:00 ^10:30:00 ^11:00:00
                + 6 2 ^10:42:00 ^10:15:00 ^10:45:00
                + 6 2 ^10:42:00 ^10:30:00 ^11:00:00
                - 7 1 ^10:51:00 ^10:30:00 ^11:00:00
                - 7 1 ^10:51:00 ^10:45:00 ^11:15:00
                + 8 3 ^11:02:00 ^10:45:00 ^11:15:00
                + 8 3 ^11:02:00 ^11:00:00 ^11:30:00"
                    .replace('^', "2022-02-02T"),
            )
        );
    }

    /// With `output_indices = [4, 1, 0, 2]`, columns are reordered to
    /// `(window_end, input col 1, input col 0, time)` and window_start is dropped.
    #[tokio::test]
    async fn test_output_indices() {
        let executor = create_executor(vec![4, 1, 0, 2]);

        let mut stream = executor.execute();
        // TODO: add more test infra to reduce the duplicated codes below.

        let chunk = stream.next().await.unwrap().unwrap().into_chunk().unwrap();
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                &"TS        I I TS
                + ^10:15:00 1 1 ^10:00:00
                + ^10:30:00 1 1 ^10:00:00
                + ^10:15:00 3 2 ^10:05:00
                + ^10:30:00 3 2 ^10:05:00
                - ^10:15:00 2 3 ^10:14:00
                - ^10:30:00 2 3 ^10:14:00
                + ^10:30:00 1 4 ^10:22:00
                + ^10:45:00 1 4 ^10:22:00
                - ^10:45:00 2 5 ^10:33:00
                - ^11:00:00 2 5 ^10:33:00
                + ^10:45:00 2 6 ^10:42:00
                + ^11:00:00 2 6 ^10:42:00
                - ^11:00:00 1 7 ^10:51:00
                - ^11:15:00 1 7 ^10:51:00
                + ^11:15:00 3 8 ^11:02:00
                + ^11:30:00 3 8 ^11:02:00"
                    .replace('^', "2022-02-02T"),
            )
        );
    }
}