Skip to main content

risingwave_stream/executor/
gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::ops::Bound;
18
19use futures::{StreamExt, pin_mut};
20use risingwave_common::array::Op;
21use risingwave_common::gap_fill::{
22    FillStrategy, apply_interpolation_step, calculate_interpolation_step,
23};
24use risingwave_common::metrics::LabelGuardedIntCounter;
25use risingwave_common::row::{OwnedRow, Row, RowExt};
26use risingwave_common::types::{
27    CheckedAdd, Datum, DefaultOrd, Interval, ScalarImpl, Timestamp, ToOwnedDatum,
28};
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_storage::StateStore;
32use risingwave_storage::store::PrefetchOptions;
33use tracing::warn;
34
35use crate::common::table::state_table::{StateTable, StateTablePostCommit};
36use crate::executor::prelude::*;
37
/// Construction arguments for [`GapFillExecutor`], consumed by `GapFillExecutor::new`.
pub struct GapFillExecutorArgs<S: StateStore> {
    /// Actor context; its actor id and fragment id label the gap-fill metrics and warnings.
    pub ctx: ActorContextRef,
    /// Upstream executor whose messages are gap-filled.
    pub input: Executor,
    /// Schema whose data types are used to build the output chunks.
    pub schema: Schema,
    /// Chunk size passed to the output `StreamChunkBuilder`.
    pub chunk_size: usize,
    /// Index of the time column; rows whose time column is NULL are forwarded without filling.
    pub time_column_index: usize,
    /// Fill strategy per column index. Columns without an entry are filled with NULL
    /// (partition columns are copied from the preceding anchor row instead).
    pub fill_columns: HashMap<usize, FillStrategy>,
    /// Expression yielding the grid step. It is evaluated once against an empty row after the
    /// first barrier and must produce a non-null, strictly positive interval.
    pub gap_interval: NonStrictExpression,
    /// State table that stores the anchor rows.
    pub state_table: StateTable<S>,
    /// Column indices projected out of a row to form the partition key (state-table prefix).
    pub partition_by_indices: Vec<usize>,
    /// Column indices projected out of a row to locate it inside its partition.
    /// Must not be empty (asserted by `ManagedGapFillState::new`).
    pub pointer_key_indices: Vec<usize>,
    /// A warning is logged whenever a single gap produces more fill rows than this.
    pub high_gap_fill_amplification_threshold: usize,
}
51
/// Only original (anchor) rows are persisted. Filled rows are computed on the fly.
///
/// State rows have the same layout as output rows. Neighbor lookups use the state table PK prefix:
/// `(partition_cols..., time_col, upstream stream key columns excluding partition/time)`.
pub struct ManagedGapFillState<S: StateStore> {
    /// Backing state table holding the anchor rows.
    state_table: StateTable<S>,
    /// Column indices projected out of a row to form the partition key, which is used as the
    /// prefix for neighbor scans.
    partition_by_indices: Vec<usize>,
    /// Column indices projected out of a row to form the pointer key, which bounds the neighbor
    /// scans inside a partition. Never empty.
    pointer_key_indices: Vec<usize>,
}
61
62impl<S: StateStore> ManagedGapFillState<S> {
63    pub fn new(
64        state_table: StateTable<S>,
65        _schema: &Schema,
66        partition_by_indices: Vec<usize>,
67        pointer_key_indices: Vec<usize>,
68    ) -> Self {
69        assert!(
70            !pointer_key_indices.is_empty(),
71            "gap fill pointer key should not be empty",
72        );
73
74        Self {
75            state_table,
76            partition_by_indices,
77            pointer_key_indices,
78        }
79    }
80
81    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
82        self.state_table.init_epoch(epoch).await
83    }
84
85    pub fn insert(&mut self, value: impl Row) {
86        self.state_table.insert(value);
87    }
88
89    pub fn delete(&mut self, value: impl Row) {
90        self.state_table.delete(value);
91    }
92
93    pub async fn flush(
94        &mut self,
95        epoch: EpochPair,
96    ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
97        self.state_table.commit(epoch).await
98    }
99
100    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
101        self.state_table.try_flush().await
102    }
103
104    fn state_row_to_output_row(&self, state_row: impl Row) -> OwnedRow {
105        state_row.into_owned_row()
106    }
107
108    /// Find the previous neighbor within the same partition by scanning backward.
109    /// Uses the partition as prefix and scans rows with pointer key < target pointer key.
110    async fn find_prev_in_partition(
111        &self,
112        partition_key: impl Row,
113        target_pointer_key: impl Row,
114    ) -> StreamExecutorResult<Option<OwnedRow>> {
115        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(
116            Bound::Unbounded,
117            Bound::Excluded(target_pointer_key.into_owned_row()),
118        );
119
120        let iter = self
121            .state_table
122            .rev_iter_with_prefix(partition_key, sub_range, PrefetchOptions::default())
123            .await?;
124        pin_mut!(iter);
125
126        if let Some(item) = iter.next().await {
127            let state_row = item?.into_owned_row();
128            Ok(Some(state_row))
129        } else {
130            Ok(None)
131        }
132    }
133
134    /// Find the next neighbor within the same partition by scanning forward.
135    async fn find_next_in_partition(
136        &self,
137        partition_key: impl Row,
138        target_pointer_key: impl Row,
139    ) -> StreamExecutorResult<Option<OwnedRow>> {
140        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(
141            Bound::Excluded(target_pointer_key.into_owned_row()),
142            Bound::Unbounded,
143        );
144
145        let iter = self
146            .state_table
147            .iter_with_prefix(partition_key, sub_range, PrefetchOptions::default())
148            .await?;
149        pin_mut!(iter);
150
151        if let Some(item) = iter.next().await {
152            let state_row = item?.into_owned_row();
153            Ok(Some(state_row))
154        } else {
155            Ok(None)
156        }
157    }
158}
159
/// Streaming executor that emits fill rows on a fixed time grid between neighboring anchor rows
/// of the same partition, and keeps those fills consistent as anchors are inserted or deleted.
pub struct GapFillExecutor<S: StateStore> {
    /// Actor context, used to label metrics and warnings.
    ctx: ActorContextRef,
    /// Upstream executor.
    input: Executor,
    /// Schema whose data types are used to build the output chunks.
    schema: Schema,
    /// Chunk size passed to the output `StreamChunkBuilder`.
    chunk_size: usize,
    /// Index of the time column the fill grid is walked on.
    time_column_index: usize,
    /// Fill strategy per column index.
    fill_columns: HashMap<usize, FillStrategy>,
    /// Expression yielding the grid step; evaluated once after the first barrier.
    gap_interval: NonStrictExpression,
    /// A warning is logged whenever a single gap produces more fill rows than this.
    high_gap_fill_amplification_threshold: usize,

    // State management
    /// Persisted anchor rows plus the key projections used for neighbor lookups.
    managed_state: ManagedGapFillState<S>,

