1use std::collections::HashMap;
16
17use risingwave_common::array::Op;
18use risingwave_common::gap_fill::{
19 FillStrategy, apply_interpolation_step, calculate_interpolation_step,
20};
21use risingwave_common::metrics::LabelGuardedIntCounter;
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{CheckedAdd, ToOwnedDatum};
24use risingwave_expr::ExprError;
25use risingwave_expr::expr::NonStrictExpression;
26use tracing::warn;
27
28use super::sort_buffer::SortBuffer;
29use crate::executor::prelude::*;
30
/// Gap-fill executor that buffers input rows and, when a watermark on the time column arrives,
/// emits the released rows together with synthesized rows for every `gap_interval` step missing
/// between consecutive rows.
pub struct EowcGapFillExecutor<S: StateStore> {
    /// Upstream executor.
    input: Executor,
    /// State and configuration used while executing.
    inner: ExecutorInner<S>,
}
35
/// Construction arguments for [`EowcGapFillExecutor`].
pub struct EowcGapFillExecutorArgs<S: StateStore> {
    pub actor_ctx: ActorContextRef,

    pub input: Executor,

    /// Output schema; its data types are used to build output chunks.
    pub schema: Schema,
    /// State table backing the sort buffer that holds rows until a time-column watermark
    /// releases them.
    pub buffer_table: StateTable<S>,
    /// State table persisting the last emitted row; it is read as a one-row table on startup so
    /// gap filling continues from that row after recovery.
    pub prev_row_table: StateTable<S>,
    /// Capacity passed to `StreamChunkBuilder` for output chunks.
    pub chunk_size: usize,
    /// Index of the time column; its watermarks trigger emission. Gap filling is skipped for
    /// values that are not `Timestamp`/`Timestamptz`.
    pub time_column_index: usize,
    /// Fill strategy per column index; columns absent from the map are `NULL` in generated rows.
    pub fill_columns: HashMap<usize, FillStrategy>,
    /// Expression producing the gap `interval`; it is evaluated once on startup and must not be
    /// `NULL` or zero.
    pub gap_interval: NonStrictExpression,
}
49
/// Metrics reported by the gap-fill executor.
pub struct GapFillMetrics {
    /// Number of synthesized (gap-filling) rows, labelled by actor id and fragment id.
    pub gap_fill_generated_rows_count: LabelGuardedIntCounter,
}
53
/// Configuration and state tables of [`EowcGapFillExecutor`]; see
/// [`EowcGapFillExecutorArgs`] for the meaning of each field.
struct ExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,

    schema: Schema,
    /// Backing table of the sort buffer (rows waiting for a time-column watermark).
    buffer_table: StateTable<S>,
    /// One-row table holding the last emitted row, used as the left edge of the next gap.
    prev_row_table: StateTable<S>,
    chunk_size: usize,
    time_column_index: usize,
    fill_columns: HashMap<usize, FillStrategy>,
    gap_interval: NonStrictExpression,

    metrics: GapFillMetrics,
}
68
/// Mutable state owned by a single run of the executor loop.
struct ExecutionVars<S: StateStore> {
    /// Sort buffer over `buffer_table`, created with the time column index.
    buffer: SortBuffer<S>,
}
72
73impl<S: StateStore> ExecutorInner<S> {
74 fn generate_filled_rows(
75 prev_row: &OwnedRow,
76 curr_row: &OwnedRow,
77 time_column_index: usize,
78 fill_columns: &HashMap<usize, FillStrategy>,
79 interval: risingwave_common::types::Interval,
80 metrics: &GapFillMetrics,
81 ) -> Result<Vec<OwnedRow>, ExprError> {
82 let mut filled_rows = Vec::new();
83 let (Some(prev_time_scalar), Some(curr_time_scalar)) = (
84 prev_row.datum_at(time_column_index),
85 curr_row.datum_at(time_column_index),
86 ) else {
87 return Ok(filled_rows);
88 };
89
90 let prev_time = match prev_time_scalar {
91 ScalarRefImpl::Timestamp(ts) => ts,
92 ScalarRefImpl::Timestamptz(ts) => {
93 match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
94 Ok(timestamp) => timestamp,
95 Err(_) => {
96 warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
97 return Ok(filled_rows);
98 }
99 }
100 }
101 _ => {
102 warn!(
103 "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
104 prev_time_scalar
105 );
106 return Ok(filled_rows);
107 }
108 };
109
110 let curr_time = match curr_time_scalar {
111 ScalarRefImpl::Timestamp(ts) => ts,
112 ScalarRefImpl::Timestamptz(ts) => {
113 match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
114 Ok(timestamp) => timestamp,
115 Err(_) => {
116 warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
117 return Ok(filled_rows);
118 }
119 }
120 }
121 _ => {
122 warn!(
123 "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
124 curr_time_scalar
125 );
126 return Ok(filled_rows);
127 }
128 };
129 if prev_time >= curr_time {
130 return Ok(filled_rows);
131 }
132
133 let mut fill_time = match prev_time.checked_add(interval) {
134 Some(t) => t,
135 None => {
136 return Ok(filled_rows);
137 }
138 };
139 if fill_time >= curr_time {
140 return Ok(filled_rows);
141 }
142
143 let mut row_count = 0;
145 let mut temp_time = fill_time;
146 while temp_time < curr_time {
147 row_count += 1;
148 temp_time = match temp_time.checked_add(interval) {
149 Some(t) => t,
150 None => break,
151 };
152 }
153
154 let mut interpolation_steps: Vec<Option<ScalarImpl>> = Vec::new();
156 let mut interpolation_states: Vec<Datum> = Vec::new();
157
158 for i in 0..prev_row.len() {
159 if let Some(strategy) = fill_columns.get(&i) {
160 if matches!(strategy, FillStrategy::Interpolate) {
161 let step = calculate_interpolation_step(
162 prev_row.datum_at(i),
163 curr_row.datum_at(i),
164 row_count + 1,
165 );
166 interpolation_steps.push(step.clone());
167 interpolation_states.push(prev_row.datum_at(i).to_owned_datum());
168 } else {
169 interpolation_steps.push(None);
170 interpolation_states.push(None);
171 }
172 } else {
173 interpolation_steps.push(None);
174 interpolation_states.push(None);
175 }
176 }
177
178 while fill_time < curr_time {
180 let mut new_row_data = Vec::with_capacity(prev_row.len());
181
182 for col_idx in 0..prev_row.len() {
183 let datum = if col_idx == time_column_index {
184 let fill_time_scalar = match prev_time_scalar {
186 ScalarRefImpl::Timestamp(_) => ScalarImpl::Timestamp(fill_time),
187 ScalarRefImpl::Timestamptz(_) => {
188 let micros = fill_time.0.and_utc().timestamp_micros();
189 ScalarImpl::Timestamptz(
190 risingwave_common::types::Timestamptz::from_micros(micros),
191 )
192 }
193 _ => unreachable!("Time column should be Timestamp or Timestamptz"),
194 };
195 Some(fill_time_scalar)
196 } else if let Some(strategy) = fill_columns.get(&col_idx) {
197 match strategy {
199 FillStrategy::Locf => prev_row.datum_at(col_idx).to_owned_datum(),
200 FillStrategy::Null => None,
201 FillStrategy::Interpolate => {
202 if let Some(step) = &interpolation_steps[col_idx] {
204 apply_interpolation_step(&mut interpolation_states[col_idx], step);
205 interpolation_states[col_idx].clone()
206 } else {
207 None
209 }
210 }
211 }
212 } else {
213 None
215 };
216 new_row_data.push(datum);
217 }
218
219 filled_rows.push(OwnedRow::new(new_row_data));
220
221 fill_time = match fill_time.checked_add(interval) {
222 Some(t) => t,
223 None => {
224 warn!(
226 "Gap fill stopped due to timestamp overflow after generating {} rows.",
227 filled_rows.len()
228 );
229 break;
230 }
231 };
232 }
233
234 metrics
236 .gap_fill_generated_rows_count
237 .inc_by(filled_rows.len() as u64);
238
239 Ok(filled_rows)
240 }
241}
242
impl<S: StateStore> Execute for EowcGapFillExecutor<S> {
    /// Returns the executor's output as a boxed message stream driven by `execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
248
249impl<S: StateStore> EowcGapFillExecutor<S> {
250 pub fn new(args: EowcGapFillExecutorArgs<S>) -> Self {
251 let metrics = args.actor_ctx.streaming_metrics.clone();
252 let actor_id = args.actor_ctx.id.to_string();
253 let fragment_id = args.actor_ctx.fragment_id.to_string();
254 let gap_fill_metrics = GapFillMetrics {
255 gap_fill_generated_rows_count: metrics
256 .gap_fill_generated_rows_count
257 .with_guarded_label_values(&[&actor_id, &fragment_id]),
258 };
259
260 Self {
261 input: args.input,
262
263 inner: ExecutorInner {
264 actor_ctx: args.actor_ctx,
265 schema: args.schema,
266 buffer_table: args.buffer_table,
267 prev_row_table: args.prev_row_table,
268 chunk_size: args.chunk_size,
269 time_column_index: args.time_column_index,
270 fill_columns: args.fill_columns,
271 gap_interval: args.gap_interval,
272 metrics: gap_fill_metrics,
273 },
274 }
275 }
276
277 #[try_stream(ok = Message, error = StreamExecutorError)]
278 async fn execute_inner(self) {
279 let Self {
280 input,
281 inner: mut this,
282 } = self;
283
284 let mut input = input.execute();
285
286 let barrier = expect_first_barrier(&mut input).await?;
287 let first_epoch = barrier.epoch;
288 yield Message::Barrier(barrier);
289 this.buffer_table.init_epoch(first_epoch).await?;
290 this.prev_row_table.init_epoch(first_epoch).await?;
291
292 let dummy_row = OwnedRow::new(vec![]);
294 let interval_datum = this.gap_interval.eval_row_infallible(&dummy_row).await;
295 let interval = interval_datum
296 .ok_or_else(|| anyhow::anyhow!("Gap interval expression returned null"))?
297 .into_interval();
298
299 if interval.months() == 0 && interval.days() == 0 && interval.usecs() == 0 {
301 Err(anyhow::anyhow!("Gap interval cannot be zero"))?;
302 }
303
304 let mut vars = ExecutionVars {
305 buffer: SortBuffer::new(this.time_column_index, &this.buffer_table),
306 };
307 let mut committed_prev_row: Option<OwnedRow> =
308 this.prev_row_table.get_from_one_row_table().await?;
309 let mut staging_prev_row = committed_prev_row.clone();
310
311 vars.buffer.refill_cache(None, &this.buffer_table).await?;
312
313 #[for_await]
314 for msg in input {
315 match msg? {
316 Message::Watermark(watermark @ Watermark { col_idx, .. })
317 if col_idx == this.time_column_index =>
318 {
319 let mut chunk_builder =
320 StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());
321
322 #[for_await]
323 for row in vars
324 .buffer
325 .consume(watermark.val.clone(), &mut this.buffer_table)
326 {
327 let current_row = row?;
328 if let Some(p_row) = &staging_prev_row {
329 let filled_rows = ExecutorInner::<S>::generate_filled_rows(
330 p_row,
331 ¤t_row,
332 this.time_column_index,
333 &this.fill_columns,
334 interval,
335 &this.metrics,
336 )?;
337 for filled_row in filled_rows {
338 if let Some(chunk) =
339 chunk_builder.append_row(Op::Insert, &filled_row)
340 {
341 yield Message::Chunk(chunk);
342 }
343 }
344 }
345 if let Some(chunk) = chunk_builder.append_row(Op::Insert, ¤t_row) {
346 yield Message::Chunk(chunk);
347 }
348 staging_prev_row = Some(current_row);
349 }
350 if let Some(chunk) = chunk_builder.take() {
351 yield Message::Chunk(chunk);
352 }
353
354 yield Message::Watermark(watermark);
355 }
356 Message::Watermark(_) => continue,
357 Message::Chunk(chunk) => {
358 vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
359 this.buffer_table.try_flush().await?;
360 }
361 Message::Barrier(barrier) => {
362 if committed_prev_row != staging_prev_row {
363 if let Some(old_row) = &committed_prev_row {
364 this.prev_row_table.delete(old_row);
365 }
366 if let Some(new_row) = &staging_prev_row {
367 this.prev_row_table.insert(new_row);
368 }
369 }
370
371 let post_commit = this.buffer_table.commit(barrier.epoch).await?;
372 this.prev_row_table
373 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
374 .await?;
375
376 committed_prev_row.clone_from(&staging_prev_row);
377
378 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
379 yield Message::Barrier(barrier);
380
381 if let Some((_, cache_may_stale)) =
382 post_commit.post_yield_barrier(update_vnode_bitmap).await?
383 && cache_may_stale
384 {
385 vars.buffer.refill_cache(None, &this.buffer_table).await?;
386 }
387 }
388 }
389 }
390 }
391}
392
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::types::Interval;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_expr::expr::LiteralExpression;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable_with_dist_key;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Column types shared by the mock input and both state tables.
    fn column_types() -> [DataType; 5] {
        [
            DataType::Timestamp,
            DataType::Int32,
            DataType::Int64,
            DataType::Float32,
            DataType::Float64,
        ]
    }

    /// Wraps a constant interval into the expression type expected by the executor.
    fn literal_interval(gap_interval: Interval) -> NonStrictExpression {
        NonStrictExpression::for_test(LiteralExpression::new(
            DataType::Interval,
            Some(gap_interval.into()),
        ))
    }

    /// Pushes a watermark on column 0 (the time column) for the given timestamp literal.
    fn push_time_watermark(tx: &mut MessageSender, ts: &str) {
        tx.push_watermark(
            0,
            DataType::Timestamp,
            ts.parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
    }

    /// Creates a gap-fill executor over a mock source with schema
    /// `(Timestamp, Int32, Int64, Float32, Float64)`; both state tables are backed by `store`.
    /// Returns the sender feeding the source and the executor's output stream.
    async fn create_executor<S: StateStore>(
        time_column_index: usize,
        fill_columns: HashMap<usize, FillStrategy>,
        gap_interval: NonStrictExpression,
        store: S,
    ) -> (MessageSender, BoxedMessageStream) {
        let input_schema = Schema::new(column_types().into_iter().map(Field::unnamed).collect());
        let table_columns: Vec<ColumnDesc> = column_types()
            .into_iter()
            .enumerate()
            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type))
            .collect();

        // Buffer table: keyed ascending on the time column.
        let buffer_table = StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                TableId::new(0),
                table_columns.clone(),
                vec![OrderType::ascending()],
                vec![time_column_index],
                0,
                vec![],
            ),
            store.clone(),
            None,
        )
        .await;

        // Prev-row table: holds one row, keyed on column 0.
        let prev_row_table = StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                TableId::new(1),
                table_columns,
                vec![OrderType::ascending()],
                vec![0],
                0,
                vec![],
            ),
            store,
            None,
        )
        .await;

        let (tx, source) = MockSource::channel();
        let source = source.into_executor(input_schema, vec![time_column_index]);
        let executor = EowcGapFillExecutor::new(EowcGapFillExecutorArgs {
            actor_ctx: ActorContext::for_test(123),
            schema: source.schema().clone(),
            input: source,
            buffer_table,
            prev_row_table,
            chunk_size: 1024,
            time_column_index,
            fill_columns,
            gap_interval,
        });

        (tx, executor.boxed().execute())
    }

    /// Shared scenario for the single-gap tests. Sends an initial barrier and an early time
    /// watermark, then two rows four days apart, then a watermark past both rows. Asserts that
    /// the emitted chunk equals `expected` and that the watermark is forwarded afterwards.
    async fn check_single_gap(fill_columns: HashMap<usize, FillStrategy>, expected: &str) {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            literal_interval(gap_interval),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // The column-1 watermark is dropped; the time watermark precedes all data, so only the
        // watermark itself comes out.
        tx.push_int64_watermark(1, 0_i64);
        push_time_watermark(&mut tx, "2023-03-06 18:27:03");
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i  I   f   F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        ));

        tx.push_int64_watermark(1, 0_i64);
        push_time_watermark(&mut tx, "2023-04-05 18:27:03");

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(chunk, StreamChunk::from_pretty(expected));
        gap_fill_executor.expect_watermark().await;
    }

    #[tokio::test]
    async fn test_gap_fill_interpolate() {
        check_single_gap(
            HashMap::from([
                (1, FillStrategy::Interpolate),
                (2, FillStrategy::Interpolate),
                (3, FillStrategy::Interpolate),
                (4, FillStrategy::Interpolate),
            ]),
            " TS                  i  I   f   F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-02T10:00:00 20 125 2.0 125.0
            + 2023-04-03T10:00:00 30 150 3.0 150.0
            + 2023-04-04T10:00:00 40 175 4.0 175.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_locf() {
        check_single_gap(
            HashMap::from([
                (1, FillStrategy::Locf),
                (2, FillStrategy::Locf),
                (3, FillStrategy::Locf),
                (4, FillStrategy::Locf),
            ]),
            " TS                  i  I   f   F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-02T10:00:00 10 100 1.0 100.0
            + 2023-04-03T10:00:00 10 100 1.0 100.0
            + 2023-04-04T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_null() {
        check_single_gap(
            HashMap::from([
                (1, FillStrategy::Null),
                (2, FillStrategy::Null),
                (3, FillStrategy::Null),
                (4, FillStrategy::Null),
            ]),
            " TS                  i  I   f   F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-02T10:00:00 .  .   .   .
            + 2023-04-03T10:00:00 .  .   .   .
            + 2023-04-04T10:00:00 .  .   .   .
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_mixed_strategy() {
        check_single_gap(
            HashMap::from([
                (1, FillStrategy::Interpolate),
                (2, FillStrategy::Locf),
                (3, FillStrategy::Null),
                (4, FillStrategy::Interpolate),
            ]),
            " TS                  i  I   f   F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-02T10:00:00 20 100 .   125.0
            + 2023-04-03T10:00:00 30 100 .   150.0
            + 2023-04-04T10:00:00 40 100 .   175.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_fail_over() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Locf),
            (2, FillStrategy::Interpolate),
            (3, FillStrategy::Locf),
            (4, FillStrategy::Locf),
        ]);
        let store = MemoryStateStore::new();

        // Phase 1: buffer two rows and checkpoint them without emitting anything.
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            literal_interval(gap_interval),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i  I   f   F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        ));

        tx.push_barrier(test_epoch(2), false);
        gap_fill_executor.expect_barrier().await;

        // Phase 2: recover; the buffered rows are released and gap-filled by a watermark.
        let (mut recovered_tx, mut recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            literal_interval(gap_interval),
            store.clone(),
        )
        .await;

        recovered_tx.push_barrier(test_epoch(2), false);
        recovered_gap_fill_executor.expect_barrier().await;

        push_time_watermark(&mut recovered_tx, "2023-04-06T10:00:00");

        let chunk = recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i  I   f   F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 10 125 1.0 100.0
                + 2023-04-03T10:00:00 10 150 1.0 100.0
                + 2023-04-04T10:00:00 10 175 1.0 100.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0"
            )
        );

        recovered_gap_fill_executor.expect_watermark().await;

        recovered_tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i  I   f   F
            + 2023-04-08T10:00:00 80 500 8.0 500.0",
        ));

        recovered_tx.push_barrier(test_epoch(3), false);
        recovered_gap_fill_executor.expect_barrier().await;

        // Phase 3: recover again; the persisted previous row anchors the next gap.
        let (mut final_recovered_tx, mut final_recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            literal_interval(gap_interval),
            store,
        )
        .await;

        final_recovered_tx.push_barrier(test_epoch(3), false);
        final_recovered_gap_fill_executor.expect_barrier().await;

        push_time_watermark(&mut final_recovered_tx, "2023-04-09T10:00:00");

        let chunk = final_recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i  I   f   F
                + 2023-04-06T10:00:00 50 300 5.0 200.0
                + 2023-04-07T10:00:00 50 400 5.0 200.0
                + 2023-04-08T10:00:00 80 500 8.0 500.0"
            )
        );

        final_recovered_gap_fill_executor.expect_watermark().await;
    }
}