1use std::collections::HashMap;
16
17use risingwave_common::array::Op;
18use risingwave_common::gap_fill::{
19 FillStrategy, apply_interpolation_step, calculate_interpolation_step,
20};
21use risingwave_common::metrics::LabelGuardedIntCounter;
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{CheckedAdd, Interval, ToOwnedDatum};
24use risingwave_expr::ExprError;
25use risingwave_expr::expr::NonStrictExpression;
26use tracing::warn;
27
28use super::sort_buffer::SortBuffer;
29use crate::executor::prelude::*;
30
/// Gap-fill executor that buffers input rows and, when a watermark on the
/// time column arrives, emits the released rows together with synthetic rows
/// covering the time gaps between consecutive rows.
pub struct EowcGapFillExecutor<S: StateStore> {
    /// Upstream executor whose messages are consumed.
    input: Executor,
    /// Configuration, state tables and metrics used while executing.
    inner: ExecutorInner<S>,
}
35
/// Construction arguments for [`EowcGapFillExecutor::new`].
pub struct EowcGapFillExecutorArgs<S: StateStore> {
    /// Actor context; supplies the actor/fragment ids and the streaming metrics registry.
    pub actor_ctx: ActorContextRef,

    /// Upstream executor to read messages from.
    pub input: Executor,

    /// Schema of the emitted chunks.
    pub schema: Schema,
    /// State table backing the sort buffer of not-yet-released rows.
    pub buffer_table: StateTable<S>,
    /// State table read as a one-row table holding the last emitted row.
    pub prev_row_table: StateTable<S>,
    /// Capacity passed to the output chunk builder.
    pub chunk_size: usize,
    /// Index of the time column that drives sorting, watermarks and gap detection.
    pub time_column_index: usize,
    /// Fill strategy per column index; columns without an entry are NULL in filled rows.
    pub fill_columns: HashMap<usize, FillStrategy>,
    /// Expression evaluated once against an empty row to obtain the gap interval.
    pub gap_interval: NonStrictExpression,
    /// A warning is logged when a single gap produces more filled rows than this.
    pub high_gap_fill_amplification_threshold: usize,
}
50
/// Metrics reported by the gap-fill executor.
pub struct GapFillMetrics {
    /// Counter incremented by the number of rows synthesized for each gap.
    pub gap_fill_generated_rows_count: LabelGuardedIntCounter,
}
54
/// Borrowed reporting context handed to `generate_filled_rows`.
struct GapFillGenerationContext<'a> {
    /// Metrics to update with the number of generated rows.
    metrics: &'a GapFillMetrics,
    /// Row count above which a high-amplification warning is logged.
    high_amplification_threshold: usize,
    /// Actor context used to tag the warning with actor and fragment ids.
    actor_ctx: &'a ActorContextRef,
}
60
/// Everything the executor owns besides its input: configuration, state
/// tables and metrics.
struct ExecutorInner<S: StateStore> {
    /// Actor context; supplies the actor/fragment ids used in logs and barrier handling.
    actor_ctx: ActorContextRef,

    /// Schema of the emitted chunks.
    schema: Schema,
    /// State table backing the sort buffer of not-yet-released rows.
    buffer_table: StateTable<S>,
    /// State table read as a one-row table holding the last emitted row.
    prev_row_table: StateTable<S>,
    /// Capacity passed to the output chunk builder.
    chunk_size: usize,
    /// Index of the time column that drives sorting, watermarks and gap detection.
    time_column_index: usize,
    /// Fill strategy per column index; columns without an entry are NULL in filled rows.
    fill_columns: HashMap<usize, FillStrategy>,
    /// Expression evaluated once against an empty row to obtain the gap interval.
    gap_interval: NonStrictExpression,
    /// A warning is logged when a single gap produces more filled rows than this.
    high_gap_fill_amplification_threshold: usize,