    // Metrics
    /// Counters updated while generating fill rows.
    metrics: GapFillMetrics,
}
176
/// Metrics reported by the gap-fill executor.
pub struct GapFillMetrics {
    /// Incremented by the number of fill rows each time a gap is generated; labelled with the
    /// actor id and fragment id.
    pub gap_fill_generated_rows_count: LabelGuardedIntCounter,
}
180
/// Borrowed, read-only context bundled together for fill-row generation so the generator does
/// not need one parameter per item.
struct GapFillGenerationContext<'a> {
    /// Metrics to update with the number of generated rows.
    metrics: &'a GapFillMetrics,
    /// Generating more rows than this for a single gap logs a warning.
    high_amplification_threshold: usize,
    /// Actor context supplying the actor id and fragment id for that warning.
    actor_ctx: &'a ActorContextRef,
}
186
187/// Extract the `Timestamp` the fill grid walks in from a time-column scalar; `None` if not a timestamp.
188fn time_scalar_to_timestamp(scalar: ScalarRefImpl<'_>) -> Option<Timestamp> {
189    match scalar {
190        ScalarRefImpl::Timestamp(ts) => Some(ts),
191        ScalarRefImpl::Timestamptz(ts) => Timestamp::with_micros(ts.timestamp_micros()).ok(),
192        _ => None,
193    }
194}
195
196impl<S: StateStore> GapFillExecutor<S> {
197    async fn find_prev_output(
198        managed_state: &ManagedGapFillState<S>,
199        partition_key: impl Row,
200        pointer_key: impl Row,
201    ) -> StreamExecutorResult<Option<OwnedRow>> {
202        Ok(managed_state
203            .find_prev_in_partition(partition_key, pointer_key)
204            .await?
205            .map(|sr| managed_state.state_row_to_output_row(sr)))
206    }
207
208    async fn find_next_output(
209        managed_state: &ManagedGapFillState<S>,
210        partition_key: impl Row,
211        pointer_key: impl Row,
212    ) -> StreamExecutorResult<Option<OwnedRow>> {
213        Ok(managed_state
214            .find_next_in_partition(partition_key, pointer_key)
215            .await?
216            .map(|sr| managed_state.state_row_to_output_row(sr)))
217    }
218
219    pub fn new(args: GapFillExecutorArgs<S>) -> Self {
220        let managed_state = ManagedGapFillState::new(
221            args.state_table,
222            &args.schema,
223            args.partition_by_indices,
224            args.pointer_key_indices,
225        );
226
227        let metrics = args.ctx.streaming_metrics.clone();
228        let actor_id = args.ctx.id.to_string();
229        let fragment_id = args.ctx.fragment_id.to_string();
230        let gap_fill_metrics = GapFillMetrics {
231            gap_fill_generated_rows_count: metrics
232                .gap_fill_generated_rows_count
233                .with_guarded_label_values(&[&actor_id, &fragment_id]),
234        };
235
236        Self {
237            ctx: args.ctx,
238            input: args.input,
239            schema: args.schema,
240            chunk_size: args.chunk_size,
241            time_column_index: args.time_column_index,
242            fill_columns: args.fill_columns,
243            gap_interval: args.gap_interval,
244            high_gap_fill_amplification_threshold: args.high_gap_fill_amplification_threshold,
245            managed_state,
246            metrics: gap_fill_metrics,
247        }
248    }
249
250    /// Generates interpolated rows between two time points (`prev_row` and `curr_row`) using a static interval.
251    ///
252    /// # Parameters
253    /// - `prev_row`: Reference to the previous row (start of the gap).
254    /// - `curr_row`: Reference to the current row (end of the gap).
255    /// - `interval`: The interval to use for generating each filled row (typically a time interval).
256    /// - `time_column_index`: The index of the time column in the row, used to increment time values.
257    /// - `fill_columns`: A `HashMap` mapping column indices to their respective `FillStrategy`.
258    /// - `metrics`: Metrics for tracking the number of generated rows.
259    ///
260    /// # Fill Strategy Application
261    /// For each filled row, the function applies the specified `FillStrategy` for each column:
262    /// - `FillStrategy::Locf`: Carries the previous row's value forward.
263    /// - `FillStrategy::Interpolate`: Interpolates linearly between the previous and current row values.
264    /// - `FillStrategy::Null`: Leaves the column null.
265    ///
266    /// Returns a vector of `OwnedRow` representing the filled rows between `prev_row` and `curr_row`.
267    #[expect(clippy::too_many_arguments)]
268    fn generate_filled_rows_between_static(
269        prev_row: &OwnedRow,
270        curr_row: &OwnedRow,
271        interval: &risingwave_common::types::Interval,
272        time_column_index: usize,
273        partition_by_indices: &[usize],
274        fill_columns: &HashMap<usize, FillStrategy>,
275        generation_context: &GapFillGenerationContext<'_>,
276        // Skip building fill rows below this time. Only set for LOCF/NULL (whose values don't
277        // depend on the skipped grid positions); `None` builds the whole gap.
278        build_from: Option<Timestamp>,
279    ) -> StreamExecutorResult<Vec<OwnedRow>> {
280        // Skipping rows below `build_from` would desync the cumulative interpolation state, so
281        // callers must never set it when a column interpolates.
282        debug_assert!(
283            build_from.is_none()
284                || !fill_columns
285                    .values()
286                    .any(|s| matches!(s, FillStrategy::Interpolate)),
287            "build_from must not be set when any column interpolates"
288        );
289        let mut filled_rows = Vec::new();
290
291        let (Some(prev_time_scalar), Some(curr_time_scalar)) = (
292            prev_row.datum_at(time_column_index),
293            curr_row.datum_at(time_column_index),
294        ) else {
295            return Ok(filled_rows);
296        };
297
298        let Some(prev_time) = time_scalar_to_timestamp(prev_time_scalar) else {
299            warn!(
300                "Time column is not a timestamp value: {:?}",
301                prev_time_scalar
302            );
303            return Ok(filled_rows);
304        };
305        let Some(curr_time) = time_scalar_to_timestamp(curr_time_scalar) else {
306            warn!(
307                "Time column is not a timestamp value: {:?}",
308                curr_time_scalar
309            );
310            return Ok(filled_rows);
311        };
312
313        if prev_time >= curr_time {
314            return Ok(filled_rows);
315        }
316
317        // Calculate the number of rows to be generated and validate
318        let mut fill_time = match prev_time.checked_add(*interval) {
319            Some(t) => t,
320            None => {
321                // If the interval is so large that adding it to prev_time causes overflow,
322                // it means we shouldn't do gap fill at all.
323                warn!(
324                    "Gap fill interval is too large, causing timestamp overflow. \
325                     No gap filling will be performed between {:?} and {:?}.",
326                    prev_time, curr_time
327                );
328                return Ok(filled_rows);
329            }
330        };
331
332        // Check if fill_time is already >= curr_time, which means no gap to fill
333        if fill_time >= curr_time {
334            return Ok(filled_rows);
335        }
336
337        // Count the number of rows to generate
338        let mut row_count = 0;
339        let mut temp_time = fill_time;
340        while temp_time < curr_time {
341            row_count += 1;
342            temp_time = match temp_time.checked_add(*interval) {
343                Some(t) => t,
344                None => break,
345            };
346        }
347
348        // Pre-compute interpolation steps for each column that requires interpolation
349        let mut interpolation_steps: Vec<Option<ScalarImpl>> = Vec::new();
350        let mut interpolation_states: Vec<Datum> = Vec::new();
351
352        for i in 0..prev_row.len() {
353            if let Some(strategy) = fill_columns.get(&i) {
354                if matches!(strategy, FillStrategy::Interpolate) {
355                    let step = calculate_interpolation_step(
356                        prev_row.datum_at(i),
357                        curr_row.datum_at(i),
358                        row_count + 1,
359                    );
360                    interpolation_steps.push(step.clone());
361                    interpolation_states.push(prev_row.datum_at(i).to_owned_datum());
362                } else {
363                    interpolation_steps.push(None);
364                    interpolation_states.push(None);
365                }
366            } else {
367                interpolation_steps.push(None);
368                interpolation_states.push(None);
369            }
370        }
371
372        // Generate filled rows, applying the appropriate strategy for each column
373        while fill_time < curr_time {
374            if build_from.is_some_and(|from| fill_time < from) {
375                fill_time = match fill_time.checked_add(*interval) {
376                    Some(t) => t,
377                    None => break,
378                };
379                continue;
380            }
381            let mut new_row_data = Vec::with_capacity(prev_row.len());
382
383            for col_idx in 0..prev_row.len() {
384                let datum = if col_idx == time_column_index {
385                    // Time column: use the incremented timestamp
386                    let fill_time_scalar = match prev_time_scalar {
387                        ScalarRefImpl::Timestamp(_) => ScalarImpl::Timestamp(fill_time),
388                        ScalarRefImpl::Timestamptz(_) => {
389                            let micros = fill_time.0.and_utc().timestamp_micros();
390                            ScalarImpl::Timestamptz(
391                                risingwave_common::types::Timestamptz::from_micros(micros),
392                            )
393                        }
394                        _ => unreachable!("Time column should be Timestamp or Timestamptz"),
395                    };
396                    Some(fill_time_scalar)
397                } else if partition_by_indices.contains(&col_idx) {
398                    // Gap-filled rows must stay in the same partition as the surrounding anchors.
399                    prev_row.datum_at(col_idx).to_owned_datum()
400                } else if let Some(strategy) = fill_columns.get(&col_idx) {
401                    // Apply the fill strategy for this column
402                    match strategy {
403                        FillStrategy::Locf => prev_row.datum_at(col_idx).to_owned_datum(),
404                        FillStrategy::Null => None,
405                        FillStrategy::Interpolate => {
406                            // Apply interpolation step and update cumulative value
407                            if let Some(step) = &interpolation_steps[col_idx] {
408                                apply_interpolation_step(&mut interpolation_states[col_idx], step);
409                                interpolation_states[col_idx].clone()
410                            } else {
411                                // If interpolation step is None, fill with NULL
412                                None
413                            }
414                        }
415                    }
416                } else {
417                    // No strategy specified, default to NULL. This can include upstream stream-key
418                    // columns (for example hidden `_row_id` on no-PK inputs). The generated row is
419                    // still distinct from both anchor rows because the time column is always part
420                    // of the gap-fill output key and `fill_time` is strictly between them.
421                    None
422                };
423                new_row_data.push(datum);
424            }
425
426            let filled_row = OwnedRow::new(new_row_data);
427            debug_assert_ne!(
428                filled_row.datum_at(time_column_index),
429                prev_row.datum_at(time_column_index)
430            );
431            debug_assert_ne!(
432                filled_row.datum_at(time_column_index),
433                curr_row.datum_at(time_column_index)
434            );
435            filled_rows.push(filled_row);
436
437            fill_time = match fill_time.checked_add(*interval) {
438                Some(t) => t,
439                None => {
440                    // Time overflow during iteration, stop filling
441                    warn!(
442                        "Gap fill stopped due to timestamp overflow after generating {} rows.",
443                        filled_rows.len()
444                    );
445                    break;
446                }
447            };
448        }
449
450        // Update metrics with the number of generated rows
451        generation_context
452            .metrics
453            .gap_fill_generated_rows_count
454            .inc_by(filled_rows.len() as u64);
455
456        if filled_rows.len() > generation_context.high_amplification_threshold {
457            let partition_key = prev_row.project(partition_by_indices);
458            tracing::warn!(target: "high_gap_fill_amplification",
459                generated_rows_len = filled_rows.len(),
460                prev_time = ?prev_time,
461                curr_time = ?curr_time,
462                gap_interval = ?interval,
463                partition_key = ?partition_key,
464                actor_id = %generation_context.actor_ctx.id,
465                fragment_id = %generation_context.actor_ctx.fragment_id,
466                "large rows generated by gap fill"
467            );
468        }
469
470        Ok(filled_rows)
471    }
472
473    /// Emit the minimal changelog turning `old_fills` into `new_fills` (both sorted by time).
474    ///
475    /// With `reuse_unchanged` false (e.g. interpolation changes every fill) all `old_fills` are
476    /// replaced by `new_fills`; otherwise the lists are merged and only differing rows are emitted,
477    /// leaving an unchanged LOCF prefix or NULL fill in place.
478    fn diff_fills(
479        old_fills: Vec<OwnedRow>,
480        new_fills: Vec<OwnedRow>,
481        time_column_index: usize,
482        reuse_unchanged: bool,
483    ) -> Vec<(Op, OwnedRow)> {
484        if !reuse_unchanged {
485            return old_fills
486                .into_iter()
487                .map(|row| (Op::Delete, row))
488                .chain(new_fills.into_iter().map(|row| (Op::Insert, row)))
489                .collect();
490        }
491
492        let mut ops = Vec::new();
493        let (mut i, mut j) = (0, 0);
494        while i < old_fills.len() && j < new_fills.len() {
495            match old_fills[i]
496                .datum_at(time_column_index)
497                .default_cmp(&new_fills[j].datum_at(time_column_index))
498            {
499                Ordering::Less => {
500                    ops.push((Op::Delete, old_fills[i].clone()));
501                    i += 1;
502                }
503                Ordering::Greater => {
504                    ops.push((Op::Insert, new_fills[j].clone()));
505                    j += 1;
506                }
507                Ordering::Equal => {
508                    // Same timestamp: rewrite only if the value changed; keep identical fills.
509                    if old_fills[i] != new_fills[j] {
510                        ops.push((Op::Delete, old_fills[i].clone()));
511                        ops.push((Op::Insert, new_fills[j].clone()));
512                    }
513                    i += 1;
514                    j += 1;
515                }
516            }
517        }
518        for old_row in &old_fills[i..] {
519            ops.push((Op::Delete, old_row.clone()));
520        }
521        for new_row in &new_fills[j..] {
522            ops.push((Op::Insert, new_row.clone()));
523        }
524        ops
525    }
526}
527
528impl<S: StateStore> Execute for GapFillExecutor<S> {
529    fn execute(self: Box<Self>) -> BoxedMessageStream {
530        self.execute_inner().boxed()
531    }
532}
533
534impl<S: StateStore> GapFillExecutor<S> {
535    #[try_stream(ok = Message, error = StreamExecutorError)]
536    async fn execute_inner(self: Box<Self>) {
537        let Self {
538            mut managed_state,
539            schema,
540            chunk_size,
541            time_column_index,
542            fill_columns,
543            gap_interval,
544            high_gap_fill_amplification_threshold,
545            ctx,
546            input,
547            metrics,
548        } = *self;
549
550        let mut input = input.execute();
551
552        let barrier = expect_first_barrier(&mut input).await?;
553        let first_epoch = barrier.epoch;
554        yield Message::Barrier(barrier);
555        managed_state.init_epoch(first_epoch).await?;
556
557        // Calculate and validate gap interval once at initialization
558        let dummy_row = OwnedRow::new(vec![]);
559        let interval_datum = gap_interval.eval_row_infallible(&dummy_row).await;
560        let interval = interval_datum
561            .ok_or_else(|| anyhow::anyhow!("Gap interval expression returned null"))?
562            .into_interval();
563
564        if interval <= Interval::from_month_day_usec(0, 0, 0) {
565            Err(anyhow::anyhow!("Gap interval must be positive"))?;
566        }
567        let generation_context = GapFillGenerationContext {
568            metrics: &metrics,
569            high_amplification_threshold: high_gap_fill_amplification_threshold,
570            actor_ctx: &ctx,
571        };
572
573        let partition_by_indices = managed_state.partition_by_indices.clone();
574        let pointer_key_indices = managed_state.pointer_key_indices.clone();
575        // Interpolation re-slopes every fill, so a changed anchor changes all of them; only
576        // LOCF/NULL fills can be reused by the diff.
577        let has_interpolate = fill_columns
578            .values()
579            .any(|strategy| matches!(strategy, FillStrategy::Interpolate));
580
581        #[for_await]
582        for msg in input {
583            match msg? {
584                Message::Chunk(chunk) => {
585                    let chunk = chunk.compact_vis();
586                    let mut chunk_builder =
587                        StreamChunkBuilder::new(chunk_size, schema.data_types());
588
589                    // Fill rows interleave between an update's U-/U+, so the pair can't stay
590                    // adjacent; normalize anchor ops to Insert/Delete to avoid a dangling Update.
591                    for (op, row_ref) in chunk.rows() {
592                        let row = row_ref.to_owned_row();
593                        if row.datum_at(time_column_index).is_none() {
594                            if let Some(chunk) =
595                                chunk_builder.append_row(op.normalize_update(), &row)
596                            {
597                                yield Message::Chunk(chunk);
598                            }
599                            continue;
600                        }
601                        let partition_key = (&row).project(&partition_by_indices);
602                        let pointer_key = (&row).project(&pointer_key_indices);
603
604                        match op {
605                            Op::Insert | Op::UpdateInsert => {
606                                let prev_output = Self::find_prev_output(
607                                    &managed_state,
608                                    &partition_key,
609                                    &pointer_key,
610                                )
611                                .await?;
612
613                                let next_output = Self::find_next_output(
614                                    &managed_state,
615                                    &partition_key,
616                                    &pointer_key,
617                                )
618                                .await?;
619
620                                // Splitting the gap leaves the `prev -> row` prefix unchanged for
621                                // LOCF/NULL, so `split_time` makes both sides skip it; `None`
622                                // (interpolation, or no gap to split) rebuilds the whole gap.
623                                let split_time = (!has_interpolate
624                                    && prev_output.is_some()
625                                    && next_output.is_some())
626                                .then(|| {
627                                    row.datum_at(time_column_index)
628                                        .and_then(time_scalar_to_timestamp)
629                                })
630                                .flatten();
631
632                                let old_fills = if let (Some(prev_out), Some(next_out)) =
633                                    (&prev_output, &next_output)
634                                {
635                                    Self::generate_filled_rows_between_static(
636                                        prev_out,
637                                        next_out,
638                                        &interval,
639                                        time_column_index,
640                                        &managed_state.partition_by_indices,
641                                        &fill_columns,
642                                        &generation_context,
643                                        split_time,
644                                    )?
645                                } else {
646                                    vec![]
647                                };
648                                let mut new_fills = vec![];
649                                if split_time.is_none()
650                                    && let Some(prev_out) = &prev_output
651                                {
652                                    new_fills.extend(Self::generate_filled_rows_between_static(
653                                        prev_out,
654                                        &row,
655                                        &interval,
656                                        time_column_index,
657                                        &managed_state.partition_by_indices,
658                                        &fill_columns,
659                                        &generation_context,
660                                        None,
661                                    )?);
662                                }
663                                if let Some(next_out) = &next_output {
664                                    new_fills.extend(Self::generate_filled_rows_between_static(
665                                        &row,
666                                        next_out,
667                                        &interval,
668                                        time_column_index,
669                                        &managed_state.partition_by_indices,
670                                        &fill_columns,
671                                        &generation_context,
672                                        None,
673                                    )?);
674                                }
675
676                                // A late anchor on a filled slot shares that fill's downstream key,
677                                // so retract the changed fills before inserting the anchor.
678                                for (fill_op, filled_row) in Self::diff_fills(
679                                    old_fills,
680                                    new_fills,
681                                    time_column_index,
682                                    !has_interpolate,
683                                ) {
684                                    if let Some(chunk) =
685                                        chunk_builder.append_row(fill_op, &filled_row)
686                                    {
687                                        yield Message::Chunk(chunk);
688                                    }
689                                }
690
691                                managed_state.insert(&row);
692                                if let Some(chunk) =
693                                    chunk_builder.append_row(op.normalize_update(), &row)
694                                {
695                                    yield Message::Chunk(chunk);
696                                }
697                            }
698                            Op::Delete | Op::UpdateDelete => {
699                                let prev_output = Self::find_prev_output(
700                                    &managed_state,
701                                    &partition_key,
702                                    &pointer_key,
703                                )
704                                .await?;
705
706                                let next_output = Self::find_next_output(
707                                    &managed_state,
708                                    &partition_key,
709                                    &pointer_key,
710                                )
711                                .await?;
712
713                                // Merging the gap leaves the `prev -> row` prefix unchanged for
714                                // LOCF/NULL, so `split_time` makes both sides skip it; `None`
715                                // (interpolation, or no merged gap) rebuilds the whole gap.
716                                let split_time = (!has_interpolate
717                                    && prev_output.is_some()
718                                    && next_output.is_some())
719                                .then(|| {
720                                    row.datum_at(time_column_index)
721                                        .and_then(time_scalar_to_timestamp)
722                                })
723                                .flatten();
724
725                                let mut old_fills = vec![];
726                                if split_time.is_none()
727                                    && let Some(prev_out) = &prev_output
728                                {
729                                    old_fills.extend(Self::generate_filled_rows_between_static(
730                                        prev_out,
731                                        &row,
732                                        &interval,
733                                        time_column_index,
734                                        &managed_state.partition_by_indices,
735                                        &fill_columns,
736                                        &generation_context,
737                                        None,
738                                    )?);
739                                }
740                                if let Some(next_out) = &next_output {
741                                    old_fills.extend(Self::generate_filled_rows_between_static(
742                                        &row,
743                                        next_out,
744                                        &interval,
745                                        time_column_index,
746                                        &managed_state.partition_by_indices,
747                                        &fill_columns,
748                                        &generation_context,
749                                        None,
750                                    )?);
751                                }
752                                let new_fills = if let (Some(prev_out), Some(next_out)) =
753                                    (&prev_output, &next_output)
754                                {
755                                    Self::generate_filled_rows_between_static(
756                                        prev_out,
757                                        next_out,
758                                        &interval,
759                                        time_column_index,
760                                        &managed_state.partition_by_indices,
761                                        &fill_columns,
762                                        &generation_context,
763                                        split_time,
764                                    )?
765                                } else {
766                                    vec![]
767                                };
768
769                                managed_state.delete(&row);
770                                if let Some(chunk) =
771                                    chunk_builder.append_row(op.normalize_update(), &row)
772                                {
773                                    yield Message::Chunk(chunk);
774                                }
775
776                                for (fill_op, filled_row) in Self::diff_fills(
777                                    old_fills,
778                                    new_fills,
779                                    time_column_index,
780                                    !has_interpolate,
781                                ) {
782                                    if let Some(chunk) =
783                                        chunk_builder.append_row(fill_op, &filled_row)
784                                    {
785                                        yield Message::Chunk(chunk);
786                                    }
787                                }
788                            }
789                        }
790                    }
791
792                    if let Some(chunk) = chunk_builder.take() {
793                        yield Message::Chunk(chunk);
794                    }
795
796                    managed_state.try_flush().await?;
797                }
798                Message::Watermark(_) => {
799                    // Gap fill back-fills and retracts rows below the latest time, so its output is
800                    // not watermark-aligned on any column (see the empty `WatermarkColumns` in
801                    // `StreamGapFill`). Drop input watermarks rather than forwarding a promise the
802                    // output cannot keep.
803                    continue;
804                }
805                Message::Barrier(barrier) => {
806                    let post_commit = managed_state.flush(barrier.epoch).await?;
807                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(ctx.id);
808                    yield Message::Barrier(barrier);
809                    let _ = post_commit.post_yield_barrier(update_vnode_bitmap).await?;
810                }
811            }
812        }
813    }
814}
815
816#[cfg(test)]
817mod tests {
818    use itertools::Itertools;
819    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
820    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
821    use risingwave_common::types::test_utils::IntervalTestExt;
822    use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamp};
823    use risingwave_common::util::epoch::test_epoch;
824    use risingwave_common::util::sort_util::OrderType;
825    use risingwave_expr::expr::LiteralExpression;
826    use risingwave_storage::memory::MemoryStateStore;
827
828    use super::*;
829    use crate::common::table::state_table::StateTable;
830    use crate::common::table::test_utils::gen_pbtable_with_dist_key;
831    use crate::executor::test_utils::{MessageSender, MockSource};
832
833    async fn create_executor(
834        store: MemoryStateStore,
835        fill_columns: HashMap<usize, FillStrategy>,
836        schema: Schema,
837        gap_interval: Interval,
838    ) -> (MessageSender, BoxedMessageStream) {
839        let (tx, source) = MockSource::channel();
840        let source = source.into_executor(schema.clone(), vec![0]);
841
842        let time_column_index = 0;
843        let partition_by_indices: Vec<usize> = vec![];
844        // Stream key = [0] (time column), so the lookup key within the singleton partition is the
845        // time value.
846        let pointer_key_indices = vec![0];
847
848        let table_columns: Vec<ColumnDesc> = schema
849            .fields
850            .iter()
851            .enumerate()
852            .map(|(i, f)| ColumnDesc::unnamed(ColumnId::new(i as i32), f.data_type.clone()))
853            .collect();
854
855        // PK: (partition_cols, time_col, stream_key) with dedup = [0]
856        // (no partition, time=0, sk=[0] already covered)
857        let table = StateTable::from_table_catalog(
858            &gen_pbtable_with_dist_key(
859                TableId::new(0),
860                table_columns,
861                vec![OrderType::ascending()],
862                vec![0],
863                0,
864                vec![],
865            ),
866            store,
867            None,
868        )
869        .await;
870
871        let executor = GapFillExecutor::new(GapFillExecutorArgs {
872            ctx: ActorContext::for_test(123),
873            input: source,
874            schema: schema.clone(),
875            chunk_size: 1024,
876            time_column_index,
877            fill_columns,
878            gap_interval: NonStrictExpression::for_test(LiteralExpression::new(
879                DataType::Interval,
880                Some(gap_interval.into()),
881            )),
882            state_table: table,
883            partition_by_indices,
884            pointer_key_indices,
885            high_gap_fill_amplification_threshold: 2048,
886        });
887
888        (tx, executor.boxed().execute())
889    }
890
891    fn test_gap_fill_metrics() -> GapFillMetrics {
892        let ctx = ActorContext::for_test(123);
893        let actor_id = ctx.id.to_string();
894        let fragment_id = ctx.fragment_id.to_string();
895
896        GapFillMetrics {
897            gap_fill_generated_rows_count: ctx
898                .streaming_metrics
899                .gap_fill_generated_rows_count
900                .with_guarded_label_values(&[&actor_id, &fragment_id]),
901        }
902    }
903
904    #[test]
905    fn test_generate_filled_rows_between_static() {
906        // Row layout `[partition, time, locf, no-strategy]`, gap 10:00 -> 10:05 on a 1-minute grid.
907        let anchor = |minute: &str, locf: i32| {
908            OwnedRow::new(vec![
909                Some(ScalarImpl::Int32(7)),
910                Some(ScalarImpl::Timestamp(minute.parse().unwrap())),
911                Some(ScalarImpl::Int32(locf)),
912                Some(ScalarImpl::Int32(99)),
913            ])
914        };
915        let prev_row = anchor("2023-04-01T10:00:00", 10);
916        let curr_row = anchor("2023-04-01T10:05:00", 40);
917
918        let ctx = ActorContext::for_test(123);
919        let metrics = test_gap_fill_metrics();
920        let generation_context = GapFillGenerationContext {
921            metrics: &metrics,
922            high_amplification_threshold: 2048,
923            actor_ctx: &ctx,
924        };
925        let generate = |build_from: Option<Timestamp>| {
926            GapFillExecutor::<MemoryStateStore>::generate_filled_rows_between_static(
927                &prev_row,
928                &curr_row,
929                &Interval::from_minutes(1),
930                1,
931                &[0],
932                &HashMap::from([(2, FillStrategy::Locf)]),
933                &generation_context,
934                build_from,
935            )
936            .unwrap()
937        };
938
939        // Each fill keeps the partition column (7) and the LOCF value (10); the no-strategy column
940        // defaults to NULL.
941        let fill = |minute: &str| {
942            OwnedRow::new(vec![
943                Some(ScalarImpl::Int32(7)),
944                Some(ScalarImpl::Timestamp(minute.parse().unwrap())),
945                Some(ScalarImpl::Int32(10)),
946                None,
947            ])
948        };
949        let full = vec![
950            fill("2023-04-01T10:01:00"),
951            fill("2023-04-01T10:02:00"),
952            fill("2023-04-01T10:03:00"),
953            fill("2023-04-01T10:04:00"),
954        ];
955        assert_eq!(generate(None), full);
956
957        // `build_from` skips the prefix below it, returning exactly the suffix.
958        assert_eq!(
959            generate(Some("2023-04-01T10:03:00".parse().unwrap())),
960            full[2..]
961        );
962    }
963
964    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
965    async fn test_streaming_gap_fill_locf() {
966        let store = MemoryStateStore::new();
967        let schema = Schema::new(vec![
968            Field::unnamed(DataType::Timestamp),
969            Field::unnamed(DataType::Int32),
970            Field::unnamed(DataType::Float64),
971        ]);
972        let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Locf)]);
973        let (mut tx, mut executor) =
974            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
975
976        // Init with barrier.
977        tx.push_barrier(test_epoch(1), false);
978        executor.next().await.unwrap().unwrap(); // Barrier
979
980        // 1. Send an initial chunk with a gap to test basic filling.
981        tx.push_chunk(StreamChunk::from_pretty(
982            " TS                  i   F
983            + 2022-01-01T00:00:00 1   1.0
984            + 2022-01-01T00:03:00 4   4.0",
985        ));
986
987        let chunk = next_chunk(&mut executor).await;
988        let expected = StreamChunk::from_pretty(
989            " TS                  i   F
990            + 2022-01-01T00:00:00 1   1.0
991            + 2022-01-01T00:01:00 1   1.0
992            + 2022-01-01T00:02:00 1   1.0
993            + 2022-01-01T00:03:00 4   4.0",
994        );
995
996        // Simple comparison since the test utility assumes Int64 keys.
997        assert_eq!(chunk.ops(), expected.ops());
998        assert_eq!(chunk.visibility(), expected.visibility());
999
1000        // Compare each row individually.
1001        let chunk_rows: Vec<_> = chunk.rows().collect();
1002        let expected_rows: Vec<_> = expected.rows().collect();
1003        assert_eq!(chunk_rows.len(), expected_rows.len());
1004
1005        for (i, ((op1, row1), (op2, row2))) in
1006            chunk_rows.iter().zip_eq(expected_rows.iter()).enumerate()
1007        {
1008            assert_eq!(op1, op2, "Row {} operation mismatch", i);
1009            assert_eq!(
1010                row1.to_owned_row(),
1011                row2.to_owned_row(),
1012                "Row {} data mismatch",
1013                i
1014            );
1015        }
1016
1017        // 2. Send a new chunk that arrives out-of-order, landing in the previously filled gap.
1018        // This tests if the executor can correctly retract old filled rows and create new ones.
1019        tx.push_chunk(StreamChunk::from_pretty(
1020            " TS                  i   F
1021            + 2022-01-01T00:02:00 2   2.0",
1022        ));
1023
1024        // 00:01's fill is unchanged (LOCF from 00:00 either way); only the 00:02 slot is rewritten.
1025        let chunk2 = next_chunk(&mut executor).await;
1026
1027        let expected2 = StreamChunk::from_pretty(
1028            " TS                  i   F
1029                - 2022-01-01T00:02:00 1   1.0
1030                + 2022-01-01T00:02:00 2   2.0",
1031        );
1032
1033        assert_eq!(chunk2.sort_rows(), expected2.sort_rows());
1034
1035        // 3. Send a delete chunk to remove an original data point.
1036        // This should trigger retraction of old fills and generation of new ones.
1037        tx.push_chunk(StreamChunk::from_pretty(
1038            " TS                  i   F
1039            - 2022-01-01T00:02:00 2   2.0",
1040        ));
1041
1042        let chunk3 = next_chunk(&mut executor).await;
1043        assert_eq!(
1044            chunk3.sort_rows(),
1045            StreamChunk::from_pretty(
1046                " TS                  i   F
1047                - 2022-01-01T00:02:00 2   2.0
1048                + 2022-01-01T00:02:00 1   1.0"
1049            )
1050            .sort_rows()
1051        );
1052
1053        // 4. Send an update chunk to modify an original data point.
1054        // This should also trigger retraction and re-generation of fills.
1055        tx.push_chunk(StreamChunk::from_pretty(
1056            " TS                  i   F
1057            U- 2022-01-01T00:03:00 4   4.0
1058            U+ 2022-01-01T00:03:00 5   5.0",
1059        ));
1060
1061        let chunk4 = next_chunk(&mut executor).await;
1062        // The filled rows' values don't change as they depend on the first row,
1063        // but they are still retracted and re-inserted due to the general path logic.
1064        assert_eq!(
1065            chunk4.sort_rows(),
1066            StreamChunk::from_pretty(
1067                " TS                  i   F
1068                - 2022-01-01T00:01:00 1   1.0
1069                - 2022-01-01T00:02:00 1   1.0
1070                - 2022-01-01T00:03:00 4   4.0
1071                + 2022-01-01T00:01:00 1   1.0
1072                + 2022-01-01T00:02:00 1   1.0
1073                + 2022-01-01T00:03:00 5   5.0"
1074            )
1075            .sort_rows()
1076        );
1077    }
1078
1079    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1080    async fn test_streaming_gap_fill_null() {
1081        let store = MemoryStateStore::new();
1082        let schema = Schema::new(vec![
1083            Field::unnamed(DataType::Timestamp),
1084            Field::unnamed(DataType::Int32),
1085            Field::unnamed(DataType::Float64),
1086        ]);
1087        let fill_columns = HashMap::from([(1, FillStrategy::Null), (2, FillStrategy::Null)]);
1088        let (mut tx, mut executor) =
1089            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1090
1091        // Init with barrier.
1092        tx.push_barrier(test_epoch(1), false);
1093        executor.next().await.unwrap().unwrap(); // Barrier
1094
1095        // 1. Send an initial chunk with a gap to test basic filling.
1096        tx.push_chunk(StreamChunk::from_pretty(
1097            " TS                  i   F
1098            + 2022-01-01T00:00:00 1   1.0
1099            + 2022-01-01T00:03:00 4   4.0",
1100        ));
1101
1102        let chunk = next_chunk(&mut executor).await;
1103        assert_eq!(
1104            chunk.sort_rows(),
1105            StreamChunk::from_pretty(
1106                " TS                  i   F
1107                + 2022-01-01T00:00:00 1   1.0
1108                + 2022-01-01T00:01:00 .   .
1109                + 2022-01-01T00:02:00 .   .
1110                + 2022-01-01T00:03:00 4   4.0"
1111            )
1112            .sort_rows()
1113        );
1114
1115        // 2. Send a new chunk that arrives out-of-order, landing in the previously filled gap.
1116        tx.push_chunk(StreamChunk::from_pretty(
1117            " TS                  i   F
1118            + 2022-01-01T00:02:00 2   2.0",
1119        ));
1120
1121        let chunk2 = next_chunk(&mut executor).await;
1122        assert_eq!(
1123            chunk2.sort_rows(),
1124            StreamChunk::from_pretty(
1125                " TS                  i   F
1126                - 2022-01-01T00:02:00 .   .
1127                + 2022-01-01T00:02:00 2   2.0"
1128            )
1129            .sort_rows()
1130        );
1131
1132        // 3. Send a delete chunk to remove an original data point.
1133        tx.push_chunk(StreamChunk::from_pretty(
1134            " TS                  i   F
1135            - 2022-01-01T00:02:00 2   2.0",
1136        ));
1137
1138        let chunk3 = next_chunk(&mut executor).await;
1139        assert_eq!(
1140            chunk3.sort_rows(),
1141            StreamChunk::from_pretty(
1142                " TS                  i   F
1143                - 2022-01-01T00:02:00 2   2.0
1144                + 2022-01-01T00:02:00 .   ."
1145            )
1146            .sort_rows()
1147        );
1148
1149        // 4. Send an update chunk to modify an original data point.
1150        tx.push_chunk(StreamChunk::from_pretty(
1151            " TS                  i   F
1152            U- 2022-01-01T00:03:00 4   4.0
1153            U+ 2022-01-01T00:03:00 5   5.0",
1154        ));
1155
1156        let chunk4 = next_chunk(&mut executor).await;
1157        assert_eq!(
1158            chunk4.sort_rows(),
1159            StreamChunk::from_pretty(
1160                " TS                  i   F
1161                - 2022-01-01T00:01:00 .   .
1162                - 2022-01-01T00:02:00 .   .
1163                - 2022-01-01T00:03:00 4   4.0
1164                + 2022-01-01T00:01:00 .   .
1165                + 2022-01-01T00:02:00 .   .
1166                + 2022-01-01T00:03:00 5   5.0"
1167            )
1168            .sort_rows()
1169        );
1170    }
1171
1172    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1173    async fn test_streaming_gap_fill_interpolate() {
1174        let store = MemoryStateStore::new();
1175        let schema = Schema::new(vec![
1176            Field::unnamed(DataType::Timestamp),
1177            Field::unnamed(DataType::Int32),
1178            Field::unnamed(DataType::Float64),
1179        ]);
1180        let fill_columns = HashMap::from([
1181            (1, FillStrategy::Interpolate),
1182            (2, FillStrategy::Interpolate),
1183        ]);
1184        let (mut tx, mut executor) =
1185            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1186
1187        // Init with barrier.
1188        tx.push_barrier(test_epoch(1), false);
1189        executor.next().await.unwrap().unwrap(); // Barrier
1190
1191        // 1. Send an initial chunk with a gap to test basic filling.
1192        tx.push_chunk(StreamChunk::from_pretty(
1193            " TS                  i   F
1194            + 2022-01-01T00:00:00 1   1.0
1195            + 2022-01-01T00:03:00 4   4.0",
1196        ));
1197
1198        let chunk = next_chunk(&mut executor).await;
1199        assert_eq!(
1200            chunk.sort_rows(),
1201            StreamChunk::from_pretty(
1202                " TS                  i   F
1203                + 2022-01-01T00:00:00 1   1.0
1204                + 2022-01-01T00:01:00 2   2.0
1205                + 2022-01-01T00:02:00 3   3.0
1206                + 2022-01-01T00:03:00 4   4.0"
1207            )
1208            .sort_rows()
1209        );
1210
1211        // 2. Send a new chunk that arrives out-of-order, landing in the previously filled gap.
1212        tx.push_chunk(StreamChunk::from_pretty(
1213            " TS                  i   F
1214            + 2022-01-01T00:02:00 10  10.0",
1215        ));
1216
1217        let chunk2 = next_chunk(&mut executor).await;
1218        assert_eq!(
1219            chunk2.sort_rows(),
1220            StreamChunk::from_pretty(
1221                " TS                  i   F
1222                - 2022-01-01T00:01:00 2   2.0
1223                - 2022-01-01T00:02:00 3   3.0
1224                + 2022-01-01T00:01:00 5   5.5
1225                + 2022-01-01T00:02:00 10  10.0"
1226            )
1227            .sort_rows()
1228        );
1229
1230        // 3. Send a delete chunk to remove an original data point.
1231        // This should trigger retraction of old fills and re-calculation of interpolated values.
1232        tx.push_chunk(StreamChunk::from_pretty(
1233            " TS                  i   F
1234            - 2022-01-01T00:02:00 10  10.0",
1235        ));
1236
1237        let chunk3 = next_chunk(&mut executor).await;
1238        assert_eq!(
1239            chunk3.sort_rows(),
1240            StreamChunk::from_pretty(
1241                " TS                  i   F
1242                - 2022-01-01T00:01:00 5   5.5
1243                - 2022-01-01T00:02:00 10  10.0
1244                + 2022-01-01T00:01:00 2   2.0
1245                + 2022-01-01T00:02:00 3   3.0"
1246            )
1247            .sort_rows()
1248        );
1249
1250        // 4. Send an update chunk to modify an original data point.
1251        // This will cause the interpolated values to be re-calculated.
1252        tx.push_chunk(StreamChunk::from_pretty(
1253            " TS                  i   F
1254            U- 2022-01-01T00:03:00 4   4.0
1255            U+ 2022-01-01T00:03:00 10  10.0",
1256        ));
1257
1258        let chunk4 = next_chunk(&mut executor).await;
1259        assert_eq!(
1260            chunk4.sort_rows(),
1261            StreamChunk::from_pretty(
1262                " TS                  i   F
1263                - 2022-01-01T00:01:00 2   2.0
1264                - 2022-01-01T00:02:00 3   3.0
1265                - 2022-01-01T00:03:00 4   4.0
1266                + 2022-01-01T00:01:00 4   4.0
1267                + 2022-01-01T00:02:00 7   7.0
1268                + 2022-01-01T00:03:00 10  10.0"
1269            )
1270            .sort_rows()
1271        );
1272    }
1273
1274    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1275    async fn test_streaming_gap_fill_prefers_nearest_state_neighbor_over_partial_cache() {
1276        let store = MemoryStateStore::new();
1277        let schema = Schema::new(vec![
1278            Field::unnamed(DataType::Timestamp),
1279            Field::unnamed(DataType::Int32),
1280            Field::unnamed(DataType::Float64),
1281        ]);
1282        let fill_columns = HashMap::from([(1, FillStrategy::Null), (2, FillStrategy::Null)]);
1283
1284        // --- First run: persist the initial anchors into state.
1285        let (mut tx, mut executor) = create_executor(
1286            store.clone(),
1287            fill_columns.clone(),
1288            schema.clone(),
1289            Interval::from_minutes(1),
1290        )
1291        .await;
1292
1293        tx.push_barrier(test_epoch(1), false);
1294        executor.next().await.unwrap().unwrap(); // Barrier
1295
1296        tx.push_chunk(StreamChunk::from_pretty(
1297            " TS                  i   F
1298            + 2022-01-01T00:00:00 10  10.0
1299            + 2022-01-01T00:02:00 12  12.0
1300            + 2022-01-01T00:05:00 15  15.0",
1301        ));
1302        executor.next().await.unwrap().unwrap(); // Initial filled chunk
1303
1304        tx.push_barrier(test_epoch(2), false);
1305        executor.next().await.unwrap().unwrap(); // Commit
1306
1307        // --- Second run: recovered state exists, but cache starts empty.
1308        let (mut tx2, mut executor2) = create_executor(
1309            store.clone(),
1310            fill_columns.clone(),
1311            schema.clone(),
1312            Interval::from_minutes(1),
1313        )
1314        .await;
1315
1316        tx2.push_barrier(test_epoch(2), false);
1317        executor2.next().await.unwrap().unwrap(); // Recovery barrier
1318
1319        // Insert a later anchor so the cache only knows about 00:08.
1320        tx2.push_chunk(StreamChunk::from_pretty(
1321            " TS                  i   F
1322            + 2022-01-01T00:08:00 20  20.0",
1323        ));
1324        executor2.next().await.unwrap().unwrap(); // Chunk
1325
1326        tx2.push_barrier(test_epoch(3), false);
1327        executor2.next().await.unwrap().unwrap(); // Commit
1328
1329        // Insert into the historical gap. The nearest next neighbor is 00:02 from state, not
1330        // the cached 00:08 row. We should only retract the old fill at 00:01.
1331        tx2.push_chunk(StreamChunk::from_pretty(
1332            " TS                  i   F
1333            + 2022-01-01T00:01:00 11  11.0",
1334        ));
1335
1336        let chunk = next_chunk(&mut executor2).await;
1337
1338        assert_eq!(
1339            chunk.sort_rows(),
1340            StreamChunk::from_pretty(
1341                " TS                  i   F
1342                - 2022-01-01T00:01:00 .   .
1343                + 2022-01-01T00:01:00 11  11.0"
1344            )
1345            .sort_rows()
1346        );
1347    }
1348
1349    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1350    async fn test_streaming_gap_fill_recovery() {
1351        let store = MemoryStateStore::new();
1352        let schema = Schema::new(vec![
1353            Field::unnamed(DataType::Timestamp),
1354            Field::unnamed(DataType::Int32),
1355            Field::unnamed(DataType::Float64),
1356        ]);
1357        let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Interpolate)]);
1358
1359        // --- First run ---
1360        let (mut tx, mut executor) = create_executor(
1361            store.clone(),
1362            fill_columns.clone(),
1363            schema.clone(),
1364            Interval::from_minutes(1),
1365        )
1366        .await;
1367
1368        // Init with barrier.
1369        tx.push_barrier(test_epoch(1), false);
1370        executor.next().await.unwrap().unwrap(); // Barrier
1371
1372        // Send a chunk and commit.
1373        tx.push_chunk(StreamChunk::from_pretty(
1374            " TS                  i   F
1375            + 2022-01-01T00:00:00 1   1.0
1376            + 2022-01-01T00:03:00 4   4.0",
1377        ));
1378
1379        // Consume the initial filled chunk.
1380        let chunk = next_chunk(&mut executor).await;
1381        assert_eq!(
1382            chunk.sort_rows(),
1383            StreamChunk::from_pretty(
1384                " TS                  i   F
1385                + 2022-01-01T00:00:00 1   1.0
1386                + 2022-01-01T00:01:00 1   2.0
1387                + 2022-01-01T00:02:00 1   3.0
1388                + 2022-01-01T00:03:00 4   4.0"
1389            )
1390            .sort_rows()
1391        );
1392
1393        tx.push_barrier(test_epoch(2), false);
1394        executor.next().await.unwrap().unwrap(); // Barrier to commit.
1395
1396        // --- Second run (after recovery) ---
1397        let (mut tx2, mut executor2) = create_executor(
1398            store.clone(),
1399            fill_columns.clone(),
1400            schema.clone(),
1401            Interval::from_minutes(1),
1402        )
1403        .await;
1404
1405        // Init with barrier, which triggers recovery.
1406        tx2.push_barrier(test_epoch(2), false);
1407        executor2.next().await.unwrap().unwrap(); // Barrier
1408
1409        // After recovery, the executor should not output anything for the loaded state.
1410
1411        // Send a new chunk, which should fill the gap between old and new data.
1412        tx2.push_chunk(StreamChunk::from_pretty(
1413            " TS                  i   F
1414            + 2022-01-01T00:05:00 6   10.0",
1415        ));
1416
1417        let chunk2 = next_chunk(&mut executor2).await;
1418        assert_eq!(
1419            chunk2.sort_rows(),
1420            StreamChunk::from_pretty(
1421                " TS                  i   F
1422                + 2022-01-01T00:04:00 4   7.0
1423                + 2022-01-01T00:05:00 6   10.0"
1424            )
1425            .sort_rows()
1426        );
1427    }
1428
1429    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1430    async fn test_streaming_gap_fill_mixed_strategy() {
1431        let store = MemoryStateStore::new();
1432        let schema = Schema::new(vec![
1433            Field::unnamed(DataType::Timestamp),
1434            Field::unnamed(DataType::Int32),
1435            Field::unnamed(DataType::Int64),
1436            Field::unnamed(DataType::Float32),
1437            Field::unnamed(DataType::Float64),
1438        ]);
1439
1440        let fill_columns = HashMap::from([
1441            (1, FillStrategy::Interpolate),
1442            (2, FillStrategy::Locf),
1443            (3, FillStrategy::Null),
1444            (4, FillStrategy::Interpolate),
1445        ]);
1446        let gap_interval = Interval::from_days(1);
1447        let (mut tx, mut executor) =
1448            create_executor(store, fill_columns, schema, gap_interval).await;
1449
1450        // Init with barrier.
1451        tx.push_barrier(test_epoch(1), false);
1452        executor.next().await.unwrap().unwrap();
1453
1454        // Send an initial chunk with a gap to test mixed filling strategies.
1455        tx.push_chunk(StreamChunk::from_pretty(
1456            " TS                  i   I    f     F
1457            + 2023-04-01T10:00:00 10 100 1.0 100.0
1458            + 2023-04-05T10:00:00 50 200 5.0 200.0",
1459        ));
1460
1461        let chunk = next_chunk(&mut executor).await;
1462        assert_eq!(
1463            chunk.sort_rows(),
1464            StreamChunk::from_pretty(
1465                " TS                  i   I    f     F
1466                + 2023-04-01T10:00:00 10 100 1.0 100.0
1467                + 2023-04-02T10:00:00 20 100 .    125.0
1468                + 2023-04-03T10:00:00 30 100 .    150.0
1469                + 2023-04-04T10:00:00 40 100 .    175.0
1470                + 2023-04-05T10:00:00 50 200 5.0 200.0"
1471            )
1472            .sort_rows()
1473        );
1474
1475        // 2. Send a new chunk that arrives out-of-order, landing in the previously filled gap.
1476        tx.push_chunk(StreamChunk::from_pretty(
1477            " TS                  i   I    f     F
1478            + 2023-04-03T10:00:00 25 150 3.0 160.0",
1479        ));
1480
1481        let chunk2 = next_chunk(&mut executor).await;
1482        assert_eq!(
1483            chunk2.sort_rows(),
1484            StreamChunk::from_pretty(
1485                " TS                  i   I    f     F
1486                - 2023-04-02T10:00:00 20 100 .    125.0
1487                - 2023-04-03T10:00:00 30 100 .    150.0
1488                - 2023-04-04T10:00:00 40 100 .    175.0
1489                + 2023-04-02T10:00:00 17 100 .    130.0
1490                + 2023-04-03T10:00:00 25 150 3.0 160.0
1491                + 2023-04-04T10:00:00 37 150 .    180.0"
1492            )
1493            .sort_rows()
1494        );
1495
1496        // 3. Send a delete chunk to remove an original data point.
1497        tx.push_chunk(StreamChunk::from_pretty(
1498            " TS                  i   I    f     F
1499            - 2023-04-03T10:00:00 25 150 3.0 160.0",
1500        ));
1501        let chunk3 = next_chunk(&mut executor).await;
1502        assert_eq!(
1503            chunk3.sort_rows(),
1504            StreamChunk::from_pretty(
1505                " TS                  i   I    f     F
1506                - 2023-04-02T10:00:00 17 100 .    130.0
1507                - 2023-04-03T10:00:00 25 150 3.0 160.0
1508                - 2023-04-04T10:00:00 37 150 .    180.0
1509                + 2023-04-02T10:00:00 20 100 .    125.0
1510                + 2023-04-03T10:00:00 30 100 .    150.0
1511                + 2023-04-04T10:00:00 40 100 .    175.0"
1512            )
1513            .sort_rows()
1514        );
1515
1516        // 4. Send an update chunk to modify an original data point.
1517        tx.push_chunk(StreamChunk::from_pretty(
1518            " TS                  i   I    f     F
1519            U- 2023-04-05T10:00:00 50 200 5.0 200.0
1520            U+ 2023-04-05T10:00:00 50 200 5.0 300.0",
1521        ));
1522        let chunk4 = next_chunk(&mut executor).await;
1523        assert_eq!(
1524            chunk4.sort_rows(),
1525            StreamChunk::from_pretty(
1526                " TS                  i   I    f     F
1527                - 2023-04-02T10:00:00 20 100 .    125.0
1528                - 2023-04-03T10:00:00 30 100 .    150.0
1529                - 2023-04-04T10:00:00 40 100 .    175.0
1530                - 2023-04-05T10:00:00 50 200 5.0 200.0
1531                + 2023-04-02T10:00:00 20 100 .    150.0
1532                + 2023-04-03T10:00:00 30 100 .    200.0
1533                + 2023-04-04T10:00:00 40 100 .    250.0
1534                + 2023-04-05T10:00:00 50 200 5.0 300.0"
1535            )
1536            .sort_rows()
1537        );
1538    }
1539
1540    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1541    async fn test_streaming_gap_fill_out_of_order_keeps_unchanged_prefix() {
1542        let store = MemoryStateStore::new();
1543        let schema = Schema::new(vec![
1544            Field::unnamed(DataType::Timestamp),
1545            Field::unnamed(DataType::Int32),
1546            Field::unnamed(DataType::Float64),
1547        ]);
1548        let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Locf)]);
1549        let (mut tx, mut executor) =
1550            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1551
1552        tx.push_barrier(test_epoch(1), false);
1553        executor.next().await.unwrap().unwrap(); // Barrier
1554
1555        // A wide gap: 00:01..=00:05 are all LOCF-filled from 00:00.
1556        tx.push_chunk(StreamChunk::from_pretty(
1557            " TS                  i   F
1558            + 2022-01-01T00:00:00 1   1.0
1559            + 2022-01-01T00:06:00 7   7.0",
1560        ));
1561        let chunk = next_chunk(&mut executor).await;
1562        assert_eq!(
1563            chunk.sort_rows(),
1564            StreamChunk::from_pretty(
1565                " TS                  i   F
1566                + 2022-01-01T00:00:00 1   1.0
1567                + 2022-01-01T00:01:00 1   1.0
1568                + 2022-01-01T00:02:00 1   1.0
1569                + 2022-01-01T00:03:00 1   1.0
1570                + 2022-01-01T00:04:00 1   1.0
1571                + 2022-01-01T00:05:00 1   1.0
1572                + 2022-01-01T00:06:00 7   7.0"
1573            )
1574            .sort_rows()
1575        );
1576
1577        // Anchor near the end: the LOCF fills before it (00:01..=00:04) are unchanged, so only the
1578        // 00:05 slot is rewritten regardless of how wide the prefix is.
1579        tx.push_chunk(StreamChunk::from_pretty(
1580            " TS                  i   F
1581            + 2022-01-01T00:05:00 5   5.0",
1582        ));
1583        let chunk2 = next_chunk(&mut executor).await;
1584        assert_eq!(
1585            chunk2.sort_rows(),
1586            StreamChunk::from_pretty(
1587                " TS                  i   F
1588                - 2022-01-01T00:05:00 1   1.0
1589                + 2022-01-01T00:05:00 5   5.0"
1590            )
1591            .sort_rows()
1592        );
1593    }
1594
1595    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1596    async fn test_streaming_gap_fill_null_out_of_order_keeps_unchanged_suffix() {
1597        let store = MemoryStateStore::new();
1598        let schema = Schema::new(vec![
1599            Field::unnamed(DataType::Timestamp),
1600            Field::unnamed(DataType::Int32),
1601            Field::unnamed(DataType::Float64),
1602        ]);
1603        let fill_columns = HashMap::from([(1, FillStrategy::Null), (2, FillStrategy::Null)]);
1604        let (mut tx, mut executor) =
1605            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1606
1607        tx.push_barrier(test_epoch(1), false);
1608        executor.next().await.unwrap().unwrap(); // Barrier
1609
1610        // A wide gap with NULL fills at 00:01..=00:05.
1611        tx.push_chunk(StreamChunk::from_pretty(
1612            " TS                  i   F
1613            + 2022-01-01T00:00:00 1   1.0
1614            + 2022-01-01T00:06:00 7   7.0",
1615        ));
1616        let chunk = next_chunk(&mut executor).await;
1617        assert_eq!(
1618            chunk.sort_rows(),
1619            StreamChunk::from_pretty(
1620                " TS                  i   F
1621                + 2022-01-01T00:00:00 1   1.0
1622                + 2022-01-01T00:01:00 .   .
1623                + 2022-01-01T00:02:00 .   .
1624                + 2022-01-01T00:03:00 .   .
1625                + 2022-01-01T00:04:00 .   .
1626                + 2022-01-01T00:05:00 .   .
1627                + 2022-01-01T00:06:00 7   7.0"
1628            )
1629            .sort_rows()
1630        );
1631
1632        // Anchor in the middle: NULL fills are anchor-independent and the grid stays aligned, so
1633        // both sides (00:01,00:02 and 00:04,00:05) are unchanged — only 00:03 is rewritten.
1634        tx.push_chunk(StreamChunk::from_pretty(
1635            " TS                  i   F
1636            + 2022-01-01T00:03:00 3   3.0",
1637        ));
1638        let chunk2 = next_chunk(&mut executor).await;
1639        assert_eq!(
1640            chunk2.sort_rows(),
1641            StreamChunk::from_pretty(
1642                " TS                  i   F
1643                - 2022-01-01T00:03:00 .   .
1644                + 2022-01-01T00:03:00 3   3.0"
1645            )
1646            .sort_rows()
1647        );
1648    }
1649
1650    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1651    async fn test_streaming_gap_fill_out_of_order_retracts_before_reinsert() {
1652        let store = MemoryStateStore::new();
1653        let schema = Schema::new(vec![
1654            Field::unnamed(DataType::Timestamp),
1655            Field::unnamed(DataType::Int32),
1656            Field::unnamed(DataType::Float64),
1657        ]);
1658        let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Locf)]);
1659        let (mut tx, mut executor) =
1660            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1661
1662        tx.push_barrier(test_epoch(1), false);
1663        executor.next().await.unwrap().unwrap(); // Barrier
1664
1665        tx.push_chunk(StreamChunk::from_pretty(
1666            " TS                  i   F
1667            + 2022-01-01T00:00:00 1   1.0
1668            + 2022-01-01T00:04:00 4   4.0",
1669        ));
1670        executor.next().await.unwrap().unwrap(); // Initial fills 00:01..=00:03 (LOCF from 00:00).
1671
1672        // Insert collision: a late anchor on the filled 00:02 slot shares that fill's downstream
1673        // key, so the fill is retracted before the anchor is inserted; 00:03 also re-bases its LOCF
1674        // value from 00:00 to 00:02. Assert order, not just the set.
1675        tx.push_chunk(StreamChunk::from_pretty(
1676            " TS                  i   F
1677            + 2022-01-01T00:02:00 2   2.0",
1678        ));
1679        assert_chunk_eq_ordered(
1680            next_chunk(&mut executor).await,
1681            StreamChunk::from_pretty(
1682                " TS                  i   F
1683                - 2022-01-01T00:02:00 1   1.0
1684                - 2022-01-01T00:03:00 1   1.0
1685                + 2022-01-01T00:03:00 2   2.0
1686                + 2022-01-01T00:02:00 2   2.0",
1687            ),
1688        );
1689
1690        // Delete collision (symmetric): removing the 00:02 anchor turns its slot back into a fill,
1691        // which must be inserted after the anchor Delete.
1692        tx.push_chunk(StreamChunk::from_pretty(
1693            " TS                  i   F
1694            - 2022-01-01T00:02:00 2   2.0",
1695        ));
1696        assert_chunk_eq_ordered(
1697            next_chunk(&mut executor).await,
1698            StreamChunk::from_pretty(
1699                " TS                  i   F
1700                - 2022-01-01T00:02:00 2   2.0
1701                + 2022-01-01T00:02:00 1   1.0
1702                - 2022-01-01T00:03:00 2   2.0
1703                + 2022-01-01T00:03:00 1   1.0",
1704            ),
1705        );
1706    }
1707
1708    /// Assert `got` equals `expected` including op order (unlike `sort_rows`, which ignores order).
1709    fn assert_chunk_eq_ordered(got: StreamChunk, expected: StreamChunk) {
1710        assert_eq!(got.ops(), expected.ops());
1711        let got_rows: Vec<_> = got.rows().map(|(op, r)| (op, r.to_owned_row())).collect();
1712        let want_rows: Vec<_> = expected
1713            .rows()
1714            .map(|(op, r)| (op, r.to_owned_row()))
1715            .collect();
1716        assert_eq!(got_rows, want_rows);
1717    }
1718
1719    async fn next_chunk(executor: &mut BoxedMessageStream) -> StreamChunk {
1720        executor
1721            .next()
1722            .await
1723            .unwrap()
1724            .unwrap()
1725            .into_chunk()
1726            .unwrap()
1727    }
1728
1729    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1730    async fn test_streaming_gap_fill_off_grid_out_of_order_regrids_suffix() {
1731        let store = MemoryStateStore::new();
1732        let schema = Schema::new(vec![
1733            Field::unnamed(DataType::Timestamp),
1734            Field::unnamed(DataType::Int32),
1735            Field::unnamed(DataType::Float64),
1736        ]);
1737        let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Locf)]);
1738        let (mut tx, mut executor) =
1739            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1740
1741        tx.push_barrier(test_epoch(1), false);
1742        executor.next().await.unwrap().unwrap(); // Barrier
1743
1744        // Gap 00:00 -> 00:04 with LOCF fills at 00:01..=00:03.
1745        tx.push_chunk(StreamChunk::from_pretty(
1746            " TS                  i   F
1747            + 2022-01-01T00:00:00 1   1.0
1748            + 2022-01-01T00:04:00 4   4.0",
1749        ));
1750        executor.next().await.unwrap().unwrap(); // Initial fills.
1751
1752        // An off-grid anchor (00:01:30) is not on the prev grid, so the suffix re-grids onto the new
1753        // anchor (00:02:30, 00:03:30) — fully disjoint from the old 00:02/00:03 fills, no slot
1754        // collision. The 00:01 prefix fill is untouched (not in the chunk).
1755        tx.push_chunk(StreamChunk::from_pretty(
1756            " TS                  i   F
1757            + 2022-01-01T00:01:30 9   9.0",
1758        ));
1759        let chunk = next_chunk(&mut executor).await;
1760        assert_chunk_eq_ordered(
1761            chunk,
1762            StreamChunk::from_pretty(
1763                " TS                  i   F
1764                - 2022-01-01T00:02:00 1   1.0
1765                + 2022-01-01T00:02:30 9   9.0
1766                - 2022-01-01T00:03:00 1   1.0
1767                + 2022-01-01T00:03:30 9   9.0
1768                + 2022-01-01T00:01:30 9   9.0",
1769            ),
1770        );
1771    }
1772
1773    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1774    async fn test_streaming_gap_fill_month_interval_partial_overlap() {
1775        let store = MemoryStateStore::new();
1776        let schema = Schema::new(vec![
1777            Field::unnamed(DataType::Timestamp),
1778            Field::unnamed(DataType::Int32),
1779            Field::unnamed(DataType::Float64),
1780        ]);
1781        let fill_columns = HashMap::from([(1, FillStrategy::Null), (2, FillStrategy::Null)]);
1782        let (mut tx, mut executor) = create_executor(
1783            store,
1784            fill_columns,
1785            schema,
1786            Interval::from_month_day_usec(1, 0, 0),
1787        )
1788        .await;
1789
1790        tx.push_barrier(test_epoch(1), false);
1791        executor.next().await.unwrap().unwrap(); // Barrier
1792
1793        // Month grid from 2023-12-30 clamps day 30/31 into Feb: fills at 2024-01-30, 2024-02-29.
1794        tx.push_chunk(StreamChunk::from_pretty(
1795            " TS                  i   F
1796            + 2023-12-30T00:00:00 1   1.0
1797            + 2024-03-01T00:00:00 9   9.0",
1798        ));
1799        let chunk = next_chunk(&mut executor).await;
1800        assert_eq!(
1801            chunk.sort_rows(),
1802            StreamChunk::from_pretty(
1803                " TS                  i   F
1804                + 2023-12-30T00:00:00 1   1.0
1805                + 2024-01-30T00:00:00 .   .
1806                + 2024-02-29T00:00:00 .   .
1807                + 2024-03-01T00:00:00 9   9.0"
1808            )
1809            .sort_rows()
1810        );
1811
1812        // Insert 2023-12-31: the new grid (01-31, 02-29) diverges from the old (01-30, 02-29) at the
1813        // head but converges at 02-29. Only 01-30 -> 01-31 changes; 02-29 must NOT be churned.
1814        tx.push_chunk(StreamChunk::from_pretty(
1815            " TS                  i   F
1816            + 2023-12-31T00:00:00 5   5.0",
1817        ));
1818        let chunk2 = next_chunk(&mut executor).await;
1819        assert_chunk_eq_ordered(
1820            chunk2,
1821            StreamChunk::from_pretty(
1822                " TS                  i   F
1823                - 2024-01-30T00:00:00 .   .
1824                + 2024-01-31T00:00:00 .   .
1825                + 2023-12-31T00:00:00 5   5.0",
1826            ),
1827        );
1828
1829        // Deleting it merges back to the old grid: 01-31 -> 01-30, 02-29 still untouched.
1830        tx.push_chunk(StreamChunk::from_pretty(
1831            " TS                  i   F
1832            - 2023-12-31T00:00:00 5   5.0",
1833        ));
1834        let chunk3 = next_chunk(&mut executor).await;
1835        assert_chunk_eq_ordered(
1836            chunk3,
1837            StreamChunk::from_pretty(
1838                " TS                  i   F
1839                - 2023-12-31T00:00:00 5   5.0
1840                + 2024-01-30T00:00:00 .   .
1841                - 2024-01-31T00:00:00 .   .",
1842            ),
1843        );
1844    }
1845}