risingwave_stream/executor/eowc/eowc_gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::array::Op;
18use risingwave_common::gap_fill::{
19    FillStrategy, apply_interpolation_step, calculate_interpolation_step,
20};
21use risingwave_common::metrics::LabelGuardedIntCounter;
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{CheckedAdd, ToOwnedDatum};
24use risingwave_expr::ExprError;
25use risingwave_expr::expr::NonStrictExpression;
26use tracing::warn;
27
28use super::sort_buffer::SortBuffer;
29use crate::executor::prelude::*;
30
/// EOWC (emit-on-window-close) gap-fill executor.
///
/// Input chunks are buffered in a `SortBuffer` keyed on the time column. When a watermark on
/// the time column arrives, the rows released by the buffer are emitted together with
/// synthesized rows that fill the gaps between consecutive emitted rows.
pub struct EowcGapFillExecutor<S: StateStore> {
    /// Upstream executor providing the input stream.
    input: Executor,
    /// Configuration and state used while executing.
    inner: ExecutorInner<S>,
}
35
/// Construction arguments for [`EowcGapFillExecutor`], consumed by `EowcGapFillExecutor::new`.
pub struct EowcGapFillExecutorArgs<S: StateStore> {
    /// Actor context; also the source of the streaming metrics registry.
    pub actor_ctx: ActorContextRef,

    /// Upstream executor.
    pub input: Executor,

    /// Output schema; its data types are used to build output chunks.
    pub schema: Schema,
    /// State table backing the sort buffer that holds rows until a watermark releases them.
    pub buffer_table: StateTable<S>,
    /// One-row state table persisting the last emitted row across recovery.
    pub prev_row_table: StateTable<S>,
    /// Capacity passed to `StreamChunkBuilder` for output chunks.
    pub chunk_size: usize,
    /// Index of the time column (expected to be `Timestamp` or `Timestamptz`).
    pub time_column_index: usize,
    /// Fill strategy per column index; columns not present are filled with `NULL`.
    pub fill_columns: HashMap<usize, FillStrategy>,
    /// Expression yielding the gap interval. It is evaluated once at startup against an empty
    /// row and must produce a non-null, non-zero interval.
    pub gap_interval: NonStrictExpression,
}
49
/// Metrics reported by the gap-fill executor.
pub struct GapFillMetrics {
    /// Count of synthesized (gap-filled) rows, labelled with the actor id and fragment id.
    pub gap_fill_generated_rows_count: LabelGuardedIntCounter,
}
53
/// Configuration and state owned by a running [`EowcGapFillExecutor`].
struct ExecutorInner<S: StateStore> {
    actor_ctx: ActorContextRef,

    /// Output schema; its data types size the output `StreamChunkBuilder`.
    schema: Schema,
    /// State table backing the `SortBuffer` that holds rows until a time-column watermark
    /// releases them.
    buffer_table: StateTable<S>,
    /// One-row state table persisting the last emitted row, so gap filling can resume from it
    /// after recovery.
    prev_row_table: StateTable<S>,
    /// Capacity passed to `StreamChunkBuilder` for output chunks.
    chunk_size: usize,
    /// Index of the time column used to key the sort buffer and detect gaps. Values are
    /// expected to be `Timestamp`/`Timestamptz`; other types make gap filling a no-op.
    time_column_index: usize,
    /// Fill strategy per column index; columns not listed are filled with `NULL`.
    fill_columns: HashMap<usize, FillStrategy>,
    /// Expression yielding the gap interval; evaluated once at startup.
    gap_interval: NonStrictExpression,

