Skip to main content

risingwave_stream/executor/eowc/
eowc_gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::array::Op;
18use risingwave_common::gap_fill::{
19    FillStrategy, apply_interpolation_step, calculate_interpolation_step,
20};
21use risingwave_common::metrics::LabelGuardedIntCounter;
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{CheckedAdd, Interval, ToOwnedDatum};
24use risingwave_expr::ExprError;
25use risingwave_expr::expr::NonStrictExpression;
26use tracing::warn;
27
28use super::sort_buffer::SortBuffer;
29use crate::executor::prelude::*;
30
31pub struct EowcGapFillExecutor<S: StateStore> {
32    input: Executor,
33    inner: ExecutorInner<S>,
34}
35
36pub struct EowcGapFillExecutorArgs<S: StateStore> {
37    pub actor_ctx: ActorContextRef,
38
39    pub input: Executor,
40
41    pub schema: Schema,
42    pub buffer_table: StateTable<S>,
43    pub prev_row_table: StateTable<S>,
44    pub chunk_size: usize,
45    pub time_column_index: usize,
46    pub fill_columns: HashMap<usize, FillStrategy>,
47    pub gap_interval: NonStrictExpression,
48    pub high_gap_fill_amplification_threshold: usize,
49}
50
51pub struct GapFillMetrics {
52    pub gap_fill_generated_rows_count: LabelGuardedIntCounter,
53}
54
55struct GapFillGenerationContext<'a> {
56    metrics: &'a GapFillMetrics,
57    high_amplification_threshold: usize,
58    actor_ctx: &'a ActorContextRef,
59}
60
61struct ExecutorInner<S: StateStore> {
62    actor_ctx: ActorContextRef,
63
64    schema: Schema,
65    buffer_table: StateTable<S>,
66    prev_row_table: StateTable<S>,
67    chunk_size: usize,
68    time_column_index: usize,
69    fill_columns: HashMap<usize, FillStrategy>,
70    gap_interval: NonStrictExpression,
71    high_gap_fill_amplification_threshold: usize,
72
73    // Metrics
74    metrics: GapFillMetrics,
75}
76
77struct ExecutionVars<S: StateStore> {
78    buffer: SortBuffer<S>,
79}
80
81impl<S: StateStore> ExecutorInner<S> {
82    fn generate_filled_rows(
83        prev_row: &OwnedRow,
84        curr_row: &OwnedRow,
85        time_column_index: usize,
86        fill_columns: &HashMap<usize, FillStrategy>,
87        interval: risingwave_common::types::Interval,
88        generation_context: &GapFillGenerationContext<'_>,
89    ) -> Result<Vec<OwnedRow>, ExprError> {
90        let mut filled_rows = Vec::new();
91        let (Some(prev_time_scalar), Some(curr_time_scalar)) = (
92            prev_row.datum_at(time_column_index),
93            curr_row.datum_at(time_column_index),
94        ) else {
95            return Ok(filled_rows);
96        };
97
98        let prev_time = match prev_time_scalar {
99            ScalarRefImpl::Timestamp(ts) => ts,
100            ScalarRefImpl::Timestamptz(ts) => {
101                match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
102                    Ok(timestamp) => timestamp,
103                    Err(_) => {
104                        warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
105                        return Ok(filled_rows);
106                    }
107                }
108            }
109            _ => {
110                warn!(
111                    "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
112                    prev_time_scalar
113                );
114                return Ok(filled_rows);
115            }
116        };
117
118        let curr_time = match curr_time_scalar {
119            ScalarRefImpl::Timestamp(ts) => ts,
120            ScalarRefImpl::Timestamptz(ts) => {
121                match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
122                    Ok(timestamp) => timestamp,
123                    Err(_) => {
124                        warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
125                        return Ok(filled_rows);
126                    }
127                }
128            }
129            _ => {
130                warn!(
131                    "Failed to convert time column to timestamp, got {:?}. Skipping gap fill.",
132                    curr_time_scalar
133                );
134                return Ok(filled_rows);
135            }
136        };
137        if prev_time >= curr_time {
138            return Ok(filled_rows);
139        }
140
141        let mut fill_time = match prev_time.checked_add(interval) {
142            Some(t) => t,
143            None => {
144                return Ok(filled_rows);
145            }
146        };
147        if fill_time >= curr_time {
148            return Ok(filled_rows);
149        }
150
151        // Calculate the number of rows to fill
152        let mut row_count = 0;
153        let mut temp_time = fill_time;
154        while temp_time < curr_time {
155            row_count += 1;
156            temp_time = match temp_time.checked_add(interval) {
157                Some(t) => t,
158                None => break,
159            };
160        }
161
162        // Pre-compute interpolation steps for each column that requires interpolation
163        let mut interpolation_steps: Vec<Option<ScalarImpl>> = Vec::new();
164        let mut interpolation_states: Vec<Datum> = Vec::new();
165
166        for i in 0..prev_row.len() {
167            if let Some(strategy) = fill_columns.get(&i) {
168                if matches!(strategy, FillStrategy::Interpolate) {
169                    let step = calculate_interpolation_step(
170                        prev_row.datum_at(i),
171                        curr_row.datum_at(i),
172                        row_count + 1,
173                    );
174                    interpolation_steps.push(step.clone());
175                    interpolation_states.push(prev_row.datum_at(i).to_owned_datum());
176                } else {
177                    interpolation_steps.push(None);
178                    interpolation_states.push(None);
179                }
180            } else {
181                interpolation_steps.push(None);
182                interpolation_states.push(None);
183            }
184        }
185
186        // Generate filled rows, applying the appropriate strategy for each column
187        while fill_time < curr_time {
188            let mut new_row_data = Vec::with_capacity(prev_row.len());
189
190            for col_idx in 0..prev_row.len() {
191                let datum = if col_idx == time_column_index {
192                    // Time column: use the incremented timestamp
193                    let fill_time_scalar = match prev_time_scalar {
194                        ScalarRefImpl::Timestamp(_) => ScalarImpl::Timestamp(fill_time),
195                        ScalarRefImpl::Timestamptz(_) => {
196                            let micros = fill_time.0.and_utc().timestamp_micros();
197                            ScalarImpl::Timestamptz(
198                                risingwave_common::types::Timestamptz::from_micros(micros),
199                            )
200                        }
201                        _ => unreachable!("Time column should be Timestamp or Timestamptz"),
202                    };
203                    Some(fill_time_scalar)
204                } else if let Some(strategy) = fill_columns.get(&col_idx) {
205                    // Apply the fill strategy for this column
206                    match strategy {
207                        FillStrategy::Locf => prev_row.datum_at(col_idx).to_owned_datum(),
208                        FillStrategy::Null => None,
209                        FillStrategy::Interpolate => {
210                            // Apply interpolation step and update cumulative value
211                            if let Some(step) = &interpolation_steps[col_idx] {
212                                apply_interpolation_step(&mut interpolation_states[col_idx], step);
213                                interpolation_states[col_idx].clone()
214                            } else {
215                                // If interpolation step is None, fill with NULL
216                                None
217                            }
218                        }
219                    }
220                } else {
221                    // No strategy specified, default to NULL
222                    None
223                };
224                new_row_data.push(datum);
225            }
226
227            filled_rows.push(OwnedRow::new(new_row_data));
228
229            fill_time = match fill_time.checked_add(interval) {
230                Some(t) => t,
231                None => {
232                    // Time overflow during iteration, stop filling
233                    warn!(
234                        "Gap fill stopped due to timestamp overflow after generating {} rows.",
235                        filled_rows.len()
236                    );
237                    break;
238                }
239            };
240        }
241
242        // Update metrics with the number of generated rows
243        generation_context
244            .metrics
245            .gap_fill_generated_rows_count
246            .inc_by(filled_rows.len() as u64);
247
248        if filled_rows.len() > generation_context.high_amplification_threshold {
249            tracing::warn!(target: "high_gap_fill_amplification",
250                generated_rows_len = filled_rows.len(),
251                prev_time = ?prev_time,
252                curr_time = ?curr_time,
253                gap_interval = ?interval,
254                actor_id = %generation_context.actor_ctx.id,
255                fragment_id = %generation_context.actor_ctx.fragment_id,
256                "large rows generated by gap fill"
257            );
258        }
259
260        Ok(filled_rows)
261    }
262}
263
264impl<S: StateStore> Execute for EowcGapFillExecutor<S> {
265    fn execute(self: Box<Self>) -> BoxedMessageStream {
266        self.execute_inner().boxed()
267    }
268}
269
270impl<S: StateStore> EowcGapFillExecutor<S> {
271    pub fn new(args: EowcGapFillExecutorArgs<S>) -> Self {
272        let metrics = args.actor_ctx.streaming_metrics.clone();
273        let actor_id = args.actor_ctx.id.to_string();
274        let fragment_id = args.actor_ctx.fragment_id.to_string();
275        let gap_fill_metrics = GapFillMetrics {
276            gap_fill_generated_rows_count: metrics
277                .gap_fill_generated_rows_count
278                .with_guarded_label_values(&[&actor_id, &fragment_id]),
279        };
280
281        Self {
282            input: args.input,
283
284            inner: ExecutorInner {
285                actor_ctx: args.actor_ctx,
286                schema: args.schema,
287                buffer_table: args.buffer_table,
288                prev_row_table: args.prev_row_table,
289                chunk_size: args.chunk_size,
290                time_column_index: args.time_column_index,
291                fill_columns: args.fill_columns,
292                gap_interval: args.gap_interval,
293                high_gap_fill_amplification_threshold: args.high_gap_fill_amplification_threshold,
294                metrics: gap_fill_metrics,
295            },
296        }
297    }
298
299    #[try_stream(ok = Message, error = StreamExecutorError)]
300    async fn execute_inner(self) {
301        let Self {
302            input,
303            inner: mut this,
304        } = self;
305
306        let mut input = input.execute();
307
308        let barrier = expect_first_barrier(&mut input).await?;
309        let first_epoch = barrier.epoch;
310        yield Message::Barrier(barrier);
311        this.buffer_table.init_epoch(first_epoch).await?;
312        this.prev_row_table.init_epoch(first_epoch).await?;
313
314        // Calculate and validate gap interval once at initialization
315        let dummy_row = OwnedRow::new(vec![]);
316        let interval_datum = this.gap_interval.eval_row_infallible(&dummy_row).await;
317        let interval = interval_datum
318            .ok_or_else(|| anyhow::anyhow!("Gap interval expression returned null"))?
319            .into_interval();
320
321        // Validate that gap interval is positive.
322        if interval <= Interval::from_month_day_usec(0, 0, 0) {
323            Err(anyhow::anyhow!("Gap interval must be positive"))?;
324        }
325
326        let mut vars = ExecutionVars {
327            buffer: SortBuffer::new(this.time_column_index, &this.buffer_table),
328        };
329        let mut committed_prev_row: Option<OwnedRow> =
330            this.prev_row_table.get_from_one_row_table().await?;
331        let mut staging_prev_row = committed_prev_row.clone();
332
333        vars.buffer.refill_cache(None, &this.buffer_table).await?;
334
335        #[for_await]
336        for msg in input {
337            match msg? {
338                Message::Watermark(watermark @ Watermark { col_idx, .. })
339                    if col_idx == this.time_column_index =>
340                {
341                    let mut chunk_builder =
342                        StreamChunkBuilder::new(this.chunk_size, this.schema.data_types());
343
344                    #[for_await]
345                    for row in vars
346                        .buffer
347                        .consume(watermark.val.clone(), &mut this.buffer_table)
348                    {
349                        let current_row = row?;
350                        if let Some(p_row) = &staging_prev_row {
351                            let generation_context = GapFillGenerationContext {
352                                metrics: &this.metrics,
353                                high_amplification_threshold: this
354                                    .high_gap_fill_amplification_threshold,
355                                actor_ctx: &this.actor_ctx,
356                            };
357                            let filled_rows = ExecutorInner::<S>::generate_filled_rows(
358                                p_row,
359                                &current_row,
360                                this.time_column_index,
361                                &this.fill_columns,
362                                interval,
363                                &generation_context,
364                            )?;
365                            for filled_row in filled_rows {
366                                if let Some(chunk) =
367                                    chunk_builder.append_row(Op::Insert, &filled_row)
368                                {
369                                    yield Message::Chunk(chunk);
370                                }
371                            }
372                        }
373                        if let Some(chunk) = chunk_builder.append_row(Op::Insert, &current_row) {
374                            yield Message::Chunk(chunk);
375                        }
376                        staging_prev_row = Some(current_row);
377                    }
378                    if let Some(chunk) = chunk_builder.take() {
379                        yield Message::Chunk(chunk);
380                    }
381
382                    yield Message::Watermark(watermark);
383                }
384                Message::Watermark(_) => continue,
385                Message::Chunk(chunk) => {
386                    vars.buffer.apply_chunk(chunk, &mut this.buffer_table);
387                    this.buffer_table.try_flush().await?;
388                }
389                Message::Barrier(barrier) => {
390                    if committed_prev_row != staging_prev_row {
391                        if let Some(old_row) = &committed_prev_row {
392                            this.prev_row_table.delete(old_row);
393                        }
394                        if let Some(new_row) = &staging_prev_row {
395                            this.prev_row_table.insert(new_row);
396                        }
397                    }
398
399                    let post_commit = this.buffer_table.commit(barrier.epoch).await?;
400                    this.prev_row_table
401                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
402                        .await?;
403
404                    committed_prev_row.clone_from(&staging_prev_row);
405
406                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(this.actor_ctx.id);
407                    yield Message::Barrier(barrier);
408
409                    if post_commit
410                        .post_yield_barrier(update_vnode_bitmap)
411                        .await?
412                        .is_some()
413                    {
414                        // `SortBuffer` may output data directly from its in-memory cache without
415                        // checking current vnode ownership. Therefore, we must rebuild the cache
416                        // whenever the vnode bitmap is updated to avoid emitting rows that no
417                        // longer belong to this actor.
418                        vars.buffer.refill_cache(None, &this.buffer_table).await?;
419                    }
420                }
421            }
422        }
423    }
424}
425
#[cfg(test)]
mod tests {
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::types::Interval;
    use risingwave_common::types::test_utils::IntervalTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_expr::expr::LiteralExpression;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable_with_dist_key;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Two rows four days apart, leaving three one-day slots to fill.
    const FOUR_DAY_GAP_INPUT: &str = " TS                  i   I    f     F
            + 2023-04-01T10:00:00 10 100 1.0 100.0
            + 2023-04-05T10:00:00 50 200 5.0 200.0";

