// risingwave_stream/executor/eowc/eowc_gap_fill.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_common::array::Op;
use risingwave_common::gap_fill::{
    FillStrategy, apply_interpolation_step, calculate_interpolation_step,
};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{CheckedAdd, Interval, Timestamp, ToOwnedDatum};
use risingwave_expr::ExprError;
use risingwave_expr::expr::NonStrictExpression;
use tracing::warn;

use super::sort_buffer::SortBuffer;
use crate::executor::prelude::*;

31pub struct EowcGapFillExecutor<S: StateStore> {
32    input: Executor,
33    inner: ExecutorInner<S>,
34}
36pub struct EowcGapFillExecutorArgs<S: StateStore> {
37    pub actor_ctx: ActorContextRef,
38
39    pub input: Executor,
40
41    pub schema: Schema,
42    pub buffer_table: StateTable<S>,
43    pub prev_row_table: StateTable<S>,
44    pub chunk_size: usize,
45    pub time_column_index: usize,
46    pub fill_columns: HashMap<usize, FillStrategy>,
47    pub gap_interval: NonStrictExpression,
48}
50pub struct GapFillMetrics {
51    pub gap_fill_generated_rows_count: LabelGuardedIntCounter,
52}
54struct ExecutorInner<S: StateStore> {
55    actor_ctx: ActorContextRef,
56
57    schema: Schema,
58    buffer_table: StateTable<S>,
59    prev_row_table: StateTable<S>,
60    chunk_size: usize,
61    time_column_index: usize,
62    fill_columns: HashMap<usize, FillStrategy>,
63    gap_interval: NonStrictExpression,
64
65    // Metrics
66    metrics: GapFillMetrics,
67}
69struct ExecutionVars<S: StateStore> {
70    buffer: SortBuffer<S>,
71}
73impl<S: StateStore> ExecutorInner<S> {
74    fn generate_filled_rows(
75        prev_row: &OwnedRow,
76        curr_row: &OwnedRow,
77        time_column_index: usize,
78        fill_columns: &HashMap<usize, FillStrategy>,
79        interval: risingwave_common::types::Interval,
80        metrics: &GapFillMetrics,
81    ) -> Result<Vec<OwnedRow>, ExprError> {
82        let mut filled_rows = Vec::new();
83        let (Some(prev_time_scalar), Some(curr_time_scalar)) = (
84            prev_row.datum_at(time_column_index),
85            curr_row.datum_at(time_column_index),
86        ) else {
87            return Ok(filled_rows);
88        };
89
90        let prev_time = match prev_time_scalar {
91            ScalarRefImpl::Timestamp(ts) => ts,
92            ScalarRefImpl::Timestamptz(ts) => {
93                match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
94                    Ok(timestamp) => timestamp,
95                    Err(_) => {
96                        warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
97                        return Ok(filled_rows);
98                    }
99                }
100            }
101            _ => {
102                warn!(
103                    "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
104                    prev_time_scalar
105                );
106                return Ok(filled_rows);
107            }
108        };
109
110        let curr_time = match curr_time_scalar {
111            ScalarRefImpl::Timestamp(ts) => ts,
112            ScalarRefImpl::Timestamptz(ts) => {
113                match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
114                    Ok(timestamp) => timestamp,
115                    Err(_) => {
116                        warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
117                        return Ok(filled_rows);
118                    }
119                }
120            }
121            _ => {
122                warn!(
123                    "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
124                    curr_time_scalar
125                );
126                return Ok(filled_rows);
127            }
128        };
129        if prev_time >= curr_time {
130            return Ok(filled_rows);
131        }
132
133        let mut fill_time = match prev_time.checked_add(interval) {
134            Some(t) => t,
135            None => {
136                return Ok(filled_rows);
137            }
138        };
139        if fill_time >= curr_time {
140            return Ok(filled_rows);
141        }
142
143        // Calculate the number of rows to fill
144        let mut row_count = 0;
145        let mut temp_time = fill_time;
146        while temp_time < curr_time {
147            row_count += 1;
148            temp_time = match temp_time.checked_add(interval) {
149                Some(t) => t,
150                None => break,
151            };
152        }
153
154        // Pre-compute interpolation steps for each column that requires interpolation
155        let mut interpolation_steps: Vec<Option<ScalarImpl>> = Vec::new();
156        let mut interpolation_states: Vec<Datum> = Vec::new();
157
158        for i in 0..prev_row.len() {
159            if let Some(strategy) = fill_columns.get(&i) {
160                if matches!(strategy, FillStrategy::Interpolate) {
161                    let step = calculate_interpolation_step(
162                        prev_row.datum_at(i),
163                        curr_row.datum_at(i),
164                        row_count + 1,
165                    );
166                    interpolation_steps.push(step.clone());
167                    interpolation_states.push(prev_row.datum_at(i).to_owned_datum());
168                } else {
169                    interpolation_steps.push(None);
170                    interpolation_states.push(None);
171                }
172            } else {
173                interpolation_steps.push(None);
174                interpolation_states.push(None);
175            }
176        }
177
178        // Generate filled rows, applying the appropriate strategy for each column
179        while fill_time < curr_time {
180            let mut new_row_data = Vec::with_capacity(prev_row.len());
181
182            for col_idx in 0..prev_row.len() {
183                let datum = if col_idx == time_column_index {
184                    // Time column: use the incremented timestamp
185                    let fill_time_scalar = match prev_time_scalar {
186                        ScalarRefImpl::Timestamp(_) => ScalarImpl::Timestamp(fill_time),
187                        ScalarRefImpl::Timestamptz(_) => {
188                            let micros = fill_time.0.and_utc().timestamp_micros();
189                            ScalarImpl::Timestamptz(
190                                risingwave_common::types::Timestamptz::from_micros(micros),
191                            )
192                        }
193                        _ => unreachable!("Time column should be Timestamp or Timestamptz"),
194                    };
195                    Some(fill_time_scalar)
196                } else if let Some(strategy) = fill_columns.get(&col_idx) {
197                    // Apply the fill strategy for this column
198                    match strategy {
199                        FillStrategy::Locf => prev_row.datum_at(col_idx).to_owned_datum(),
200                        FillStrategy::Null => None,
201                        FillStrategy::Interpolate => {
202                            // Apply interpolation step and update cumulative value
203                            if let Some(step) = &interpolation_steps[col_idx] {
204                                apply_interpolation_step(&mut interpolation_states[col_idx], step);
205                                interpolation_states[col_idx].clone()
206                            } else {
207                                // If interpolation step is None, fill with NULL
208                                None
209                            }
210                        }
211                    }
212                } else {
213                    // No strategy specified, default to NULL
214                    None
215                };
216                new_row_data.push(datum);
217            }
218
219            filled_rows.push(OwnedRow::new(new_row_data));
220
221            fill_time = match fill_time.checked_add(interval) {
222                Some(t) => t,
223                None => {
224                    // Time overflow during iteration, stop filling
225                    warn!(
226                        "Gap fill stopped due to timestamp overflow after generating {} rows.",
227                        filled_rows.len()
228                    );
229                    break;
230                }
231            };
232        }
233
234        // Update metrics with the number of generated rows
235        metrics
236            .gap_fill_generated_rows_count
237            .inc_by(filled_rows.len() as u64);
238
239        Ok(filled_rows)
240    }
241}
243impl<S: StateStore> Execute for EowcGapFillExecutor<S> {
244    fn execute(self: Box<Self>) -> BoxedMessageStream {
245        self.execute_inner().boxed()
246    }
247}
249impl<S: StateStore> EowcGapFillExecutor<S> {
250    pub fn new(args: EowcGapFillExecutorArgs<S>) -> Self {
251        let metrics = args.actor_ctx.streaming_metrics.clone();
252        let actor_id = args.actor_ctx.id.to_string();
253        let fragment_id = args.actor_ctx.fragment_id.to_string();
254        let gap_fill_metrics = GapFillMetrics {
255            gap_fill_generated_rows_count: metrics
256                .gap_fill_generated_rows_count
257                .with_guarded_label_values(&[&actor_id, &fragment_id]),
258        };
259
260        Self {
261            input: args.input,
262
263            inner: ExecutorInner {
264                actor_ctx: args.actor_ctx,
265                schema: args.schema,
266                buffer_table: args.buffer_table,
267                prev_row_table: args.prev_row_table,
268                chunk_size: args.chunk_size,
269                time_column_index: args.time_column_index,
270                fill_columns: args.fill_columns,
271                gap_interval: args.gap_interval,
272                metrics: gap_fill_metrics,
273            },
274        }
275    }
276
277    #[try_stream(ok = Message, error = StreamExecutorError)]
278    async fn execute_inner(self) {
279        let Self {
280            input,
281            inner: mut this,
282        } = self;
283
284        let mut input = input.execute();
285
286        let barrier = expect_first_barrier(&mut input).await?;
287        let first_epoch = barrier.epoch;
288        yield Message::Barrier(barrier);
289        this.buffer_table.init_epoch(first_epoch).await?;
290        this.prev_row_table.init_epoch(first_epoch).await?;
291
292        // Calculate and validate gap interval once at initialization
293        let dummy_row = OwnedRow::new(vec![]);
294        let interval_datum = this.gap_interval.eval_row_infallible(&dummy_row).await;
295        let interval = interval_datum
296            .ok_or_else(|| anyhow::anyhow!("Gap interval expression returned null"))?
297            .into_interval();
298
299        // Validate that gap interval is not zero
300        if interval.months() == 0 && interval.days() == 0 && interval.usecs() == 0 {
301            Err(anyhow::anyhow!("Gap interval cannot be zero"))?;
302        }
303
304        let mut vars = ExecutionVars {
305            buffer: SortBuffer::new(this.time_column_index, &this.buffer_table),
306        };
307        let mut committed_prev_row: Option<OwnedRow> =
308            this.prev_row_table.get_from_one_row_table().await?;
309        let mut staging_prev_row = committed_prev_row.clone();
310
311        vars.buffer.refill_cache(None, &this.buffer_table).await?;
312
313        #[for_await]
314        for msg in input {
315            match msg? {
316                Message::Watermark(watermark @ Watermark { col_idx, .. })
317                    if col_idx == this.time_column_index =>
318                {
319                    let mut chunk_builder =
320                        StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());
321
322                    #[for_await]
323                    for row in vars
324                        .buffer
325                        .consume(watermark.val.clone(), &mut this.buffer_table)
326                    {
327                        let current_row = row?;
328                        if let Some(p_row) = &staging_prev_row {
329                            let filled_rows = ExecutorInner::<S>::generate_filled_rows(
330                                p_row,
331                                &current_row,
332                                this.time_column_index,
333                                &this.fill_columns,
334                                interval,
335                                &this.metrics,
336                            )?;
337                            for filled_row in filled_rows {
338                                if let Some(chunk) =
339                                    chunk_builder.append_row(Op::Insert, &filled_row)
340                                {
341                                    yield Message::Chunk(chunk);
342                                }
343                            }
344                        }
345                        if let Some(chunk) = chunk_builder.append_row(Op::Insert, &current_row) {
346                            yield Message::Chunk(chunk);
347                        }
348                        staging_prev_row = Some(current_row);
349                    }
350                    if let Some(chunk) = chunk_builder.take() {
351                        yield Message::Chunk(chunk);
352                    }
353
354                    yield Message::Watermark(watermark);
355                }
356                Message::Watermark(_) => continue,
357                Message::Chunk(chunk) => {
358                    vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
359                    this.buffer_table.try_flush().await?;
360                }
361                Message::Barrier(barrier) => {
362                    if committed_prev_row != staging_prev_row {
363                        if let Some(old_row) = &committed_prev_row {
364                            this.prev_row_table.delete(old_row);
365                        }
366                        if let Some(new_row) = &staging_prev_row {
367                            this.prev_row_table.insert(new_row);
368                        }
369                    }
370
371                    let post_commit = this.buffer_table.commit(barrier.epoch).await?;
372                    this.prev_row_table
373                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
374                        .await?;
375
376                    committed_prev_row.clone_from(&staging_prev_row);
377
378                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
379                    yield Message::Barrier(barrier);
380
381                    if let Some((_, cache_may_stale)) =
382                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
383                        && cache_may_stale
384                    {
385                        vars.buffer.refill_cache(None, &this.buffer_table).await?;
386                    }
387                }
388            }
389        }
390    }
391}
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::types::Interval;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_expr::expr::LiteralExpression;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable_with_dist_key;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Builds a state table over `columns`, keyed ascending by the column at `pk_index`.
    async fn new_state_table<S: StateStore>(
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        pk_index: usize,
        store: S,
    ) -> StateTable<S> {
        StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                table_id,
                columns,
                vec![OrderType::ascending()],
                vec![pk_index],
                0,
                vec![],
            ),
            store,
            None,
        )
        .await
    }

    /// Creates a gap-fill executor over a mock source with the schema
    /// `(Timestamp, Int32, Int64, Float32, Float64)`, backed by `store`.
    async fn create_executor<S: StateStore>(
        time_column_index: usize,
        fill_columns: HashMap<usize, FillStrategy>,
        gap_interval: NonStrictExpression,
        store: S,
    ) -> (MessageSender, BoxedMessageStream) {
        let column_types = [
            DataType::Timestamp,
            DataType::Int32,
            DataType::Int64,
            DataType::Float32,
            DataType::Float64,
        ];

        let input_schema = Schema::new(column_types.iter().cloned().map(Field::unnamed).collect());
        let input_stream_key = vec![time_column_index];

        let table_columns: Vec<ColumnDesc> = (0..)
            .zip(column_types.iter().cloned())
            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id), data_type))
            .collect();

        let buffer_table = new_state_table(
            TableId::new(0),
            table_columns.clone(),
            time_column_index,
            store.clone(),
        )
        .await;
        let prev_row_table = new_state_table(TableId::new(1), table_columns, 0, store).await;

        let (tx, source) = MockSource::channel();
        let source = source.into_executor(input_schema, input_stream_key);
        let schema = source.schema().clone();
        let executor = EowcGapFillExecutor::new(EowcGapFillExecutorArgs {
            actor_ctx: ActorContext::for_test(123),
            schema,
            input: source,
            buffer_table,
            prev_row_table,
            chunk_size: 1024,
            time_column_index,
            fill_columns,
            gap_interval,
        });

        (tx, executor.boxed().execute())
    }

    /// Parses a timestamp literal, panicking on malformed input.
    fn parse_ts(text: &str) -> risingwave_common::types::Timestamp {
        text.parse::<risingwave_common::types::Timestamp>().unwrap()
    }

    /// Wraps `interval` into a literal expression usable as the gap interval.
    fn literal_interval(interval: Interval) -> NonStrictExpression {
        NonStrictExpression::for_test(LiteralExpression::new(
            DataType::Interval,
            Some(interval.into()),
        ))
    }

    /// Input shared by all tests: two rows four days apart.
    fn two_row_input() -> StreamChunk {
        StreamChunk::from_pretty(
            " TS                  i   I    f     F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
    }

    /// Feeds the shared two-row input through an executor configured with
    /// `fill_columns` and a one-day gap interval, then checks that the single
    /// emitted chunk equals `expected`.
    async fn check_single_gap(fill_columns: HashMap<usize, FillStrategy>, expected: StreamChunk) {
        let time_column_index = 0;
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            literal_interval(Interval::from_days(1)),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        // The watermark on column 1 is ignored; the one on column 0 precedes all rows.
        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            parse_ts("2023-03-06 18:27:03").into(),
        );
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(two_row_input());

        // This watermark is later than both rows, so both are released.
        tx.push_int64_watermark(1, 0_i64);
        tx.push_watermark(
            0,
            DataType::Timestamp,
            parse_ts("2023-04-05 18:27:03").into(),
        );

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(chunk, expected);
        gap_fill_executor.expect_watermark().await;
    }

    #[tokio::test]
    async fn test_gap_fill_interpolate() {
        check_single_gap(
            HashMap::from([
                (1, FillStrategy::Interpolate),
                (2, FillStrategy::Interpolate),
                (3, FillStrategy::Interpolate),
                (4, FillStrategy::Interpolate),
            ]),
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 20 125 2.0 125.0
                + 2023-04-03T10:00:00 30 150 3.0 150.0
                + 2023-04-04T10:00:00 40 175 4.0 175.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
            ),
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_locf() {
        check_single_gap(
            HashMap::from([
                (1, FillStrategy::Locf),
                (2, FillStrategy::Locf),
                (3, FillStrategy::Locf),
                (4, FillStrategy::Locf),
            ]),
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 10 100 1.0 100.0
                + 2023-04-03T10:00:00 10 100 1.0 100.0
                + 2023-04-04T10:00:00 10 100 1.0 100.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
            ),
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_null() {
        check_single_gap(
            HashMap::from([
                (1, FillStrategy::Null),
                (2, FillStrategy::Null),
                (3, FillStrategy::Null),
                (4, FillStrategy::Null),
            ]),
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 .  .    .    .
                + 2023-04-03T10:00:00 .  .    .    .
                + 2023-04-04T10:00:00 .  .    .    .
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
            ),
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_mixed_strategy() {
        check_single_gap(
            HashMap::from([
                (1, FillStrategy::Interpolate),
                (2, FillStrategy::Locf),
                (3, FillStrategy::Null),
                (4, FillStrategy::Interpolate),
            ]),
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 20 100 .    125.0
                + 2023-04-03T10:00:00 30 100 .    150.0
                + 2023-04-04T10:00:00 40 100 .    175.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
            ),
        )
        .await;
    }

    /// Recreates the executor twice on the same store and checks that both the
    /// buffered rows and the previously emitted row survive each restart.
    #[tokio::test]
    async fn test_gap_fill_fail_over() {
        let time_column_index = 0;
        let gap_interval = Interval::from_days(1);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Locf),
            (2, FillStrategy::Interpolate),
            (3, FillStrategy::Locf),
            (4, FillStrategy::Locf),
        ]);
        let store = MemoryStateStore::new();

        // First run: buffer two rows and commit them with a barrier.
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            literal_interval(gap_interval),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        tx.push_chunk(two_row_input());

        tx.push_barrier(test_epoch(2), false);
        gap_fill_executor.expect_barrier().await;

        // Second run: the buffered rows must still be there after recovery.
        let (mut recovered_tx, mut recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            literal_interval(gap_interval),
            store.clone(),
        )
        .await;

        recovered_tx.push_barrier(test_epoch(2), false);
        recovered_gap_fill_executor.expect_barrier().await;

        recovered_tx.push_watermark(
            0,
            DataType::Timestamp,
            parse_ts("2023-04-06T10:00:00").into(),
        );

        let chunk = recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 10 125 1.0 100.0
                + 2023-04-03T10:00:00 10 150 1.0 100.0
                + 2023-04-04T10:00:00 10 175 1.0 100.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0"
            )
        );

        recovered_gap_fill_executor.expect_watermark().await;

        recovered_tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I    f     F
            + 2023-04-08T10:00:00 80 500 8.0 500.0",
        ));

        recovered_tx.push_barrier(test_epoch(3), false);
        recovered_gap_fill_executor.expect_barrier().await;

        // Third run: the previously emitted row must be restored so the gap
        // between 04-05 and 04-08 is filled.
        let (mut final_recovered_tx, mut final_recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            literal_interval(gap_interval),
            store,
        )
        .await;

        final_recovered_tx.push_barrier(test_epoch(3), false);
        final_recovered_gap_fill_executor.expect_barrier().await;

        final_recovered_tx.push_watermark(
            0,
            DataType::Timestamp,
            parse_ts("2023-04-09T10:00:00").into(),
        );

        let chunk = final_recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-06T10:00:00 50 300 5.0 200.0
                + 2023-04-07T10:00:00 50 400 5.0 200.0
                + 2023-04-08T10:00:00 80 500 8.0 500.0"
            )
        );

        final_recovered_gap_fill_executor.expect_watermark().await;
    }
}