    // Metrics
    metrics: GapFillMetrics,
}
68
/// Mutable state that exists only while `execute_inner` runs.
struct ExecutionVars<S: StateStore> {
    /// Sort buffer over `buffer_table`, keyed on the time column (see `SortBuffer::new`).
    buffer: SortBuffer<S>,
}
72
73impl<S: StateStore> ExecutorInner<S> {
74    fn generate_filled_rows(
75        prev_row: &OwnedRow,
76        curr_row: &OwnedRow,
77        time_column_index: usize,
78        fill_columns: &HashMap<usize, FillStrategy>,
79        interval: risingwave_common::types::Interval,
80        metrics: &GapFillMetrics,
81    ) -> Result<Vec<OwnedRow>, ExprError> {
82        let mut filled_rows = Vec::new();
83        let (Some(prev_time_scalar), Some(curr_time_scalar)) = (
84            prev_row.datum_at(time_column_index),
85            curr_row.datum_at(time_column_index),
86        ) else {
87            return Ok(filled_rows);
88        };
89
90        let prev_time = match prev_time_scalar {
91            ScalarRefImpl::Timestamp(ts) => ts,
92            ScalarRefImpl::Timestamptz(ts) => {
93                match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
94                    Ok(timestamp) => timestamp,
95                    Err(_) => {
96                        warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
97                        return Ok(filled_rows);
98                    }
99                }
100            }
101            _ => {
102                warn!(
103                    "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
104                    prev_time_scalar
105                );
106                return Ok(filled_rows);
107            }
108        };
109
110        let curr_time = match curr_time_scalar {
111            ScalarRefImpl::Timestamp(ts) => ts,
112            ScalarRefImpl::Timestamptz(ts) => {
113                match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
114                    Ok(timestamp) => timestamp,
115                    Err(_) => {
116                        warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
117                        return Ok(filled_rows);
118                    }
119                }
120            }
121            _ => {
122                warn!(
123                    "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
124                    curr_time_scalar
125                );
126                return Ok(filled_rows);
127            }
128        };
129        if prev_time >= curr_time {
130            return Ok(filled_rows);
131        }
132
133        let mut fill_time = match prev_time.checked_add(interval) {
134            Some(t) => t,
135            None => {
136                return Ok(filled_rows);
137            }
138        };
139        if fill_time >= curr_time {
140            return Ok(filled_rows);
141        }
142
143        // Calculate the number of rows to fill
144        let mut row_count = 0;
145        let mut temp_time = fill_time;
146        while temp_time < curr_time {
147            row_count += 1;
148            temp_time = match temp_time.checked_add(interval) {
149                Some(t) => t,
150                None => break,
151            };
152        }
153
154        // Pre-compute interpolation steps for each column that requires interpolation
155        let mut interpolation_steps: Vec<Option<ScalarImpl>> = Vec::new();
156        let mut interpolation_states: Vec<Datum> = Vec::new();
157
158        for i in 0..prev_row.len() {
159            if let Some(strategy) = fill_columns.get(&i) {
160                if matches!(strategy, FillStrategy::Interpolate) {
161                    let step = calculate_interpolation_step(
162                        prev_row.datum_at(i),
163                        curr_row.datum_at(i),
164                        row_count + 1,
165                    );
166                    interpolation_steps.push(step.clone());
167                    interpolation_states.push(prev_row.datum_at(i).to_owned_datum());
168                } else {
169                    interpolation_steps.push(None);
170                    interpolation_states.push(None);
171                }
172            } else {
173                interpolation_steps.push(None);
174                interpolation_states.push(None);
175            }
176        }
177
178        // Generate filled rows, applying the appropriate strategy for each column
179        while fill_time < curr_time {
180            let mut new_row_data = Vec::with_capacity(prev_row.len());
181
182            for col_idx in 0..prev_row.len() {
183                let datum = if col_idx == time_column_index {
184                    // Time column: use the incremented timestamp
185                    let fill_time_scalar = match prev_time_scalar {
186                        ScalarRefImpl::Timestamp(_) => ScalarImpl::Timestamp(fill_time),
187                        ScalarRefImpl::Timestamptz(_) => {
188                            let micros = fill_time.0.and_utc().timestamp_micros();
189                            ScalarImpl::Timestamptz(
190                                risingwave_common::types::Timestamptz::from_micros(micros),
191                            )
192                        }
193                        _ => unreachable!("Time column should be Timestamp or Timestamptz"),
194                    };
195                    Some(fill_time_scalar)
196                } else if let Some(strategy) = fill_columns.get(&col_idx) {
197                    // Apply the fill strategy for this column
198                    match strategy {
199                        FillStrategy::Locf => prev_row.datum_at(col_idx).to_owned_datum(),
200                        FillStrategy::Null => None,
201                        FillStrategy::Interpolate => {
202                            // Apply interpolation step and update cumulative value
203                            if let Some(step) = &interpolation_steps[col_idx] {
204                                apply_interpolation_step(&mut interpolation_states[col_idx], step);
205                                interpolation_states[col_idx].clone()
206                            } else {
207                                // If interpolation step is None, fill with NULL
208                                None
209                            }
210                        }
211                    }
212                } else {
213                    // No strategy specified, default to NULL
214                    None
215                };
216                new_row_data.push(datum);
217            }
218
219            filled_rows.push(OwnedRow::new(new_row_data));
220
221            fill_time = match fill_time.checked_add(interval) {
222                Some(t) => t,
223                None => {
224                    // Time overflow during iteration, stop filling
225                    warn!(
226                        "Gap fill stopped due to timestamp overflow after generating {} rows.",
227                        filled_rows.len()
228                    );
229                    break;
230                }
231            };
232        }
233
234        // Update metrics with the number of generated rows
235        metrics
236            .gap_fill_generated_rows_count
237            .inc_by(filled_rows.len() as u64);
238
239        Ok(filled_rows)
240    }
241}
242
243impl<S: StateStore> Execute for EowcGapFillExecutor<S> {
244    fn execute(self: Box<Self>) -> BoxedMessageStream {
245        self.execute_inner().boxed()
246    }
247}
248
impl<S: StateStore> EowcGapFillExecutor<S> {
    /// Builds the executor from `args`.
    ///
    /// Registers the per-actor `gap_fill_generated_rows_count` metric, labelled with the actor
    /// id and fragment id taken from `args.actor_ctx`.
    pub fn new(args: EowcGapFillExecutorArgs<S>) -> Self {
        let metrics = args.actor_ctx.streaming_metrics.clone();
        let actor_id = args.actor_ctx.id.to_string();
        let fragment_id = args.actor_ctx.fragment_id.to_string();
        let gap_fill_metrics = GapFillMetrics {
            gap_fill_generated_rows_count: metrics
                .gap_fill_generated_rows_count
                .with_guarded_label_values(&[&actor_id, &fragment_id]),
        };

        Self {
            input: args.input,

            inner: ExecutorInner {
                actor_ctx: args.actor_ctx,
                schema: args.schema,
                buffer_table: args.buffer_table,
                prev_row_table: args.prev_row_table,
                chunk_size: args.chunk_size,
                time_column_index: args.time_column_index,
                fill_columns: args.fill_columns,
                gap_interval: args.gap_interval,
                metrics: gap_fill_metrics,
            },
        }
    }

