risingwave_stream/executor/
hop_window.rs1use std::num::NonZeroUsize;
16
17use itertools::Itertools;
18use risingwave_common::array::{DataChunk, Op};
19use risingwave_common::types::Interval;
20use risingwave_expr::ExprError;
21use risingwave_expr::expr::NonStrictExpression;
22
23use crate::executor::prelude::*;
24
25pub struct HopWindowExecutor {
26 _ctx: ActorContextRef,
27 pub input: Executor,
28 pub time_col_idx: usize,
29 pub window_slide: Interval,
30 pub window_size: Interval,
31 window_start_exprs: Vec<NonStrictExpression>,
32 window_end_exprs: Vec<NonStrictExpression>,
33 pub output_indices: Vec<usize>,
34 chunk_size: usize,
35}
36
37impl HopWindowExecutor {
38 #[allow(clippy::too_many_arguments)]
39 pub fn new(
40 ctx: ActorContextRef,
41 input: Executor,
42 time_col_idx: usize,
43 window_slide: Interval,
44 window_size: Interval,
45 window_start_exprs: Vec<NonStrictExpression>,
46 window_end_exprs: Vec<NonStrictExpression>,
47 output_indices: Vec<usize>,
48 chunk_size: usize,
49 ) -> Self {
50 HopWindowExecutor {
51 _ctx: ctx,
52 input,
53 time_col_idx,
54 window_slide,
55 window_size,
56 window_start_exprs,
57 window_end_exprs,
58 output_indices,
59 chunk_size,
60 }
61 }
62}
63
64impl Execute for HopWindowExecutor {
65 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
66 self.execute_inner().boxed()
67 }
68}
69
70impl HopWindowExecutor {
71 #[try_stream(ok = Message, error = StreamExecutorError)]
72 async fn execute_inner(self: Box<Self>) {
73 let Self {
74 input,
75
76 window_slide,
77 window_size,
78 output_indices,
79 time_col_idx,
80
81 chunk_size,
82 ..
83 } = *self;
84 let units = window_size
85 .exact_div(&window_slide)
86 .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
87 .ok_or_else(|| ExprError::InvalidParam {
88 name: "window",
89 reason: format!(
90 "window_size {} cannot be divided by window_slide {}",
91 window_size, window_slide
92 )
93 .into(),
94 })?
95 .get();
96
97 let logical_window_start_col_idx = input.schema().len();
100 let logical_window_end_col_idx = input.schema().len() + 1;
101
102 let out_window_start_col_idx = output_indices
105 .iter()
106 .position(|&idx| idx == logical_window_start_col_idx);
107 let out_window_end_col_idx = output_indices
108 .iter()
109 .position(|&idx| idx == logical_window_end_col_idx);
110
111 #[for_await]
112 for msg in input.execute() {
113 let msg = msg?;
114 match msg {
115 Message::Chunk(chunk) => {
116 if units == 0 {
117 continue;
118 }
119
120 let chunk = chunk.compact();
122 let (data_chunk, ops) = chunk.into_parts();
123 assert!(data_chunk.is_compacted());
125 let len = data_chunk.cardinality();
126
127 let mut chunks = Vec::with_capacity(units);
129
130 for i in 0..units {
131 let window_start_col = if out_window_start_col_idx.is_some() {
132 Some(
133 self.window_start_exprs[i]
134 .eval_infallible(&data_chunk)
135 .await,
136 )
137 } else {
138 None
139 };
140 let window_end_col = if out_window_end_col_idx.is_some() {
141 Some(self.window_end_exprs[i].eval_infallible(&data_chunk).await)
142 } else {
143 None
144 };
145 let new_cols = output_indices
146 .iter()
147 .filter_map(|&idx| {
148 if idx < logical_window_start_col_idx {
149 Some(data_chunk.column_at(idx).clone())
150 } else if idx == logical_window_start_col_idx {
151 Some(window_start_col.clone().unwrap())
152 } else if idx == logical_window_end_col_idx {
153 Some(window_end_col.clone().unwrap())
154 } else {
155 None
156 }
157 })
158 .collect();
159
160 chunks.push(DataChunk::new(new_cols, len));
161 }
162
163 let mut row_iters = chunks.iter().map(|c| c.rows()).collect_vec();
165
166 let data_types = chunks[0].data_types();
167 let mut chunk_builder = StreamChunkBuilder::new(chunk_size, data_types);
168
169 for &op in &*ops {
170 let op = match op {
173 Op::Insert | Op::UpdateInsert => Op::Insert,
174 Op::Delete | Op::UpdateDelete => Op::Delete,
175 };
176 for row_iter in &mut row_iters {
177 if let Some(chunk) =
178 chunk_builder.append_row(op, row_iter.next().unwrap())
179 {
180 yield Message::Chunk(chunk);
181 }
182 }
183 }
184
185 if let Some(chunk) = chunk_builder.take() {
186 yield Message::Chunk(chunk);
187 }
188
189 debug_assert!(row_iters.into_iter().all(|mut it| it.next().is_none()));
191 }
192 Message::Barrier(b) => {
193 yield Message::Barrier(b);
194 }
195 Message::Watermark(w) => {
196 if w.col_idx == time_col_idx {
197 if let (Some(out_start_idx), Some(start_expr)) =
198 (out_window_start_col_idx, self.window_start_exprs.first())
199 {
200 let w = w
201 .clone()
202 .transform_with_expr(start_expr, out_start_idx)
203 .await;
204 if let Some(w) = w {
205 yield Message::Watermark(w);
206 }
207 }
208 if let (Some(out_end_idx), Some(end_expr)) =
209 (out_window_end_col_idx, self.window_end_exprs.first())
210 {
211 let w = w.transform_with_expr(end_expr, out_end_idx).await;
212 if let Some(w) = w {
213 yield Message::Watermark(w);
214 }
215 }
216 } else if let Some(out_idx) =
217 output_indices.iter().position(|&idx| idx == w.col_idx)
218 {
219 yield Message::Watermark(w.with_idx(out_idx));
220 }
221 }
222 };
223 }
224 }
225}
226
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::Field;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_expr::expr::test_utils::make_hop_window_expression;

    use super::*;
    use crate::executor::test_utils::MockSource;

    const CHUNK_SIZE: usize = 256;

    /// Builds a hop-window executor (15-minute slide, 30-minute size, zero
    /// offset) over a mock source that yields one eight-row chunk, projecting
    /// the given `output_indices`.
    fn create_executor(output_indices: Vec<usize>) -> Box<dyn Execute> {
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
            Field::with_name(DataType::Timestamp, "created_at"),
        ]);
        let pk_indices = vec![0];

        // `^` is shorthand for the date prefix shared by every timestamp.
        let input_rows = "I I TS
            + 1 1 ^10:00:00
            + 2 3 ^10:05:00
            - 3 2 ^10:14:00
            + 4 1 ^10:22:00
            U- 5 2 ^10:33:00
            U+ 6 2 ^10:42:00
            - 7 1 ^10:51:00
            + 8 3 ^11:02:00"
            .replace('^', "2022-02-02T");
        let source_chunk = StreamChunk::from_pretty(&input_rows);
        let input = MockSource::with_chunks(vec![source_chunk])
            .into_executor(schema.clone(), pk_indices.clone());

        let window_slide = Interval::from_minutes(15);
        let window_size = Interval::from_minutes(30);
        let window_offset = Interval::from_minutes(0);
        let (raw_start_exprs, raw_end_exprs) = make_hop_window_expression(
            DataType::Timestamp,
            2,
            window_size,
            window_slide,
            window_offset,
        )
        .unwrap();

        let window_start_exprs: Vec<_> = raw_start_exprs
            .into_iter()
            .map(NonStrictExpression::for_test)
            .collect();
        let window_end_exprs: Vec<_> = raw_end_exprs
            .into_iter()
            .map(NonStrictExpression::for_test)
            .collect();

        let executor = HopWindowExecutor::new(
            ActorContext::for_test(123),
            input,
            2,
            window_slide,
            window_size,
            window_start_exprs,
            window_end_exprs,
            output_indices,
            CHUNK_SIZE,
        );
        executor.boxed()
    }

    /// With the full projection, every input row appears once per window with
    /// its `window_start` / `window_end` appended, and update ops are turned
    /// into plain inserts/deletes.
    #[tokio::test]
    async fn test_execute() {
        let executor = create_executor((0..5).collect());

        let mut stream = executor.execute();
        let first_message = stream.next().await.unwrap().unwrap();
        let actual = first_message.into_chunk().unwrap();

        let expected = StreamChunk::from_pretty(
            &"I I TS TS TS
            + 1 1 ^10:00:00 ^09:45:00 ^10:15:00
            + 1 1 ^10:00:00 ^10:00:00 ^10:30:00
            + 2 3 ^10:05:00 ^09:45:00 ^10:15:00
            + 2 3 ^10:05:00 ^10:00:00 ^10:30:00
            - 3 2 ^10:14:00 ^09:45:00 ^10:15:00
            - 3 2 ^10:14:00 ^10:00:00 ^10:30:00
            + 4 1 ^10:22:00 ^10:00:00 ^10:30:00
            + 4 1 ^10:22:00 ^10:15:00 ^10:45:00
            - 5 2 ^10:33:00 ^10:15:00 ^10:45:00
            - 5 2 ^10:33:00 ^10:30:00 ^11:00:00
            + 6 2 ^10:42:00 ^10:15:00 ^10:45:00
            + 6 2 ^10:42:00 ^10:30:00 ^11:00:00
            - 7 1 ^10:51:00 ^10:30:00 ^11:00:00
            - 7 1 ^10:51:00 ^10:45:00 ^11:15:00
            + 8 3 ^11:02:00 ^10:45:00 ^11:15:00
            + 8 3 ^11:02:00 ^11:00:00 ^11:30:00"
                .replace('^', "2022-02-02T"),
        );
        assert_eq!(actual, expected);
    }

    /// A reordered, partial projection (`window_end`, col 1, col 0, time col)
    /// yields only those columns, in that order.
    #[tokio::test]
    async fn test_output_indices() {
        let executor = create_executor(vec![4, 1, 0, 2]);

        let mut stream = executor.execute();
        let first_message = stream.next().await.unwrap().unwrap();
        let actual = first_message.into_chunk().unwrap();

        let expected = StreamChunk::from_pretty(
            &"TS I I TS
            + ^10:15:00 1 1 ^10:00:00
            + ^10:30:00 1 1 ^10:00:00
            + ^10:15:00 3 2 ^10:05:00
            + ^10:30:00 3 2 ^10:05:00
            - ^10:15:00 2 3 ^10:14:00
            - ^10:30:00 2 3 ^10:14:00
            + ^10:30:00 1 4 ^10:22:00
            + ^10:45:00 1 4 ^10:22:00
            - ^10:45:00 2 5 ^10:33:00
            - ^11:00:00 2 5 ^10:33:00
            + ^10:45:00 2 6 ^10:42:00
            + ^11:00:00 2 6 ^10:42:00
            - ^11:00:00 1 7 ^10:51:00
            - ^11:15:00 1 7 ^10:51:00
            + ^11:15:00 3 8 ^11:02:00
            + ^11:30:00 3 8 ^11:02:00"
                .replace('^', "2022-02-02T"),
        );
        assert_eq!(actual, expected);
    }
}