1use std::collections::HashMap;
16
17use risingwave_common::array::Op;
18use risingwave_common::gap_fill::{
19 FillStrategy, apply_interpolation_step, calculate_interpolation_step,
20};
21use risingwave_common::metrics::LabelGuardedIntCounter;
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{CheckedAdd, ToOwnedDatum};
24use risingwave_expr::ExprError;
25use risingwave_expr::expr::NonStrictExpression;
26use tracing::warn;
27
28use super::sort_buffer::SortBuffer;
29use crate::executor::prelude::*;
30
/// Emit-on-window-close gap fill executor.
///
/// Buffers input rows until a watermark on the time column releases them, then emits them in
/// time order, inserting synthetic rows for the gaps between consecutive emitted rows (see
/// `ExecutorInner::generate_filled_rows`).
pub struct EowcGapFillExecutor<S: StateStore> {
    /// Upstream executor providing the rows to gap-fill.
    input: Executor,
    /// Configuration and state shared by the execution loop.
    inner: ExecutorInner<S>,
}
35
/// Construction arguments for [`EowcGapFillExecutor`].
pub struct EowcGapFillExecutorArgs<S: StateStore> {
    /// Actor context; also the source of the streaming metrics registry and actor/fragment ids.
    pub actor_ctx: ActorContextRef,

    /// Upstream executor.
    pub input: Executor,