    /// Metrics labelled with this executor's actor and fragment ids.
    metrics: GapFillMetrics,
}
76
/// Mutable state that lives only for the duration of `execute_inner`.
struct ExecutionVars<S: StateStore> {
    /// Sort buffer keyed on the time column, backed by the buffer state table.
    buffer: SortBuffer<S>,
}
80
81impl<S: StateStore> ExecutorInner<S> {
82 fn generate_filled_rows(
83 prev_row: &OwnedRow,
84 curr_row: &OwnedRow,
85 time_column_index: usize,
86 fill_columns: &HashMap<usize, FillStrategy>,
87 interval: risingwave_common::types::Interval,
88 generation_context: &GapFillGenerationContext<'_>,
89 ) -> Result<Vec<OwnedRow>, ExprError> {
90 let mut filled_rows = Vec::new();
91 let (Some(prev_time_scalar), Some(curr_time_scalar)) = (
92 prev_row.datum_at(time_column_index),
93 curr_row.datum_at(time_column_index),
94 ) else {
95 return Ok(filled_rows);
96 };
97
98 let prev_time = match prev_time_scalar {
99 ScalarRefImpl::Timestamp(ts) => ts,
100 ScalarRefImpl::Timestamptz(ts) => {
101 match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
102 Ok(timestamp) => timestamp,
103 Err(_) => {
104 warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
105 return Ok(filled_rows);
106 }
107 }
108 }
109 _ => {
110 warn!(
111 "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
112 prev_time_scalar
113 );
114 return Ok(filled_rows);
115 }
116 };
117
118 let curr_time = match curr_time_scalar {
119 ScalarRefImpl::Timestamp(ts) => ts,
120 ScalarRefImpl::Timestamptz(ts) => {
121 match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
122 Ok(timestamp) => timestamp,
123 Err(_) => {
124 warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
125 return Ok(filled_rows);
126 }
127 }
128 }
129 _ => {
130 warn!(
131 "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
132 curr_time_scalar
133 );
134 return Ok(filled_rows);
135 }
136 };
137 if prev_time >= curr_time {
138 return Ok(filled_rows);
139 }
140
141 let mut fill_time = match prev_time.checked_add(interval) {
142 Some(t) => t,
143 None => {
144 return Ok(filled_rows);
145 }
146 };
147 if fill_time >= curr_time {
148 return Ok(filled_rows);
149 }
150
151 let mut row_count = 0;
153 let mut temp_time = fill_time;
154 while temp_time < curr_time {
155 row_count += 1;
156 temp_time = match temp_time.checked_add(interval) {
157 Some(t) => t,
158 None => break,
159 };
160 }
161
162 let mut interpolation_steps: Vec<Option<ScalarImpl>> = Vec::new();
164 let mut interpolation_states: Vec<Datum> = Vec::new();
165
166 for i in 0..prev_row.len() {
167 if let Some(strategy) = fill_columns.get(&i) {
168 if matches!(strategy, FillStrategy::Interpolate) {
169 let step = calculate_interpolation_step(
170 prev_row.datum_at(i),
171 curr_row.datum_at(i),
172 row_count + 1,
173 );
174 interpolation_steps.push(step.clone());
175 interpolation_states.push(prev_row.datum_at(i).to_owned_datum());
176 } else {
177 interpolation_steps.push(None);
178 interpolation_states.push(None);
179 }
180 } else {
181 interpolation_steps.push(None);
182 interpolation_states.push(None);
183 }
184 }
185
186 while fill_time < curr_time {
188 let mut new_row_data = Vec::with_capacity(prev_row.len());
189
190 for col_idx in 0..prev_row.len() {
191 let datum = if col_idx == time_column_index {
192 let fill_time_scalar = match prev_time_scalar {
194 ScalarRefImpl::Timestamp(_) => ScalarImpl::Timestamp(fill_time),
195 ScalarRefImpl::Timestamptz(_) => {
196 let micros = fill_time.0.and_utc().timestamp_micros();
197 ScalarImpl::Timestamptz(
198 risingwave_common::types::Timestamptz::from_micros(micros),
199 )
200 }
201 _ => unreachable!("Time column should be Timestamp or Timestamptz"),
202 };
203 Some(fill_time_scalar)
204 } else if let Some(strategy) = fill_columns.get(&col_idx) {
205 match strategy {
207 FillStrategy::Locf => prev_row.datum_at(col_idx).to_owned_datum(),
208 FillStrategy::Null => None,
209 FillStrategy::Interpolate => {
210 if let Some(step) = &interpolation_steps[col_idx] {
212 apply_interpolation_step(&mut interpolation_states[col_idx], step);
213 interpolation_states[col_idx].clone()
214 } else {
215 None
217 }
218 }
219 }
220 } else {
221 None
223 };
224 new_row_data.push(datum);
225 }
226
227 filled_rows.push(OwnedRow::new(new_row_data));
228
229 fill_time = match fill_time.checked_add(interval) {
230 Some(t) => t,
231 None => {
232 warn!(
234 "Gap fill stopped due to timestamp overflow after generating {} rows.",
235 filled_rows.len()
236 );
237 break;
238 }
239 };
240 }
241
242 generation_context
244 .metrics
245 .gap_fill_generated_rows_count
246 .inc_by(filled_rows.len() as u64);
247
248 if filled_rows.len() > generation_context.high_amplification_threshold {
249 tracing::warn!(target: "high_gap_fill_amplification",
250 generated_rows_len = filled_rows.len(),
251 prev_time = ?prev_time,
252 curr_time = ?curr_time,
253 gap_interval = ?interval,
254 actor_id = %generation_context.actor_ctx.id,
255 fragment_id = %generation_context.actor_ctx.fragment_id,
256 "large rows generated by gap fill"
257 );
258 }
259
260 Ok(filled_rows)
261 }
262}
263
impl<S: StateStore> Execute for EowcGapFillExecutor<S> {
    /// Consumes the executor and returns its message stream in boxed form.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
269
270impl<S: StateStore> EowcGapFillExecutor<S> {
271 pub fn new(args: EowcGapFillExecutorArgs<S>) -> Self {
272 let metrics = args.actor_ctx.streaming_metrics.clone();
273 let actor_id = args.actor_ctx.id.to_string();
274 let fragment_id = args.actor_ctx.fragment_id.to_string();
275 let gap_fill_metrics = GapFillMetrics {
276 gap_fill_generated_rows_count: metrics
277 .gap_fill_generated_rows_count
278 .with_guarded_label_values(&[&actor_id, &fragment_id]),
279 };
280
281 Self {
282 input: args.input,
283
284 inner: ExecutorInner {
285 actor_ctx: args.actor_ctx,
286 schema: args.schema,
287 buffer_table: args.buffer_table,
288 prev_row_table: args.prev_row_table,
289 chunk_size: args.chunk_size,
290 time_column_index: args.time_column_index,
291 fill_columns: args.fill_columns,
292 gap_interval: args.gap_interval,
293 high_gap_fill_amplification_threshold: args.high_gap_fill_amplification_threshold,
294 metrics: gap_fill_metrics,
295 },
296 }
297 }
298
299 #[try_stream(ok = Message, error = StreamExecutorError)]
300 async fn execute_inner(self) {
301 let Self {
302 input,
303 inner: mut this,
304 } = self;
305
306 let mut input = input.execute();
307
308 let barrier = expect_first_barrier(&mut input).await?;
309 let first_epoch = barrier.epoch;
310 yield Message::Barrier(barrier);
311 this.buffer_table.init_epoch(first_epoch).await?;
312 this.prev_row_table.init_epoch(first_epoch).await?;
313
314 let dummy_row = OwnedRow::new(vec![]);
316 let interval_datum = this.gap_interval.eval_row_infallible(&dummy_row).await;
317 let interval = interval_datum
318 .ok_or_else(|| anyhow::anyhow!("Gap interval expression returned null"))?
319 .into_interval();
320
321 if interval <= Interval::from_month_day_usec(0, 0, 0) {
323 Err(anyhow::anyhow!("Gap interval must be positive"))?;
324 }
325
326 let mut vars = ExecutionVars {
327 buffer: SortBuffer::new(this.time_column_index, &this.buffer_table),
328 };
329 let mut committed_prev_row: Option<OwnedRow> =
330 this.prev_row_table.get_from_one_row_table().await?;
331 let mut staging_prev_row = committed_prev_row.clone();
332
333 vars.buffer.refill_cache(None, &this.buffer_table).await?;
334
335 #[for_await]
336 for msg in input {
337 match msg? {
338 Message::Watermark(watermark @ Watermark { col_idx, .. })
339 if col_idx == this.time_column_index =>
340 {
341 let mut chunk_builder =
342 StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());
343
344 #[for_await]
345 for row in vars
346 .buffer
347 .consume(watermark.val.clone(), &mut this.buffer_table)
348 {
349 let current_row = row?;
350 if let Some(p_row) = &staging_prev_row {
351 let generation_context = GapFillGenerationContext {
352 metrics: &this.metrics,
353 high_amplification_threshold: this
354 .high_gap_fill_amplification_threshold,
355 actor_ctx: &this.actor_ctx,
356 };
357 let filled_rows = ExecutorInner::<S>::generate_filled_rows(
358 p_row,
359 ¤t_row,
360 this.time_column_index,
361 &this.fill_columns,
362 interval,
363 &generation_context,
364 )?;
365 for filled_row in filled_rows {
366 if let Some(chunk) =
367 chunk_builder.append_row(Op::Insert, &filled_row)
368 {
369 yield Message::Chunk(chunk);
370 }
371 }
372 }
373 if let Some(chunk) = chunk_builder.append_row(Op::Insert, ¤t_row) {
374 yield Message::Chunk(chunk);
375 }
376 staging_prev_row = Some(current_row);
377 }
378 if let Some(chunk) = chunk_builder.take() {
379 yield Message::Chunk(chunk);
380 }
381
382 yield Message::Watermark(watermark);
383 }
384 Message::Watermark(_) => continue,
385 Message::Chunk(chunk) => {
386 vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
387 this.buffer_table.try_flush().await?;
388 }
389 Message::Barrier(barrier) => {
390 if committed_prev_row != staging_prev_row {
391 if let Some(old_row) = &committed_prev_row {
392 this.prev_row_table.delete(old_row);
393 }
394 if let Some(new_row) = &staging_prev_row {
395 this.prev_row_table.insert(new_row);
396 }
397 }
398
399 let post_commit = this.buffer_table.commit(barrier.epoch).await?;
400 this.prev_row_table
401 .commit_assert_no_update_vnode_bitmap(barrier.epoch)
402 .await?;
403
404 committed_prev_row.clone_from(&staging_prev_row);
405
406 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
407 yield Message::Barrier(barrier);
408
409 if post_commit
410 .post_yield_barrier(update_vnode_bitmap)
411 .await?
412 .is_some()
413 {
414 vars.buffer.refill_cache(None, &this.buffer_table).await?;
419 }
420 }
421 }
422 }
423 }
424}
425
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::types::Interval;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_expr::expr::LiteralExpression;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable_with_dist_key;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Two rows four days apart, used as input by every test.
    const GAP_INPUT: &str = " TS                    i  I   f   F
        + 2023-04-01T10:00:00 10 100 1.0 100.0
        + 2023-04-05T10:00:00 50 200 5.0 200.0";

    /// Builds a gap-fill executor over a mock source with the schema
    /// `(Timestamp, Int32, Int64, Float32, Float64)`, with both state tables
    /// stored in `store`. Returns the sender feeding the source and the
    /// executor's output stream.
    async fn create_executor<S: StateStore>(
        time_column_index: usize,
        fill_columns: HashMap<usize, FillStrategy>,
        gap_interval: NonStrictExpression,
        store: S,
    ) -> (MessageSender, BoxedMessageStream) {
        let input_schema = Schema::new(vec![
            Field::unnamed(DataType::Timestamp),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Float32),
            Field::unnamed(DataType::Float64),
        ]);
        let input_stream_key = vec![time_column_index];

        let table_columns = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamp),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int32),
            ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(3), DataType::Float32),
            ColumnDesc::unnamed(ColumnId::new(4), DataType::Float64),
        ];

        let table_pk_indices = vec![time_column_index];
        let table_order_types = vec![OrderType::ascending()];
        let buffer_table = StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                TableId::new(0),
                table_columns.clone(),
                table_order_types,
                table_pk_indices,
                0,
                vec![],
            ),
            store.clone(),
            None,
        )
        .await;

        let prev_row_pk_indices = vec![0];
        let prev_row_order_types = vec![OrderType::ascending()];
        let prev_row_table = StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                TableId::new(1),
                table_columns,
                prev_row_order_types,
                prev_row_pk_indices,
                0,
                vec![],
            ),
            store,
            None,
        )
        .await;

        let (tx, source) = MockSource::channel();
        let source = source.into_executor(input_schema, input_stream_key);
        let gap_fill_executor = EowcGapFillExecutor::new(EowcGapFillExecutorArgs {
            actor_ctx: ActorContext::for_test(123),
            schema: source.schema().clone(),
            input: source,
            buffer_table,
            prev_row_table,
            chunk_size: 1024,
            time_column_index,
            fill_columns,
            gap_interval,
            high_gap_fill_amplification_threshold: 2048,
        });

        (tx, gap_fill_executor.boxed().execute())
    }

    /// Wraps `gap_interval` in a literal expression suitable for the executor.
    fn interval_literal(gap_interval: Interval) -> NonStrictExpression {
        NonStrictExpression::for_test(LiteralExpression::new(
            DataType::Interval,
            Some(gap_interval.into()),
        ))
    }

    /// Pushes a `Timestamp` watermark on column 0, parsed from `timestamp`.
    fn push_timestamp_watermark(tx: &mut MessageSender, timestamp: &str) {
        tx.push_watermark(
            0,
            DataType::Timestamp,
            timestamp
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
    }

    /// Runs the scenario shared by the single-strategy tests: feed
    /// `GAP_INPUT` with a one-day gap interval, release it with a watermark
    /// and assert that the emitted chunk equals `expected`.
    async fn assert_single_gap_filled(
        fill_columns: HashMap<usize, FillStrategy>,
        expected: &str,
    ) {
        let time_column_index = 0;
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            interval_literal(Interval::from_days(1)),
            MemoryStateStore::new(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // The watermark on column 1 is not on the time column and must be
        // swallowed; only the timestamp watermark is forwarded.
        tx.push_int64_watermark(1, 0_i64);
        push_timestamp_watermark(&mut tx, "2023-03-06 18:27:03");
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(GAP_INPUT));

        tx.push_int64_watermark(1, 0_i64);
        push_timestamp_watermark(&mut tx, "2023-04-05 18:27:03");

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(chunk, StreamChunk::from_pretty(expected));
        gap_fill_executor.expect_watermark().await;
    }

    #[tokio::test]
    async fn test_gap_fill_interpolate() {
        let fill_columns = HashMap::from([
            (1, FillStrategy::Interpolate),
            (2, FillStrategy::Interpolate),
            (3, FillStrategy::Interpolate),
            (4, FillStrategy::Interpolate),
        ]);
        assert_single_gap_filled(
            fill_columns,
            " TS                    i  I   f   F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-02T10:00:00 20 125 2.0 125.0
            + 2023-04-03T10:00:00 30 150 3.0 150.0
            + 2023-04-04T10:00:00 40 175 4.0 175.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_locf() {
        let fill_columns = HashMap::from([
            (1, FillStrategy::Locf),
            (2, FillStrategy::Locf),
            (3, FillStrategy::Locf),
            (4, FillStrategy::Locf),
        ]);
        assert_single_gap_filled(
            fill_columns,
            " TS                    i  I   f   F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-02T10:00:00 10 100 1.0 100.0
            + 2023-04-03T10:00:00 10 100 1.0 100.0
            + 2023-04-04T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_null() {
        let fill_columns = HashMap::from([
            (1, FillStrategy::Null),
            (2, FillStrategy::Null),
            (3, FillStrategy::Null),
            (4, FillStrategy::Null),
        ]);
        assert_single_gap_filled(
            fill_columns,
            " TS                    i  I   f   F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-02T10:00:00 .  .   .   .
            + 2023-04-03T10:00:00 .  .   .   .
            + 2023-04-04T10:00:00 .  .   .   .
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_mixed_strategy() {
        let fill_columns = HashMap::from([
            (1, FillStrategy::Interpolate),
            (2, FillStrategy::Locf),
            (3, FillStrategy::Null),
            (4, FillStrategy::Interpolate),
        ]);
        assert_single_gap_filled(
            fill_columns,
            " TS                    i  I   f   F
            + 2023-04-01T10:00:00 10 100 .   100.0
            + 2023-04-02T10:00:00 20 100 .   125.0
            + 2023-04-03T10:00:00 30 100 .   150.0
            + 2023-04-04T10:00:00 40 100 .   175.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0"
                .replacen("10 100 .   100.0", "10 100 1.0 100.0", 1)
                .as_str(),
        )
        .await;
    }

    /// Restarts the executor twice on the same store and checks that both
    /// the buffered rows and the previously emitted row survive recovery.
    #[tokio::test]
    async fn test_gap_fill_fail_over() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Locf),
            (2, FillStrategy::Interpolate),
            (3, FillStrategy::Locf),
            (4, FillStrategy::Locf),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            interval_literal(gap_interval),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(GAP_INPUT));

        tx.push_barrier(test_epoch(2), false);
        gap_fill_executor.expect_barrier().await;

        // First recovery: the buffered rows must be released from state.
        let (mut recovered_tx, mut recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            interval_literal(gap_interval),
            store.clone(),
        )
        .await;

        recovered_tx.push_barrier(test_epoch(2), false);
        recovered_gap_fill_executor.expect_barrier().await;

        push_timestamp_watermark(&mut recovered_tx, "2023-04-06T10:00:00");

        let chunk = recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                    i  I   f   F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 10 125 1.0 100.0
                + 2023-04-03T10:00:00 10 150 1.0 100.0
                + 2023-04-04T10:00:00 10 175 1.0 100.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0"
            )
        );

        recovered_gap_fill_executor.expect_watermark().await;

        recovered_tx.push_chunk(StreamChunk::from_pretty(
            " TS                    i  I   f   F
            + 2023-04-08T10:00:00 80 500 8.0 500.0",
        ));

        recovered_tx.push_barrier(test_epoch(3), false);
        recovered_gap_fill_executor.expect_barrier().await;

        // Second recovery: the gap must be filled relative to the row that
        // was emitted before the restart.
        let (mut final_recovered_tx, mut final_recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            interval_literal(gap_interval),
            store,
        )
        .await;

        final_recovered_tx.push_barrier(test_epoch(3), false);
        final_recovered_gap_fill_executor.expect_barrier().await;

        push_timestamp_watermark(&mut final_recovered_tx, "2023-04-09T10:00:00");

        let chunk = final_recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                    i  I   f   F
                + 2023-04-06T10:00:00 50 300 5.0 200.0
                + 2023-04-07T10:00:00 50 400 5.0 200.0
                + 2023-04-08T10:00:00 80 500 8.0 500.0"
            )
        );

        final_recovered_gap_fill_executor.expect_watermark().await;
    }
}