    /// Creates a gap-fill executor over a five-column input
    /// (`Timestamp, Int32, Int64, Float32, Float64`) backed by `store`.
    ///
    /// Returns the sender that feeds the mock source and the executor's output stream.
    /// Reusing the same `store` for a second call simulates recovery from persisted state.
    async fn create_executor<S: StateStore>(
        time_column_index: usize,
        fill_columns: HashMap<usize, FillStrategy>,
        gap_interval: NonStrictExpression,
        store: S,
    ) -> (MessageSender, BoxedMessageStream) {
        let input_schema = Schema::new(vec![
            Field::unnamed(DataType::Timestamp),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Float32),
            Field::unnamed(DataType::Float64),
        ]);
        let input_stream_key = vec![time_column_index];

        let table_columns = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamp),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Int32),
            ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(3), DataType::Float32),
            ColumnDesc::unnamed(ColumnId::new(4), DataType::Float64),
        ];

        let table_pk_indices = vec![time_column_index];
        let table_order_types = vec![OrderType::ascending()];
        let buffer_table = StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                TableId::new(0),
                table_columns.clone(),
                table_order_types,
                table_pk_indices,
                0,
                vec![],
            ),
            store.clone(),
            None,
        )
        .await;

        let prev_row_pk_indices = vec![0];
        let prev_row_order_types = vec![OrderType::ascending()];
        let prev_row_table = StateTable::from_table_catalog(
            &gen_pbtable_with_dist_key(
                TableId::new(1),
                table_columns,
                prev_row_order_types,
                prev_row_pk_indices,
                0,
                vec![],
            ),
            store,
            None,
        )
        .await;

        let (tx, source) = MockSource::channel();
        let source = source.into_executor(input_schema, input_stream_key);
        let gap_fill_executor = EowcGapFillExecutor::new(EowcGapFillExecutorArgs {
            actor_ctx: ActorContext::for_test(123),
            schema: source.schema().clone(),
            input: source,
            buffer_table,
            prev_row_table,
            chunk_size: 1024,
            time_column_index,
            fill_columns,
            gap_interval,
            high_gap_fill_amplification_threshold: 2048,
        });

        (tx, gap_fill_executor.boxed().execute())
    }

    /// Literal one-day interval expression used as the gap interval by every test.
    fn one_day_gap_interval() -> NonStrictExpression {
        NonStrictExpression::for_test(LiteralExpression::new(
            DataType::Interval,
            Some(Interval::from_days(1).into()),
        ))
    }

    /// Pushes a `Timestamp` watermark on column 0, parsing `timestamp` from text.
    fn push_timestamp_watermark(tx: &mut MessageSender, timestamp: &str) {
        tx.push_watermark(
            0,
            DataType::Timestamp,
            timestamp
                .parse::<risingwave_common::types::Timestamp>()
                .unwrap()
                .into(),
        );
    }

    /// Runs the scenario shared by the single-gap tests and checks the emitted chunk.
    ///
    /// Steps: first barrier; an early watermark that releases nothing; the two rows of
    /// `FOUR_DAY_GAP_INPUT`; a watermark past the second row. The chunk emitted in response
    /// must equal `expected` (pretty-printed chunk), followed by the forwarded watermark.
    /// Watermarks pushed on column 1 must be swallowed by the executor.
    async fn assert_four_day_gap_filled(
        fill_columns: HashMap<usize, FillStrategy>,
        expected: &str,
    ) {
        let time_column_index = 0;
        let store = MemoryStateStore::new();
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            one_day_gap_interval(),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        tx.push_int64_watermark(1, 0_i64);
        push_timestamp_watermark(&mut tx, "2023-03-06 18:27:03");
        gap_fill_executor.expect_watermark().await;

        tx.push_chunk(StreamChunk::from_pretty(FOUR_DAY_GAP_INPUT));

        tx.push_int64_watermark(1, 0_i64);
        push_timestamp_watermark(&mut tx, "2023-04-05 18:27:03");

        let chunk = gap_fill_executor.expect_chunk().await;
        assert_eq!(chunk, StreamChunk::from_pretty(expected));
        gap_fill_executor.expect_watermark().await;
    }

    #[tokio::test]
    async fn test_gap_fill_interpolate() {
        let fill_columns = HashMap::from([
            (1, FillStrategy::Interpolate),
            (2, FillStrategy::Interpolate),
            (3, FillStrategy::Interpolate),
            (4, FillStrategy::Interpolate),
        ]);
        assert_four_day_gap_filled(
            fill_columns,
            " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 20 125 2.0 125.0
                + 2023-04-03T10:00:00 30 150 3.0 150.0
                + 2023-04-04T10:00:00 40 175 4.0 175.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_locf() {
        let fill_columns = HashMap::from([
            (1, FillStrategy::Locf),
            (2, FillStrategy::Locf),
            (3, FillStrategy::Locf),
            (4, FillStrategy::Locf),
        ]);
        assert_four_day_gap_filled(
            fill_columns,
            " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 10 100 1.0 100.0
                + 2023-04-03T10:00:00 10 100 1.0 100.0
                + 2023-04-04T10:00:00 10 100 1.0 100.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_null() {
        let fill_columns = HashMap::from([
            (1, FillStrategy::Null),
            (2, FillStrategy::Null),
            (3, FillStrategy::Null),
            (4, FillStrategy::Null),
        ]);
        assert_four_day_gap_filled(
            fill_columns,
            " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 .  .    .    .
                + 2023-04-03T10:00:00 .  .    .    .
                + 2023-04-04T10:00:00 .  .    .    .
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    #[tokio::test]
    async fn test_gap_fill_mixed_strategy() {
        let fill_columns = HashMap::from([
            (1, FillStrategy::Interpolate),
            (2, FillStrategy::Locf),
            (3, FillStrategy::Null),
            (4, FillStrategy::Interpolate),
        ]);
        assert_four_day_gap_filled(
            fill_columns,
            " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 20 100 .    125.0
                + 2023-04-03T10:00:00 30 100 .    150.0
                + 2023-04-04T10:00:00 40 100 .    175.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0",
        )
        .await;
    }

    /// Checks that buffered rows and the last emitted row survive executor re-creation
    /// on the same state store.
    #[tokio::test]
    async fn test_gap_fill_fail_over() {
        let time_column_index = 0;
        let fill_columns = HashMap::from([
            (1, FillStrategy::Locf),
            (2, FillStrategy::Interpolate),
            (3, FillStrategy::Locf),
            (4, FillStrategy::Locf),
        ]);
        let store = MemoryStateStore::new();

        // First incarnation: buffer two rows and commit them, without emitting anything.
        let (mut tx, mut gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            one_day_gap_interval(),
            store.clone(),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        gap_fill_executor.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(FOUR_DAY_GAP_INPUT));

        tx.push_barrier(test_epoch(2), false);
        gap_fill_executor.expect_barrier().await;

        // Second incarnation: the committed buffer is drained by a watermark.
        let (mut recovered_tx, mut recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns.clone(),
            one_day_gap_interval(),
            store.clone(),
        )
        .await;

        recovered_tx.push_barrier(test_epoch(2), false);
        recovered_gap_fill_executor.expect_barrier().await;

        push_timestamp_watermark(&mut recovered_tx, "2023-04-06T10:00:00");

        let chunk = recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-01T10:00:00 10 100 1.0 100.0
                + 2023-04-02T10:00:00 10 125 1.0 100.0
                + 2023-04-03T10:00:00 10 150 1.0 100.0
                + 2023-04-04T10:00:00 10 175 1.0 100.0
                + 2023-04-05T10:00:00 50 200 5.0 200.0"
            )
        );

        recovered_gap_fill_executor.expect_watermark().await;

        recovered_tx.push_chunk(StreamChunk::from_pretty(
            " TS                  i   I    f     F
            + 2023-04-08T10:00:00 80 500 8.0 500.0",
        ));

        recovered_tx.push_barrier(test_epoch(3), false);
        recovered_gap_fill_executor.expect_barrier().await;

        // Third incarnation: the gap is filled from the persisted last-emitted row.
        let (mut final_recovered_tx, mut final_recovered_gap_fill_executor) = create_executor(
            time_column_index,
            fill_columns,
            one_day_gap_interval(),
            store,
        )
        .await;

        final_recovered_tx.push_barrier(test_epoch(3), false);
        final_recovered_gap_fill_executor.expect_barrier().await;

        push_timestamp_watermark(&mut final_recovered_tx, "2023-04-09T10:00:00");

        let chunk = final_recovered_gap_fill_executor.expect_chunk().await;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " TS                  i   I    f     F
                + 2023-04-06T10:00:00 50 300 5.0 200.0
                + 2023-04-07T10:00:00 50 400 5.0 200.0
                + 2023-04-08T10:00:00 80 500 8.0 500.0"
            )
        );

        final_recovered_gap_fill_executor.expect_watermark().await;
    }
}