    /// Output schema; used to size the chunks built on watermark.
    pub schema: Schema,
    /// State table backing the sort buffer that holds rows until a time-column watermark.
    pub buffer_table: StateTable<S>,
    /// State table holding (at most) the last emitted row, so gap filling can resume after
    /// recovery.
    pub prev_row_table: StateTable<S>,
    /// Maximum number of rows per output chunk.
    pub chunk_size: usize,
    /// Index of the time column (`timestamp` or `timestamptz`) that drives gap filling.
    pub time_column_index: usize,
    /// Fill strategy per column index; columns not present here are NULL in generated rows.
    pub fill_columns: HashMap<usize, FillStrategy>,
    /// Expression producing the gap interval. It is evaluated once, at startup, against an
    /// empty row, so it is expected to be constant.
    pub gap_interval: NonStrictExpression,
}
49
/// Per-actor metrics of the gap fill executor.
pub struct GapFillMetrics {
    /// Total number of synthetic rows generated to fill gaps (labelled by actor id and
    /// fragment id).
    pub gap_fill_generated_rows_count: LabelGuardedIntCounter,
}
53
/// Configuration and state tables used by the execution loop of [`EowcGapFillExecutor`].
///
/// The fields mirror [`EowcGapFillExecutorArgs`], plus the resolved metrics.
struct ExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Output schema; used to create the chunk builder.
    schema: Schema,
    /// Persistent storage of the sort buffer.
    buffer_table: StateTable<S>,
    /// Persistent storage of the last emitted row.
    prev_row_table: StateTable<S>,
    /// Maximum number of rows per output chunk.
    chunk_size: usize,
    /// Index of the time column that watermarks and gap filling are based on.
    time_column_index: usize,
    /// Fill strategy per column index; unlisted columns become NULL in generated rows.
    fill_columns: HashMap<usize, FillStrategy>,
    /// Gap interval expression, evaluated once at startup.
    gap_interval: NonStrictExpression,

    metrics: GapFillMetrics,
}
68
/// Mutable runtime state of the execution loop that is not part of [`ExecutorInner`].
struct ExecutionVars<S: StateStore> {
    /// Sorts buffered input rows by the time column until a watermark releases them.
    buffer: SortBuffer<S>,
}
72
73impl<S: StateStore> ExecutorInner<S> {
74 fn generate_filled_rows(
75 prev_row: &OwnedRow,
76 curr_row: &OwnedRow,
77 time_column_index: usize,
78 fill_columns: &HashMap<usize, FillStrategy>,
79 interval: risingwave_common::types::Interval,
80 metrics: &GapFillMetrics,
81 ) -> Result<Vec<OwnedRow>, ExprError> {
82 let mut filled_rows = Vec::new();
83 let (Some(prev_time_scalar), Some(curr_time_scalar)) = (
84 prev_row.datum_at(time_column_index),
85 curr_row.datum_at(time_column_index),
86 ) else {
87 return Ok(filled_rows);
88 };
89
90 let prev_time = match prev_time_scalar {
91 ScalarRefImpl::Timestamp(ts) => ts,
92 ScalarRefImpl::Timestamptz(ts) => {
93 match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
94 Ok(timestamp) => timestamp,
95 Err(_) => {
96 warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
97 return Ok(filled_rows);
98 }
99 }
100 }
101 _ => {
102 warn!(
103 "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
104 prev_time_scalar
105 );
106 return Ok(filled_rows);
107 }
108 };
109
110 let curr_time = match curr_time_scalar {
111 ScalarRefImpl::Timestamp(ts) => ts,
112 ScalarRefImpl::Timestamptz(ts) => {
113 match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
114 Ok(timestamp) => timestamp,
115 Err(_) => {
116 warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
117 return Ok(filled_rows);
118 }
119 }
120 }
121 _ => {
122 warn!(
123 "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
124 curr_time_scalar
125 );
126 return Ok(filled_rows);
127 }
128 };
129 if prev_time >= curr_time {
130 return Ok(filled_rows);
131 }
132
133 let mut fill_time = match prev_time.checked_add(interval) {
134 Some(t) => t,
135 None => {
136 return Ok(filled_rows);
137 }
138 };
139 if fill_time >= curr_time {
140 return Ok(filled_rows);
141 }
142
143 let mut row_count = 0;
145 let mut temp_time = fill_time;
146 while temp_time < curr_time {
147 row_count += 1;
148 temp_time = match temp_time.checked_add(interval) {
149 Some(t) => t,
150 None => break,
151 };
152 }
153
154 let mut interpolation_steps: Vec<Option<ScalarImpl>> = Vec::new();
156 let mut interpolation_states: Vec<Datum> = Vec::new();
157
158 for i in 0..prev_row.len() {
159 if let Some(strategy) = fill_columns.get(&i) {
160 if matches!(strategy, FillStrategy::Interpolate) {
161 let step = calculate_interpolation_step(
162 prev_row.datum_at(i),
163 curr_row.datum_at(i),
164 row_count + 1,
165 );
166 interpolation_steps.push(step.clone());
167 interpolation_states.push(prev_row.datum_at(i).to_owned_datum());
168 } else {
169 interpolation_steps.push(None);
170 interpolation_states.push(None);
171 }
172 } else {
173 interpolation_steps.push(None);
174 interpolation_states.push(None);
175 }
176 }
177
178 while fill_time < curr_time {
180 let mut new_row_data = Vec::with_capacity(prev_row.len());
181
182 for col_idx in 0..prev_row.len() {
183 let datum = if col_idx == time_column_index {
184 let fill_time_scalar = match prev_time_scalar {
186 ScalarRefImpl::Timestamp(_) => ScalarImpl::Timestamp(fill_time),
187 ScalarRefImpl::Timestamptz(_) => {
188 let micros = fill_time.0.and_utc().timestamp_micros();
189 ScalarImpl::Timestamptz(
190 risingwave_common::types::Timestamptz::from_micros(micros),
191 )
192 }
193 _ => unreachable!("Time column should be Timestamp or Timestamptz"),
194 };
195 Some(fill_time_scalar)
196 } else if let Some(strategy) = fill_columns.get(&col_idx) {
197 match strategy {
199 FillStrategy::Locf => prev_row.datum_at(col_idx).to_owned_datum(),
200 FillStrategy::Null => None,
201 FillStrategy::Interpolate => {
202 if let Some(step) = &interpolation_steps[col_idx] {
204 apply_interpolation_step(&mut interpolation_states[col_idx], step);
205 interpolation_states[col_idx].clone()
206 } else {
207 None
209 }
210 }
211 }
212 } else {
213 None
215 };
216 new_row_data.push(datum);
217 }
218
219 filled_rows.push(OwnedRow::new(new_row_data));
220
221 fill_time = match fill_time.checked_add(interval) {
222 Some(t) => t,
223 None => {
224 warn!(
226 "Gap fill stopped due to timestamp overflow after generating {} rows.",
227 filled_rows.len()
228 );
229 break;
230 }
231 };
232 }
233
234 metrics
236 .gap_fill_generated_rows_count
237 .inc_by(filled_rows.len() as u64);
238
239 Ok(filled_rows)
240 }
241}
242
243impl<S: StateStore> Execute for EowcGapFillExecutor<S> {
244 fn execute(self: Box<Self>) -> BoxedMessageStream {
245 self.execute_inner().boxed()
246 }
247}
248
249impl<S: StateStore> EowcGapFillExecutor<S> {
250 pub fn new(args: EowcGapFillExecutorArgs<S>) -> Self {
251 let metrics = args.actor_ctx.streaming_metrics.clone();
252 let actor_id = args.actor_ctx.id.to_string();
253 let fragment_id = args.actor_ctx.fragment_id.to_string();
254 let gap_fill_metrics = GapFillMetrics {
255 gap_fill_generated_rows_count: metrics
256 .gap_fill_generated_rows_count
257 .with_guarded_label_values(&[&actor_id, &fragment_id]),
258 };
259
260 Self {
261 input: args.input,
262
263 inner: ExecutorInner {
264 actor_ctx: args.actor_ctx,
265 schema: args.schema,
266 buffer_table: args.buffer_table,
267 prev_row_table: args.prev_row_table,
268 chunk_size: args.chunk_size,
269 time_column_index: args.time_column_index,
270 fill_columns: args.fill_columns,
271 gap_interval: args.gap_interval,
272 metrics: gap_fill_metrics,
273 },
274 }
275 }
276
277 #[try_stream(ok = Message, error = StreamExecutorError)]
278 async fn execute_inner(self) {
279 let Self {
280 input,
281 inner: mut this,
282 } = self;
283
284 let mut input = input.execute();
285
286 let barrier = expect_first_barrier(&mut input).await?;
287 let first_epoch = barrier.epoch;
288 yield Message::Barrier(barrier);
289 this.buffer_table.init_epoch(first_epoch).await?;
290 this.prev_row_table.init_epoch(first_epoch).await?;
291
292 let dummy_row = OwnedRow::new(vec![]);
294 let interval_datum = this.gap_interval.eval_row_infallible(&dummy_row).await;
295 let interval = interval_datum
296 .ok_or_else(|| anyhow::anyhow!("Gap interval expression returned null"))?
297 .into_interval();
298
299 if interval.months() == 0 && interval.days() == 0 && interval.usecs() == 0 {
301 Err(anyhow::anyhow!("Gap interval cannot be zero"))?;
302 }
303
304 let mut vars = ExecutionVars {
305 buffer: SortBuffer::new(this.time_column_index, &this.buffer_table),
306 };
307 let mut committed_prev_row: Option<OwnedRow> =
308 this.prev_row_table.get_from_one_row_table().await?;
309 let mut staging_prev_row = committed_prev_row.clone();
310
311 vars.buffer.refill_cache(None, &this.buffer_table).await?;
312
313 #[for_await]
314 for msg in input {
315 match msg? {
316 Message::Watermark(watermark @ Watermark { col_idx, .. })
317 if col_idx == this.time_column_index =>
318 {
319 let mut chunk_builder =
320 StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());
321
322 #[for_await]
323 for row in vars
324 .buffer
325 .consume(watermark.val.clone(), &mut this.buffer_table)
326 {
327 let current_row = row?;
328 if let Some(p_row) = &staging_prev_row {
329 let filled_rows = ExecutorInner::<S>::generate_filled_rows(
330 p_row,
331 ¤t_row,
332 this.time_column_index,
333 &this.fill_columns,
334 interval,
335 &this.metrics,
336 )?;
337 for filled_row in filled_rows {
338 if let Some(chunk) =
339 chunk_builder.append_row(Op::Insert, &filled_row)
340 {
341 yield Message::Chunk(chunk);
342 }
343 }
344 }
345 if let Some(chunk) = chunk_builder.append_row(Op::Insert, ¤t_row) {
346 yield Message::Chunk(chunk);
347 }
348 staging_prev_row = Some(current_row);
349 }
350 if let Some(chunk) = chunk_builder.take() {
351 yield Message::Chunk(chunk);
352 }
353
354 yield Message::Watermark(watermark);
355 }
356 Message::Watermark(_) => continue,
357 Message::Chunk(chunk) => {
358 vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
359 this.buffer_table.try_flush().await?;
360 }
361 Message::Barrier(barrier) => {
362 if committed_prev_row != staging_prev_row {
363 if let Some(old_row) = &committed_prev_row {
364 this.prev_row_table.delete(old_row);
365 }
366 if let Some(new_row) = &staging_prev_row {
367 this.prev_row_table.insert(new_row);
368 }
369 }
370
371 let post_commit = this.buffer_table.commit(barrier.epoch).await?;
372 this.prev_row_table
373 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
374 .await?;
375
376 committed_prev_row.clone_from(&staging_prev_row);
377
378 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
379 yield Message::Barrier(barrier);
380
381 if post_commit
382 .post_yield_barrier(update_vnode_bitmap)
383 .await?
384 .is_some()
385 {
386 vars.buffer.refill_cache(None, &this.buffer_table).await?;
391 }
392 }
393 }
394 }
395 }
396}
397
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::types::Interval;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_expr::expr::LiteralExpression;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable_with_dist_key;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Builds a gap fill executor fed by a `MockSource` over the fixed schema
    /// `(timestamp, int32, int64, float32, float64)`.
    ///
    /// The buffer table (table id 0) and the prev-row table (table id 1) are both created on
    /// `store`. Passing the same store to a second call therefore simulates recovery from
    /// previously committed state. Returns the sender driving the mock input and the
    /// executor's output stream.
    async fn create_executor<S: StateStore>(
        time_column_index: usize,
        fill_columns: HashMap<usize, FillStrategy>,
        gap_interval: NonStrictExpression,
        store: S,
    ) -> (MessageSender, BoxedMessageStream) {
        let input_schema = Schema::new(vec![
            Field::unnamed(DataType::Timestamp),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Float32),
            Field::unnamed(DataType::Float64),
        ]);
        let input_stream_key = vec![time_column_index];

        let table_columns = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamp),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int32),
            ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(3), DataType::Float32),
            ColumnDesc::unnamed(ColumnId::new(4), DataType::Float64),
        ];

        // The buffer table is ordered by the time column.
        let table_pk_indices = vec![time_column_index];
        let table_order_types = vec![OrderType::ascending()];
        let buffer_table = StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                TableId::new(0),
                table_columns.clone(),
                table_order_types,
                table_pk_indices,
                0,
                vec![],
            ),
            store.clone(),
            None,
        )
        .await;

        // The prev-row table stores full rows with the same columns, keyed on column 0.
        let prev_row_pk_indices = vec![0];
        let prev_row_order_types = vec![OrderType::ascending()];
        let prev_row_table = StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                TableId::new(1),
                table_columns,
                prev_row_order_types,
                prev_row_pk_indices,
                0,
                vec![],
            ),
            store,
            None,
        )
        .await;

        let (tx, source) = MockSource::channel();
        let source = source.into_executor(input_schema, input_stream_key);
        let gap_fill_executor = EowcGapFillExecutor::new(EowcGapFillExecutorArgs {
            actor_ctx: ActorContext::for_test(123),
            schema: source.schema().clone(),
            input: source,
            buffer_table,
            prev_row_table,
            chunk_size: 1024,
            time_column_index,
            fill_columns,
            gap_interval,
        });

        (tx, gap_fill_executor.boxed().execute())
    }

    /// With `Interpolate` on every value column, the three missing days between two rows
    /// are filled with evenly spaced values.
    #[tokio::test]
    async fn test_gap_fill_interpolate() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Interpolate),
            (2, FillStrategy::Interpolate),
            (3, FillStrategy::Interpolate),
            (4, FillStrategy::Interpolate),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // The watermark on column 1 is not the time column and is dropped by the executor.
        // The time-column watermark is forwarded; no rows are buffered yet.
        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-03-06 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
        gap_fill_executor.expect_watermark().await;

        // Buffered until a time-column watermark passes them.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I   f     F
            + 2023-04-01T10:00:00 10  100 1.0   100.0
            + 2023-04-05T10:00:00 50  200 5.0   200.0",
        ));

        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-05 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I   f     F
                + 2023-04-01T10:00:00 10  100 1.0   100.0
                + 2023-04-02T10:00:00 20  125 2.0   125.0
                + 2023-04-03T10:00:00 30  150 3.0   150.0
                + 2023-04-04T10:00:00 40  175 4.0   175.0
                + 2023-04-05T10:00:00 50  200 5.0   200.0",
            )
        );
        gap_fill_executor.expect_watermark().await;
    }

    /// With `Locf` on every value column, filled rows repeat the previous row's values.
    #[tokio::test]
    async fn test_gap_fill_locf() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Locf),
            (2, FillStrategy::Locf),
            (3, FillStrategy::Locf),
            (4, FillStrategy::Locf),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // Only the time-column watermark is forwarded.
        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-03-06 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I   f     F
            + 2023-04-01T10:00:00 10  100 1.0   100.0
            + 2023-04-05T10:00:00 50  200 5.0   200.0",
        ));

        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-05 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I   f     F
                + 2023-04-01T10:00:00 10  100 1.0   100.0
                + 2023-04-02T10:00:00 10  100 1.0   100.0
                + 2023-04-03T10:00:00 10  100 1.0   100.0
                + 2023-04-04T10:00:00 10  100 1.0   100.0
                + 2023-04-05T10:00:00 50  200 5.0   200.0",
            )
        );
        gap_fill_executor.expect_watermark().await;
    }

    /// With `Null` on every value column, filled rows carry only the generated timestamp.
    #[tokio::test]
    async fn test_gap_fill_null() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Null),
            (2, FillStrategy::Null),
            (3, FillStrategy::Null),
            (4, FillStrategy::Null),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // Only the time-column watermark is forwarded.
        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-03-06 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I   f     F
            + 2023-04-01T10:00:00 10  100 1.0   100.0
            + 2023-04-05T10:00:00 50  200 5.0   200.0",
        ));

        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-05 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I   f     F
                + 2023-04-01T10:00:00 10  100 1.0   100.0
                + 2023-04-02T10:00:00 .   .   .     .
                + 2023-04-03T10:00:00 .   .   .     .
                + 2023-04-04T10:00:00 .   .   .     .
                + 2023-04-05T10:00:00 50  200 5.0   200.0",
            )
        );
        gap_fill_executor.expect_watermark().await;
    }

    /// Different strategies on different columns are applied independently within the same
    /// filled rows.
    #[tokio::test]
    async fn test_gap_fill_mixed_strategy() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Interpolate),
            (2, FillStrategy::Locf),
            (3, FillStrategy::Null),
            (4, FillStrategy::Interpolate),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // Only the time-column watermark is forwarded.
        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-03-06 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I   f     F
            + 2023-04-01T10:00:00 10  100 1.0   100.0
            + 2023-04-05T10:00:00 50  200 5.0   200.0",
        ));

        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-05 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I   f     F
                + 2023-04-01T10:00:00 10  100 1.0   100.0
                + 2023-04-02T10:00:00 20  100 .     125.0
                + 2023-04-03T10:00:00 30  100 .     150.0
                + 2023-04-04T10:00:00 40  100 .     175.0
                + 2023-04-05T10:00:00 50  200 5.0   200.0",
            )
        );
        gap_fill_executor.expect_watermark().await;
    }

    /// Buffered rows and the last emitted row survive executor re-creation on the same
    /// store. After each recovery, gap filling continues from the persisted state.
    #[tokio::test]
    async fn test_gap_fill_fail_over() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Locf),
            (2, FillStrategy::Interpolate),
            (3, FillStrategy::Locf),
            (4, FillStrategy::Locf),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // Rows are buffered and committed by the next barrier. No time-column watermark
        // arrives, so nothing is emitted by this executor instance.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I   f     F
            + 2023-04-01T10:00:00 10  100 1.0   100.0
            + 2023-04-05T10:00:00 50  200 5.0   200.0",
        ));

        tx.push_barrier(test_epoch(2), false);
        gap_fill_executor.expect_barrier().await;

        // Simulate recovery: a new executor on the same store reloads the buffered rows.
        let (mut recovered_tx, mut recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        recovered_tx.push_barrier(test_epoch(2), false);
        recovered_gap_fill_executor.expect_barrier().await;

        recovered_tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-06T10:00:00"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I   f     F
                + 2023-04-01T10:00:00 10  100 1.0   100.0
                + 2023-04-02T10:00:00 10  125 1.0   100.0
                + 2023-04-03T10:00:00 10  150 1.0   100.0
                + 2023-04-04T10:00:00 10  175 1.0   100.0
                + 2023-04-05T10:00:00 50  200 5.0   200.0"
            )
        );

        recovered_gap_fill_executor.expect_watermark().await;

        // The next barrier persists the last emitted row (2023-04-05) alongside this
        // newly buffered row.
        recovered_tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I   f     F
            + 2023-04-08T10:00:00 80  500 8.0   500.0",
        ));

        recovered_tx.push_barrier(test_epoch(3), false);
        recovered_gap_fill_executor.expect_barrier().await;

        // Recover again. The restored previous row (2023-04-05) lets the gap up to
        // 2023-04-08 be filled across the restart; the previous row itself is not
        // re-emitted.
        let (mut final_recovered_tx, mut final_recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store,
        )
        .await;

        final_recovered_tx.push_barrier(test_epoch(3), false);
        final_recovered_gap_fill_executor.expect_barrier().await;

        final_recovered_tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-09T10:00:00"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = final_recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I   f     F
                + 2023-04-06T10:00:00 50  300 5.0   200.0
                + 2023-04-07T10:00:00 50  400 5.0   200.0
                + 2023-04-08T10:00:00 80  500 8.0   500.0"
            )
        );

        final_recovered_gap_fill_executor.expect_watermark().await;
    }
}