    /// Main executor loop.
    ///
    /// * Chunks are only buffered; nothing is emitted for them directly.
    /// * A watermark on the time column drains the rows the sort buffer releases for that
    ///   watermark. Each released row is preceded by the gap-filled rows between it and the
    ///   previously emitted row. The watermark itself is forwarded afterwards.
    /// * Watermarks on other columns are dropped.
    /// * Barriers persist the last emitted row and commit both state tables.
    ///
    /// # Errors
    ///
    /// Fails if the gap interval evaluates to `NULL` or to a zero interval, and propagates
    /// errors from the input stream and state tables.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let Self {
            input,
            inner: mut this,
        } = self;

        let mut input = input.execute();

        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        this.buffer_table.init_epoch(first_epoch).await?;
        this.prev_row_table.init_epoch(first_epoch).await?;

        // Calculate and validate gap interval once at initialization
        // The expression is evaluated against an empty row, so it presumably must not
        // reference input columns (e.g. a literal) — confirm with the planner.
        let dummy_row = OwnedRow::new(vec![]);
        let interval_datum = this.gap_interval.eval_row_infallible(&dummy_row).await;
        let interval = interval_datum
            .ok_or_else(|| anyhow::anyhow!("Gap interval expression returned null"))?
            .into_interval();

        // Validate that gap interval is not zero
        if interval.months() == 0 && interval.days() == 0 && interval.usecs() == 0 {
            Err(anyhow::anyhow!("Gap interval cannot be zero"))?;
        }

        let mut vars = ExecutionVars {
            buffer: SortBuffer::new(this.time_column_index, &this.buffer_table),
        };
        // `committed_prev_row` mirrors what is stored in `prev_row_table` as of the last
        // barrier. `staging_prev_row` is the last row emitted since then. Both start from the
        // persisted value so gap filling continues across recovery.
        let mut committed_prev_row: Option<OwnedRow> =
            this.prev_row_table.get_from_one_row_table().await?;
        let mut staging_prev_row = committed_prev_row.clone();

        // Load the sort buffer's in-memory cache from the buffer table.
        vars.buffer.refill_cache(None, &this.buffer_table).await?;

        #[for_await]
        for msg in input {
            match msg? {
                Message::Watermark(watermark @ Watermark { col_idx, .. })
                    if col_idx == this.time_column_index =>
                {
                    let mut chunk_builder =
                        StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());

                    #[for_await]
                    for row in vars
                        .buffer
                        .consume(watermark.val.clone(), &mut this.buffer_table)
                    {
                        let current_row = row?;
                        // Fill the gap between the previously emitted row and this one.
                        if let Some(p_row) = &staging_prev_row {
                            let filled_rows = ExecutorInner::<S>::generate_filled_rows(
                                p_row,
                                &current_row,
                                this.time_column_index,
                                &this.fill_columns,
                                interval,
                                &this.metrics,
                            )?;
                            for filled_row in filled_rows {
                                if let Some(chunk) =
                                    chunk_builder.append_row(Op::Insert, &filled_row)
                                {
                                    yield Message::Chunk(chunk);
                                }
                            }
                        }
                        if let Some(chunk) = chunk_builder.append_row(Op::Insert, &current_row) {
                            yield Message::Chunk(chunk);
                        }
                        staging_prev_row = Some(current_row);
                    }
                    // Flush any partially filled chunk before forwarding the watermark.
                    if let Some(chunk) = chunk_builder.take() {
                        yield Message::Chunk(chunk);
                    }

                    yield Message::Watermark(watermark);
                }
                // Watermarks on non-time columns are not forwarded.
                Message::Watermark(_) => continue,
                Message::Chunk(chunk) => {
                    vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
                    this.buffer_table.try_flush().await?;
                }
                Message::Barrier(barrier) => {
                    // Replace the persisted "previous row" only if it changed since the last
                    // barrier.
                    if committed_prev_row != staging_prev_row {
                        if let Some(old_row) = &committed_prev_row {
                            this.prev_row_table.delete(old_row);
                        }
                        if let Some(new_row) = &staging_prev_row {
                            this.prev_row_table.insert(new_row);
                        }
                    }

                    let post_commit = this.buffer_table.commit(barrier.epoch).await?;
                    this.prev_row_table
                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
                        .await?;

                    committed_prev_row.clone_from(&staging_prev_row);

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
                    yield Message::Barrier(barrier);

                    if post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .is_some()
                    {
                        // `SortBuffer` may output data directly from its in-memory cache without
                        // checking current vnode ownership. Therefore, we must rebuild the cache
                        // whenever the vnode bitmap is updated to avoid emitting rows that no
                        // longer belong to this actor.
                        vars.buffer.refill_cache(None, &this.buffer_table).await?;
                    }
                }
            }
        }
    }
}
397
/// Tests for [`EowcGapFillExecutor`], driven through a `MockSource` over an in-memory state
/// store.
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::types::Interval;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_expr::expr::LiteralExpression;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable_with_dist_key;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Builds a gap-fill executor over the fixed five-column input schema
    /// `(Timestamp, Int32, Int64, Float32, Float64)`, with its state tables in `store`.
    ///
    /// Returns the sender that feeds the mock source and the executor's output stream.
    /// Calling this again with the same `store` simulates recovery of the executor.
    async fn create_executor<S: StateStore>(
        time_column_index: usize,
        fill_columns: HashMap<usize, FillStrategy>,
        gap_interval: NonStrictExpression,
        store: S,
    ) -> (MessageSender, BoxedMessageStream) {
        let input_schema = Schema::new(vec![
            Field::unnamed(DataType::Timestamp),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Float32),
            Field::unnamed(DataType::Float64),
        ]);
        let input_stream_key = vec![time_column_index];

        let table_columns = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamp),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int32),
            ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(3), DataType::Float32),
            ColumnDesc::unnamed(ColumnId::new(4), DataType::Float64),
        ];

        // Buffer table: keyed (ascending) on the time column.
        let table_pk_indices = vec![time_column_index];
        let table_order_types = vec![OrderType::ascending()];
        let buffer_table = StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                TableId::new(0),
                table_columns.clone(),
                table_order_types,
                table_pk_indices,
                0,
                vec![],
            ),
            store.clone(),
            None,
        )
        .await;

        // Previous-row table: read by the executor as a one-row table; keyed on column 0.
        let prev_row_pk_indices = vec![0];
        let prev_row_order_types = vec![OrderType::ascending()];
        let prev_row_table = StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                TableId::new(1),
                table_columns,
                prev_row_order_types,
                prev_row_pk_indices,
                0,
                vec![],
            ),
            store,
            None,
        )
        .await;

        let (tx, source) = MockSource::channel();
        let source = source.into_executor(input_schema, input_stream_key);
        let gap_fill_executor = EowcGapFillExecutor::new(EowcGapFillExecutorArgs {
            actor_ctx: ActorContext::for_test(123),
            schema: source.schema().clone(),
            input: source,
            buffer_table,
            prev_row_table,
            chunk_size: 1024,
            time_column_index,
            fill_columns,
            gap_interval,
        });

        (tx, gap_fill_executor.boxed().execute())
    }

    /// Rows four days apart with a one-day interval: three rows are synthesized, and every value
    /// column is interpolated between the two real rows.
    #[tokio::test]
    async fn test_gap_fill_interpolate() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Interpolate),
            (2, FillStrategy::Interpolate),
            (3, FillStrategy::Interpolate),
            (4, FillStrategy::Interpolate),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // Watermark on a non-time column: the executor drops it.
        tx.push_int64_watermark(1, 0_i64);
        // Time-column watermark with nothing buffered: only the watermark comes out.
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-03-06 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I    f     F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        ));

        tx.push_int64_watermark(1, 0_i64);
        // This watermark releases both buffered rows, with the gap rows between them.
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-05 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 20 125 2.0 125.0
                + 2023-04-03T10:00:00 30 150 3.0 150.0
                + 2023-04-04T10:00:00 40 175 4.0 175.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
            )
        );
        gap_fill_executor.expect_watermark().await;
    }

    /// Same input as the interpolation test, but every value column uses LOCF: synthesized
    /// rows repeat the earlier row's values.
    #[tokio::test]
    async fn test_gap_fill_locf() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Locf),
            (2, FillStrategy::Locf),
            (3, FillStrategy::Locf),
            (4, FillStrategy::Locf),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // Watermark on a non-time column: the executor drops it.
        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-03-06 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I    f     F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        ));

        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-05 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 10 100 1.0 100.0
                + 2023-04-03T10:00:00 10 100 1.0 100.0
                + 2023-04-04T10:00:00 10 100 1.0 100.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
            )
        );
        gap_fill_executor.expect_watermark().await;
    }

    /// Same input, but every value column uses the NULL strategy: synthesized rows carry only
    /// the timestamp.
    #[tokio::test]
    async fn test_gap_fill_null() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Null),
            (2, FillStrategy::Null),
            (3, FillStrategy::Null),
            (4, FillStrategy::Null),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // Watermark on a non-time column: the executor drops it.
        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-03-06 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I    f     F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        ));

        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-05 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 .  .    .    .
                + 2023-04-03T10:00:00 .  .    .    .
                + 2023-04-04T10:00:00 .  .    .    .
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
            )
        );
        gap_fill_executor.expect_watermark().await;
    }

    /// Different strategies per column in the same output: interpolate, LOCF, NULL,
    /// interpolate.
    #[tokio::test]
    async fn test_gap_fill_mixed_strategy() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Interpolate),
            (2, FillStrategy::Locf),
            (3, FillStrategy::Null),
            (4, FillStrategy::Interpolate),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // Watermark on a non-time column: the executor drops it.
        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-03-06 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I    f     F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        ));

        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-05 18:27:03"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 20 100 .    125.0
                + 2023-04-03T10:00:00 30 100 .    150.0
                + 2023-04-04T10:00:00 40 100 .    175.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
            )
        );
        gap_fill_executor.expect_watermark().await;
    }

    /// Recovery: the executor is rebuilt twice over the same store.
    ///
    /// 1. Rows buffered before a barrier survive the first rebuild and are emitted, with their
    ///    gap rows, by the recovered executor.
    /// 2. The last emitted row, persisted in the previous-row table, survives the second
    ///    rebuild. The gap between it and a later row is therefore still filled.
    #[tokio::test]
    async fn test_gap_fill_fail_over() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Locf),
            (2, FillStrategy::Interpolate),
            (3, FillStrategy::Locf),
            (4, FillStrategy::Locf),
        ]);
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // Buffered only; no watermark arrives before the barrier commits it.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I    f     F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        ));

        tx.push_barrier(test_epoch(2), false);
        gap_fill_executor.expect_barrier().await;

        // First recovery: a fresh executor over the same store.
        let (mut recovered_tx, mut recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store.clone(),
        )
        .await;

        recovered_tx.push_barrier(test_epoch(2), false);
        recovered_gap_fill_executor.expect_barrier().await;

        recovered_tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-06T10:00:00"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        let chunk = recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 10 125 1.0 100.0
                + 2023-04-03T10:00:00 10 150 1.0 100.0
                + 2023-04-04T10:00:00 10 175 1.0 100.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0"
            )
        );

        recovered_gap_fill_executor.expect_watermark().await;

        recovered_tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I    f     F
            + 2023-04-08T10:00:00 80 500 8.0 500.0",
        ));

        // This barrier also persists the last emitted row (2023-04-05).
        recovered_tx.push_barrier(test_epoch(3), false);
        recovered_gap_fill_executor.expect_barrier().await;

        // Second recovery: the previous row must be restored from the previous-row table.
        let (mut final_recovered_tx, mut final_recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            NonStrictExpression::for_test(LiteralExpression::new(
                DataType::Interval,
                Some(gap_interval.into()),
            )),
            store,
        )
        .await;

        final_recovered_tx.push_barrier(test_epoch(3), false);
        final_recovered_gap_fill_executor.expect_barrier().await;

        final_recovered_tx.push_watermark(
            0,
            DataType::Timestamp,
            "2023-04-09T10:00:00"
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );

        // The restored 2023-04-05 row is not re-emitted; only the gap rows and the new row are.
        let chunk = final_recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-06T10:00:00 50 300 5.0 200.0
                + 2023-04-07T10:00:00 50 400 5.0 200.0
                + 2023-04-08T10:00:00 80 500 8.0 500.0"
            )
        );

        final_recovered_gap_fill_executor.expect_watermark().await;
    }
}