risingwave_stream/executor/
gap_fill.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::ops::Bound;
17
18use futures::{StreamExt, pin_mut};
19use risingwave_common::array::Op;
20use risingwave_common::gap_fill::{
21    FillStrategy, apply_interpolation_step, calculate_interpolation_step,
22};
23use risingwave_common::metrics::LabelGuardedIntCounter;
24use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowExt};
25use risingwave_common::types::{CheckedAdd, Datum, ScalarImpl, ToOwnedDatum};
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::row_serde::OrderedRowSerde;
28use risingwave_common_estimate_size::EstimateSize;
29use risingwave_expr::expr::NonStrictExpression;
30use risingwave_storage::StateStore;
31use risingwave_storage::store::PrefetchOptions;
32use tracing::warn;
33
34use crate::common::table::state_table::{StateTable, StateTablePostCommit};
35use crate::executor::prelude::*;
36
/// Everything [`GapFillExecutor::new`] needs to build a [`GapFillExecutor`].
pub struct GapFillExecutorArgs<S: StateStore> {
    /// Actor context; `new` reads its streaming metrics, actor id and fragment id.
    pub ctx: ActorContextRef,
    /// Input executor (presumably the upstream whose rows are gap-filled; its use is not
    /// visible in this part of the file).
    pub input: Executor,
    /// Row schema. Its length gives the position of the `is_filled` flag in state rows, and
    /// `schema[time_column_index]` gives the time column's data type.
    pub schema: Schema,
    /// Stored on the executor unchanged (presumably the output chunk size; TODO confirm).
    pub chunk_size: usize,
    /// Index of the time column within `schema`.
    pub time_column_index: usize,
    /// Fill strategy keyed by column index.
    pub fill_columns: HashMap<usize, FillStrategy>,
    /// Expression for the gap interval; where it is evaluated is not visible here.
    pub gap_interval: NonStrictExpression,
    /// State table that `new` wraps in a `ManagedGapFillState`.
    pub state_table: StateTable<S>,
}
47
/// Tracks if a row is original data or a filled value.
///
/// In the state table this is stored as a trailing boolean column: `true` decodes to
/// [`RowType::Filled`], anything else to [`RowType::Original`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RowType {
    /// The row is original data.
    Original,
    /// The row is a filled value.
    Filled,
}
54
/// Cache key for Gap Fill, derived from the time column.
///
/// Holds the bytes produced by `ManagedGapFillState::serialize_time_to_cache_key`, i.e. the
/// `memcmp_serialize` encoding of the time column alone.
pub type GapFillCacheKey = Vec<u8>;

/// Cache for storing Gap Fill rows.
///
/// Ordered by [`GapFillCacheKey`]. Each value is the compacted row (without the `is_filled`
/// flag) together with its [`RowType`].
pub type GapFillCache = BTreeMap<GapFillCacheKey, (CompactedRow, RowType)>;

/// Capacity handed to `GapFillCacheManager::new` by `GapFillExecutor::new`. The cache manager
/// also uses its capacity as the row limit of each state-table window scan.
const GAPFILL_CACHE_DEFAULT_CAPACITY: usize = 1024;
62
/// Wraps the gap-fill state table together with what is needed to encode and decode its rows.
///
/// State rows are output rows with one extra trailing boolean column, the `is_filled` flag.
pub struct ManagedGapFillState<S: StateStore> {
    /// Underlying state table; all reads and writes go through it.
    state_table: StateTable<S>,
    /// Serde for a single-column, ascending-ordered row holding only the time column.
    time_key_serde: OrderedRowSerde,
    /// Index of the time column in both output rows and state rows.
    time_column_index: usize,
    /// Index of the `is_filled` flag in state rows (equal to the output schema length).
    filled_column_index: usize,
}
69
/// A state-table row decoded into the pieces the cache works with.
#[derive(Clone, PartialEq, Debug)]
pub struct GapFillStateRow {
    /// Serialized time column of `row`.
    pub cache_key: GapFillCacheKey,
    /// The row in output format, i.e. without the trailing `is_filled` flag.
    pub row: OwnedRow,
    /// Whether the row is original data or a filled value.
    pub row_type: RowType,
}
76
77impl GapFillStateRow {
78    pub fn new(cache_key: GapFillCacheKey, row: OwnedRow, row_type: RowType) -> Self {
79        Self {
80            cache_key,
81            row,
82            row_type,
83        }
84    }
85}
86
87impl<S: StateStore> ManagedGapFillState<S> {
88    pub fn new(state_table: StateTable<S>, time_column_index: usize, schema: &Schema) -> Self {
89        // Create serializer for time column only.
90        let time_column_type = schema[time_column_index].data_type();
91        let time_key_serde = OrderedRowSerde::new(
92            vec![time_column_type],
93            vec![risingwave_common::util::sort_util::OrderType::ascending()],
94        );
95
96        // The is_filled flag is always the last column in the state table schema.
97        let filled_column_index = schema.len();
98
99        Self {
100            state_table,
101            time_key_serde,
102            time_column_index,
103            filled_column_index,
104        }
105    }
106
    /// Forwards `epoch` to the underlying state table's `init_epoch` and returns its result.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.state_table.init_epoch(epoch).await
    }
110
    /// Inserts `value` into the underlying state table.
    ///
    /// `value` is passed through untouched, so it must already be in state-row format
    /// (output columns followed by the `is_filled` flag).
    pub fn insert(&mut self, value: impl Row) {
        self.state_table.insert(value);
    }
114
    /// Deletes `value` from the underlying state table.
    ///
    /// `value` is passed through untouched, so it must already be in state-row format.
    pub fn delete(&mut self, value: impl Row) {
        self.state_table.delete(value);
    }
118
119    /// Batch delete multiple rows from state table.
120    pub fn batch_delete(&mut self, rows: Vec<impl Row>) {
121        for row in rows {
122            self.state_table.delete(row);
123        }
124    }
125
126    /// Scans `StateStore` for filled rows between two time points (exclusive).
127    pub async fn scan_filled_rows_between(
128        &self,
129        start_time: &GapFillCacheKey,
130        end_time: &GapFillCacheKey,
131    ) -> StreamExecutorResult<Vec<(OwnedRow, OwnedRow)>> {
132        let mut filled_rows_to_delete = Vec::new();
133
134        let start_time_row = self.time_key_serde.deserialize(start_time)?;
135        let end_time_row = self.time_key_serde.deserialize(end_time)?;
136
137        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(
138            Bound::Excluded(start_time_row),
139            Bound::Excluded(end_time_row),
140        );
141
142        let state_table_iter = self
143            .state_table
144            .iter_with_prefix(
145                &None::<row::Empty>,
146                sub_range,
147                PrefetchOptions::prefetch_for_large_range_scan(),
148            )
149            .await?;
150
151        pin_mut!(state_table_iter);
152        while let Some(row_result) = state_table_iter.next().await {
153            let row = row_result?.into_owned_row();
154
155            if self.extract_is_filled_flag(&row) {
156                // This is a filled row, add to deletion list.
157                // For state table, we use the time column as the key.
158                let time_datum = row.datum_at(self.time_column_index);
159                let state_key = OwnedRow::new(vec![time_datum.to_owned_datum()]);
160                filled_rows_to_delete.push((state_key, row));
161            }
162        }
163
164        Ok(filled_rows_to_delete)
165    }
166
167    /// Serialize time value to cache key.
168    pub fn serialize_time_to_cache_key(&self, row: impl Row) -> GapFillCacheKey {
169        row.project(&[self.time_column_index])
170            .memcmp_serialize(&self.time_key_serde)
171    }
172
173    /// Scans a range of data from `StateStore` to build a window of rows before `end_time`.
174    pub async fn scan_range_before(
175        &self,
176        end_time: &GapFillCacheKey,
177        limit: usize,
178    ) -> StreamExecutorResult<Vec<(GapFillCacheKey, CompactedRow, RowType)>> {
179        let end_time_row = self.time_key_serde.deserialize(end_time)?;
180
181        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
182            &(Bound::Unbounded, Bound::Excluded(end_time_row));
183
184        let state_table_iter = self
185            .state_table
186            .rev_iter_with_prefix(
187                &None::<row::Empty>,
188                sub_range,
189                PrefetchOptions::prefetch_for_large_range_scan(),
190            )
191            .await?;
192        pin_mut!(state_table_iter);
193
194        let mut results = Vec::new();
195
196        // Collect at most `limit` rows in reverse order.
197        while let Some(item) = state_table_iter.next().await {
198            let state_row = item?.into_owned_row();
199            let gapfill_row = self.get_gapfill_row(state_row);
200            results.push((
201                gapfill_row.cache_key,
202                (&gapfill_row.row).into(),
203                gapfill_row.row_type,
204            ));
205            if results.len() >= limit {
206                break;
207            }
208        }
209
210        results.reverse();
211
212        Ok(results)
213    }
214
215    /// Scans a range of data from `StateStore` to build a window of rows after `start_time`.
216    pub async fn scan_range_after(
217        &self,
218        start_time: &GapFillCacheKey,
219        limit: usize,
220    ) -> StreamExecutorResult<Vec<(GapFillCacheKey, CompactedRow, RowType)>> {
221        let start_time_row = self.time_key_serde.deserialize(start_time)?;
222
223        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
224            &(Bound::Excluded(start_time_row), Bound::Unbounded);
225
226        let state_table_iter = self
227            .state_table
228            .iter_with_prefix(
229                &None::<row::Empty>,
230                sub_range,
231                PrefetchOptions::prefetch_for_large_range_scan(),
232            )
233            .await?;
234        pin_mut!(state_table_iter);
235
236        let mut results = Vec::new();
237        let mut count = 0;
238
239        while let Some(item) = state_table_iter.next().await
240            && count < limit
241        {
242            let state_row = item?.into_owned_row();
243            let gapfill_row = self.get_gapfill_row(state_row);
244            results.push((
245                gapfill_row.cache_key,
246                (&gapfill_row.row).into(),
247                gapfill_row.row_type,
248            ));
249            count += 1;
250        }
251
252        Ok(results)
253    }
254
255    /// Converts a state table row to a `GapFillStateRow`.
256    fn get_gapfill_row(&self, state_row: OwnedRow) -> GapFillStateRow {
257        let row_type = self.extract_is_filled(&state_row);
258        let output_row = Self::state_row_to_output_row(&state_row);
259        let cache_key = self.serialize_time_to_cache_key(&output_row);
260
261        GapFillStateRow::new(cache_key, output_row, row_type)
262    }
263
    /// Commits the underlying state table for `epoch` and returns the resulting
    /// `StateTablePostCommit` handle, which borrows the table.
    pub async fn flush(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
        self.state_table.commit(epoch).await
    }
270
271    /// Convert a row to state table format by appending the `is_filled` flag.
272    fn row_to_state_row(row: &OwnedRow, is_filled: bool) -> OwnedRow {
273        let mut state_row_data = row.as_inner().to_vec();
274        state_row_data.push(Some(ScalarImpl::Bool(is_filled)));
275        OwnedRow::new(state_row_data)
276    }
277
278    /// Convert a state table row back to output format by removing the `is_filled` flag.
279    fn state_row_to_output_row(state_row: &OwnedRow) -> OwnedRow {
280        let mut output_data = state_row.as_inner().to_vec();
281        output_data.pop();
282        OwnedRow::new(output_data)
283    }
284
285    /// Extract the `is_filled` flag from a state table row and convert it to `RowType`.
286    fn extract_is_filled(&self, state_row: &OwnedRow) -> RowType {
287        let is_filled = matches!(
288            state_row
289                .datum_at(self.filled_column_index)
290                .and_then(|d| d.to_owned_datum()),
291            Some(ScalarImpl::Bool(true))
292        );
293        if is_filled {
294            RowType::Filled
295        } else {
296            RowType::Original
297        }
298    }
299
300    /// Extract the `is_filled` flag from a state table row.
301    fn extract_is_filled_flag(&self, state_row: &OwnedRow) -> bool {
302        matches!(
303            state_row
304                .datum_at(self.filled_column_index)
305                .and_then(|d| d.to_owned_datum()),
306            Some(ScalarImpl::Bool(true))
307        )
308    }
309}
310
/// A cache for `GapFillExecutor` that stores a continuous time window of data.
pub struct GapFillCacheManager {
    /// Cached rows ordered by cache key.
    cache: GapFillCache,
    /// Maximum number of entries kept after an `insert`; also used as the row limit when a
    /// window is scanned from the state table.
    capacity: usize,

    /// The time bounds of the cached window. `None` if the cache is empty.
    window_bounds: Option<(GapFillCacheKey, GapFillCacheKey)>,
}
319
320impl EstimateSize for GapFillCacheManager {
321    fn estimated_heap_size(&self) -> usize {
322        // Sum the estimated heap size of all entries in the cache.
323        self.cache
324            .iter()
325            .map(|(key, (row, _row_type))| {
326                key.estimated_heap_size()
327                    + row.estimated_heap_size()
328                    + std::mem::size_of::<RowType>()
329            })
330            .sum()
331    }
332}
333
334impl GapFillCacheManager {
335    pub fn new(capacity: usize) -> Self {
336        Self {
337            cache: GapFillCache::new(),
338            capacity,
339            window_bounds: None,
340        }
341    }
342
343    /// Check if the given time is within the current cache window bounds.
344    pub fn contains_time(&self, cache_key: &GapFillCacheKey) -> bool {
345        matches!(&self.window_bounds, Some((earliest, latest)) if cache_key >= earliest && cache_key <= latest)
346    }
347
348    /// Load a continuous window of data into the cache, replacing all existing data.
349    pub fn load_window(&mut self, window_data: Vec<(GapFillCacheKey, CompactedRow, RowType)>) {
350        self.cache.clear();
351        self.window_bounds = None;
352
353        if window_data.is_empty() {
354            return;
355        }
356
357        let mut earliest: Option<GapFillCacheKey> = None;
358        let mut latest: Option<GapFillCacheKey> = None;
359
360        for (cache_key, row, row_type) in window_data {
361            self.cache.insert(cache_key.clone(), (row, row_type));
362
363            if earliest.is_none() || &cache_key < earliest.as_ref().unwrap() {
364                earliest = Some(cache_key.clone());
365            }
366            if latest.is_none() || &cache_key > latest.as_ref().unwrap() {
367                latest = Some(cache_key.clone());
368            }
369        }
370
371        if let (Some(earliest), Some(latest)) = (earliest, latest) {
372            self.window_bounds = Some((earliest, latest));
373        }
374    }
375
376    /// Finds the closest previous original row, scanning through the `StateStore` if not in cache.
377    pub async fn find_prev_original<S: StateStore>(
378        &mut self,
379        target_time: &GapFillCacheKey,
380        managed_state: &ManagedGapFillState<S>,
381    ) -> StreamExecutorResult<Option<(GapFillCacheKey, CompactedRow)>> {
382        // Check current cache for a quick hit.
383        if self.contains_time(target_time)
384            && let Some(result) = self.find_prev_original_in_cache(target_time)
385        {
386            return Ok(Some(result));
387        }
388
389        // Cache miss or uncertain, start iterative window scanning.
390        let mut current_search_end_time = target_time.clone();
391
392        loop {
393            // Load a window of data before the current search end time.
394            let window_rows = managed_state
395                .scan_range_before(&current_search_end_time, self.capacity)
396                .await?;
397
398            let earliest_key = match window_rows.first() {
399                None => return Ok(None),
400                Some((key, _, _)) => key.clone(),
401            };
402            self.load_window(window_rows);
403            if let Some(result) = self.find_prev_original_in_cache(target_time) {
404                return Ok(Some(result));
405            }
406            current_search_end_time = earliest_key;
407        }
408    }
409
410    /// Finds the closest next original row, scanning through the `StateStore` if not in cache.
411    pub async fn find_next_original<S: StateStore>(
412        &mut self,
413        target_time: &GapFillCacheKey,
414        managed_state: &ManagedGapFillState<S>,
415    ) -> StreamExecutorResult<Option<(GapFillCacheKey, CompactedRow)>> {
416        // Check current cache for a quick hit.
417        if self.contains_time(target_time)
418            && let Some(result) = self.find_next_original_in_cache(target_time)
419        {
420            return Ok(Some(result));
421        }
422
423        // Cache miss or uncertain, start iterative window scanning.
424        let mut current_search_start_time = target_time.clone();
425
426        loop {
427            // Load a window of data after current_search_start_time.
428            let window_rows = managed_state
429                .scan_range_after(&current_search_start_time, self.capacity)
430                .await?;
431
432            let latest_key = match window_rows.last() {
433                None => return Ok(None),
434                Some((key, _, _)) => key.clone(),
435            };
436            self.load_window(window_rows);
437            if let Some(result) = self.find_next_original_in_cache(target_time) {
438                return Ok(Some(result));
439            }
440            current_search_start_time = latest_key;
441        }
442    }
443
444    /// Inserts a row into the cache, evicting the oldest row if capacity is exceeded.
445    pub fn insert(&mut self, cache_key: GapFillCacheKey, row: CompactedRow, row_type: RowType) {
446        self.cache.insert(cache_key.clone(), (row, row_type));
447
448        // Update window bounds to include the new key
449        self.update_window_bounds_for_insert(&cache_key);
450
451        while self.cache.len() > self.capacity {
452            if self.cache.pop_first().is_none() {
453                break;
454            }
455        }
456        self.update_window_bounds_after_removal();
457    }
458
459    pub fn remove(&mut self, cache_key: &GapFillCacheKey) -> Option<(CompactedRow, RowType)> {
460        let result = self.cache.remove(cache_key);
461        if result.is_some() {
462            self.update_window_bounds_after_removal();
463        }
464        result
465    }
466
467    /// Removes entries from the cache that are marked for deletion in the state table.
468    pub fn sync_clean_cache_entries(
469        &mut self,
470        state_rows_to_delete: &[(OwnedRow, OwnedRow)],
471        managed_state: &ManagedGapFillState<impl StateStore>,
472    ) {
473        let mut removed_any = false;
474        for (_state_key, data_row) in state_rows_to_delete {
475            let cache_key = managed_state.serialize_time_to_cache_key(data_row);
476            if self.cache.remove(&cache_key).is_some() {
477                removed_any = true;
478            }
479        }
480        if removed_any {
481            self.update_window_bounds_after_removal();
482        }
483    }
484
485    /// Finds the closest previous original row in the cache.
486    fn find_prev_original_in_cache(
487        &self,
488        cache_key: &GapFillCacheKey,
489    ) -> Option<(GapFillCacheKey, CompactedRow)> {
490        // Use window bounds to optimize: if the cache_key is before our window,
491        // we definitely won't find anything
492        if let Some((earliest, _)) = &self.window_bounds
493            && cache_key <= earliest
494        {
495            return None;
496        }
497
498        self.cache
499            .range::<GapFillCacheKey, _>(..cache_key)
500            .rev()
501            .find(|(_, (_, row_type))| *row_type == RowType::Original)
502            .map(|(key, (row, _))| (key.clone(), row.clone()))
503    }
504
505    /// Finds the closest next original row in the cache.
506    fn find_next_original_in_cache(
507        &self,
508        cache_key: &GapFillCacheKey,
509    ) -> Option<(GapFillCacheKey, CompactedRow)> {
510        // Use window bounds to optimize: if the cache_key is after our window,
511        // we definitely won't find anything
512        if let Some((_, latest)) = &self.window_bounds
513            && cache_key >= latest
514        {
515            return None;
516        }
517
518        self.cache
519            .range::<GapFillCacheKey, _>((Bound::Excluded(cache_key), Bound::Unbounded))
520            .find(|(_, (_, row_type))| *row_type == RowType::Original)
521            .map(|(key, (row, _))| (key.clone(), row.clone()))
522    }
523
524    /// Updates window bounds when a new key is inserted.
525    fn update_window_bounds_for_insert(&mut self, new_key: &GapFillCacheKey) {
526        match &mut self.window_bounds {
527            None => {
528                // Cache was empty, this is the first entry
529                self.window_bounds = Some((new_key.clone(), new_key.clone()));
530            }
531            Some((earliest, latest)) => {
532                // Update bounds if necessary
533                if new_key < earliest {
534                    *earliest = new_key.clone();
535                }
536                if new_key > latest {
537                    *latest = new_key.clone();
538                }
539            }
540        }
541    }
542
543    /// Updates window bounds after removing entries from the cache.
544    fn update_window_bounds_after_removal(&mut self) {
545        if self.cache.is_empty() {
546            self.window_bounds = None;
547        } else {
548            // Recalculate bounds from the current cache contents
549            let earliest = self.cache.keys().next().cloned();
550            let latest = self.cache.keys().next_back().cloned();
551            if let (Some(earliest), Some(latest)) = (earliest, latest) {
552                self.window_bounds = Some((earliest, latest));
553            }
554        }
555    }
556
557    /// Scans cache for filled rows between two time points (exclusive).
558    pub fn scan_filled_rows_between_in_cache<S: StateStore>(
559        &self,
560        start_time: &GapFillCacheKey,
561        end_time: &GapFillCacheKey,
562        managed_state: &ManagedGapFillState<S>,
563    ) -> StreamExecutorResult<(Vec<(OwnedRow, OwnedRow)>, bool)> {
564        let mut filled_rows_in_cache = Vec::new();
565
566        // Check if both start_time and end_time are within cache bounds
567        let range_fully_in_cache = self.contains_time(start_time) && self.contains_time(end_time);
568
569        if range_fully_in_cache {
570            for (_cache_key, (compacted_row, row_type)) in self.cache.range::<GapFillCacheKey, _>((
571                std::ops::Bound::Excluded(start_time),
572                std::ops::Bound::Excluded(end_time),
573            )) {
574                if *row_type == RowType::Filled {
575                    // Convert compacted row back to owned row
576                    let data_types = managed_state.state_table.get_data_types();
577                    let mut row_data_types = data_types.to_vec();
578                    // Remove the is_filled column type since cache stores original schema
579                    row_data_types.pop();
580
581                    let row = compacted_row.deserialize(&row_data_types)?;
582                    let time_datum = row.datum_at(managed_state.time_column_index);
583                    let state_key = OwnedRow::new(vec![time_datum.to_owned_datum()]);
584                    // Convert to state row format (with is_filled flag)
585                    let state_row = ManagedGapFillState::<S>::row_to_state_row(&row, true);
586                    filled_rows_in_cache.push((state_key, state_row));
587                }
588            }
589        }
590
591        Ok((filled_rows_in_cache, range_fully_in_cache))
592    }
593
594    pub async fn scan_filled_rows_between<S: StateStore>(
595        &self,
596        start_time: &GapFillCacheKey,
597        end_time: &GapFillCacheKey,
598        managed_state: &ManagedGapFillState<S>,
599    ) -> StreamExecutorResult<Vec<(OwnedRow, OwnedRow)>> {
600        // Try to scan filled rows from cache first
601        let (filled_rows_cache, range_fully_in_cache) =
602            self.scan_filled_rows_between_in_cache(start_time, end_time, managed_state)?;
603
604        if range_fully_in_cache {
605            // Cache covers the entire range, use cache results
606            Ok(filled_rows_cache)
607        } else {
608            // Cache doesn't cover the range, fall back to state table scan
609            managed_state
610                .scan_filled_rows_between(start_time, end_time)
611                .await
612        }
613    }
614}
615
/// Gap-fill stream executor; built from [`GapFillExecutorArgs`] via `GapFillExecutor::new`.
pub struct GapFillExecutor<S: StateStore> {
    /// Actor context, also the source of the metric labels.
    ctx: ActorContextRef,
    /// Input executor.
    input: Executor,
    /// Row schema (without the state table's `is_filled` flag).
    schema: Schema,
    /// Copied from the constructor arguments (presumably the output chunk size; TODO confirm).
    chunk_size: usize,
    /// Index of the time column within `schema`.
    time_column_index: usize,
    /// Fill strategy keyed by column index.
    fill_columns: HashMap<usize, FillStrategy>,
    /// Expression for the gap interval.
    gap_interval: NonStrictExpression,

    // State management
    /// State table wrapper that knows how to encode and decode gap-fill rows.
    managed_state: ManagedGapFillState<S>,
    /// In-memory window over the state table, created with `GAPFILL_CACHE_DEFAULT_CAPACITY`.
    cache_manager: GapFillCacheManager,

    // Metrics
    /// Counters labelled with this actor's id and fragment id.
    metrics: GapFillMetrics,
}
632
/// Metrics owned by a single `GapFillExecutor`.
pub struct GapFillMetrics {
    /// Counter obtained from the streaming metrics with the actor id and fragment id as
    /// label values (by its name, the number of rows generated by gap filling).
    pub gap_fill_generated_rows_count: LabelGuardedIntCounter,
}
636
637impl<S: StateStore> GapFillExecutor<S> {
638    pub fn new(args: GapFillExecutorArgs<S>) -> Self {
639        let managed_state =
640            ManagedGapFillState::new(args.state_table, args.time_column_index, &args.schema);
641        let cache_manager = GapFillCacheManager::new(GAPFILL_CACHE_DEFAULT_CAPACITY);
642
643        let metrics = args.ctx.streaming_metrics.clone();
644        let actor_id = args.ctx.id.to_string();
645        let fragment_id = args.ctx.fragment_id.to_string();
646        let gap_fill_metrics = GapFillMetrics {
647            gap_fill_generated_rows_count: metrics
648                .gap_fill_generated_rows_count
649                .with_guarded_label_values(&[&actor_id, &fragment_id]),
650        };
651
652        Self {
653            ctx: args.ctx,
654            input: args.input,
655            schema: args.schema,
656            chunk_size: args.chunk_size,
657            time_column_index: args.time_column_index,
658            fill_columns: args.fill_columns,
659            gap_interval: args.gap_interval,
660            managed_state,
661            cache_manager,
662            metrics: gap_fill_metrics,
663        }
664    }
665
666    /// Generates interpolated rows between two time points (`prev_row` and `curr_row`) using a static interval.
667    ///
668    /// # Parameters
669    /// - `prev_row`: Reference to the previous row (start of the gap).
670    /// - `curr_row`: Reference to the current row (end of the gap).
671    /// - `interval`: The interval to use for generating each filled row (typically a time interval).
672    /// - `time_column_index`: The index of the time column in the row, used to increment time values.
673    /// - `fill_columns`: A `HashMap` mapping column indices to their respective `FillStrategy`.
674    /// - `metrics`: Metrics for tracking the number of generated rows.
675    ///
676    /// # Fill Strategy Application
677    /// For each filled row, the function applies the specified `FillStrategy` for each column:
    /// - `FillStrategy::Locf`: Carries forward the value from the previous row.
    /// - `FillStrategy::Interpolate`: Advances the previous row's value by a precomputed step
    ///   for each filled row; fills with NULL when no step could be computed.
    /// - `FillStrategy::Null`: Fills with NULL. Columns without a strategy are also NULL.
681    ///
682    /// Returns a vector of `OwnedRow` representing the filled rows between `prev_row` and `curr_row`.
683    fn generate_filled_rows_between_static(
684        prev_row: &OwnedRow,
685        curr_row: &OwnedRow,
686        interval: &risingwave_common::types::Interval,
687        time_column_index: usize,
688        fill_columns: &HashMap<usize, FillStrategy>,
689        metrics: &GapFillMetrics,
690    ) -> StreamExecutorResult<Vec<OwnedRow>> {
691        let mut filled_rows = Vec::new();
692
693        let (Some(prev_time_scalar), Some(curr_time_scalar)) = (
694            prev_row.datum_at(time_column_index),
695            curr_row.datum_at(time_column_index),
696        ) else {
697            return Ok(filled_rows);
698        };
699
700        let prev_time = match prev_time_scalar {
701            ScalarRefImpl::Timestamp(ts) => ts,
702            ScalarRefImpl::Timestamptz(ts) => {
703                match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
704                    Ok(timestamp) => timestamp,
705                    Err(_) => {
706                        warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
707                        return Ok(filled_rows);
708                    }
709                }
710            }
711            _ => {
712                warn!("Time column is not timestamp type: {:?}", prev_time_scalar);
713                return Ok(filled_rows);
714            }
715        };
716
717        let curr_time = match curr_time_scalar {
718            ScalarRefImpl::Timestamp(ts) => ts,
719            ScalarRefImpl::Timestamptz(ts) => {
720                match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
721                    Ok(timestamp) => timestamp,
722                    Err(_) => {
723                        warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
724                        return Ok(filled_rows);
725                    }
726                }
727            }
728            _ => {
729                warn!("Time column is not timestamp type: {:?}", curr_time_scalar);
730                return Ok(filled_rows);
731            }
732        };
733
734        if prev_time >= curr_time {
735            return Ok(filled_rows);
736        }
737
738        // Calculate the number of rows to be generated and validate
739        let mut fill_time = match prev_time.checked_add(*interval) {
740            Some(t) => t,
741            None => {
742                // If the interval is so large that adding it to prev_time causes overflow,
743                // it means we shouldn't do gap fill at all.
744                warn!(
745                    "Gap fill interval is too large, causing timestamp overflow. \
746                     No gap filling will be performed between {:?} and {:?}.",
747                    prev_time, curr_time
748                );
749                return Ok(filled_rows);
750            }
751        };
752
753        // Check if fill_time is already >= curr_time, which means no gap to fill
754        if fill_time >= curr_time {
755            return Ok(filled_rows);
756        }
757
758        // Count the number of rows to generate
759        let mut row_count = 0;
760        let mut temp_time = fill_time;
761        while temp_time < curr_time {
762            row_count += 1;
763            temp_time = match temp_time.checked_add(*interval) {
764                Some(t) => t,
765                None => break,
766            };
767        }
768
769        // Pre-compute interpolation steps for each column that requires interpolation
770        let mut interpolation_steps: Vec<Option<ScalarImpl>> = Vec::new();
771        let mut interpolation_states: Vec<Datum> = Vec::new();
772
773        for i in 0..prev_row.len() {
774            if let Some(strategy) = fill_columns.get(&i) {
775                if matches!(strategy, FillStrategy::Interpolate) {
776                    let step = calculate_interpolation_step(
777                        prev_row.datum_at(i),
778                        curr_row.datum_at(i),
779                        row_count + 1,
780                    );
781                    interpolation_steps.push(step.clone());
782                    interpolation_states.push(prev_row.datum_at(i).to_owned_datum());
783                } else {
784                    interpolation_steps.push(None);
785                    interpolation_states.push(None);
786                }
787            } else {
788                interpolation_steps.push(None);
789                interpolation_states.push(None);
790            }
791        }
792
793        // Generate filled rows, applying the appropriate strategy for each column
794        while fill_time < curr_time {
795            let mut new_row_data = Vec::with_capacity(prev_row.len());
796
797            for col_idx in 0..prev_row.len() {
798                let datum = if col_idx == time_column_index {
799                    // Time column: use the incremented timestamp
800                    let fill_time_scalar = match prev_time_scalar {
801                        ScalarRefImpl::Timestamp(_) => ScalarImpl::Timestamp(fill_time),
802                        ScalarRefImpl::Timestamptz(_) => {
803                            let micros = fill_time.0.and_utc().timestamp_micros();
804                            ScalarImpl::Timestamptz(
805                                risingwave_common::types::Timestamptz::from_micros(micros),
806                            )
807                        }
808                        _ => unreachable!("Time column should be Timestamp or Timestamptz"),
809                    };
810                    Some(fill_time_scalar)
811                } else if let Some(strategy) = fill_columns.get(&col_idx) {
812                    // Apply the fill strategy for this column
813                    match strategy {
814                        FillStrategy::Locf => prev_row.datum_at(col_idx).to_owned_datum(),
815                        FillStrategy::Null => None,
816                        FillStrategy::Interpolate => {
817                            // Apply interpolation step and update cumulative value
818                            if let Some(step) = &interpolation_steps[col_idx] {
819                                apply_interpolation_step(&mut interpolation_states[col_idx], step);
820                                interpolation_states[col_idx].clone()
821                            } else {
822                                // If interpolation step is None, fill with NULL
823                                None
824                            }
825                        }
826                    }
827                } else {
828                    // No strategy specified, default to NULL
829                    None
830                };
831                new_row_data.push(datum);
832            }
833
834            filled_rows.push(OwnedRow::new(new_row_data));
835
836            fill_time = match fill_time.checked_add(*interval) {
837                Some(t) => t,
838                None => {
839                    // Time overflow during iteration, stop filling
840                    warn!(
841                        "Gap fill stopped due to timestamp overflow after generating {} rows.",
842                        filled_rows.len()
843                    );
844                    break;
845                }
846            };
847        }
848
849        // Update metrics with the number of generated rows
850        metrics
851            .gap_fill_generated_rows_count
852            .inc_by(filled_rows.len() as u64);
853
854        Ok(filled_rows)
855    }
856}
857
858impl<S: StateStore> Execute for GapFillExecutor<S> {
859    fn execute(self: Box<Self>) -> BoxedMessageStream {
860        self.execute_inner().boxed()
861    }
862}
863
864impl<S: StateStore> GapFillExecutor<S> {
865    #[try_stream(ok = Message, error = StreamExecutorError)]
866    async fn execute_inner(self: Box<Self>) {
867        let Self {
868            mut managed_state,
869            mut cache_manager,
870            schema,
871            chunk_size,
872            time_column_index,
873            fill_columns,
874            gap_interval,
875            ctx,
876            input,
877            metrics,
878        } = *self;
879
880        let mut input = input.execute();
881
882        let barrier = expect_first_barrier(&mut input).await?;
883        let first_epoch = barrier.epoch;
884        yield Message::Barrier(barrier);
885        managed_state.init_epoch(first_epoch).await?;
886
887        // Start with an empty cache - use lazy loading strategy.
888        // Data will be loaded on-demand when gap filling operations require it.
889
890        // Calculate and validate gap interval once at initialization
891        let dummy_row = OwnedRow::new(vec![]);
892        let interval_datum = gap_interval.eval_row_infallible(&dummy_row).await;
893        let interval = interval_datum
894            .ok_or_else(|| anyhow::anyhow!("Gap interval expression returned null"))?
895            .into_interval();
896
897        // Validate that gap interval is not zero
898        if interval.months() == 0 && interval.days() == 0 && interval.usecs() == 0 {
899            Err(anyhow::anyhow!("Gap interval cannot be zero"))?;
900        }
901
902        #[for_await]
903        for msg in input {
904            match msg? {
905                Message::Chunk(chunk) => {
906                    let chunk = chunk.compact_vis();
907                    let mut chunk_builder =
908                        StreamChunkBuilder::new(chunk_size, schema.data_types());
909
910                    for (op, row_ref) in chunk.rows() {
911                        let row = row_ref.to_owned_row();
912
913                        match op {
914                            Op::Insert | Op::UpdateInsert => {
915                                let cache_key = managed_state.serialize_time_to_cache_key(&row);
916
917                                // Find previous and next original row neighbors.
918                                let prev_original = cache_manager
919                                    .find_prev_original(&cache_key, &managed_state)
920                                    .await?;
921                                let next_original = cache_manager
922                                    .find_next_original(&cache_key, &managed_state)
923                                    .await?;
924
925                                // If both neighbors exist, delete previously fill rows between them.
926                                if let (Some((prev_key, _)), Some((next_key, _))) =
927                                    (&prev_original, &next_original)
928                                {
929                                    let filled_rows_to_delete = cache_manager
930                                        .scan_filled_rows_between(
931                                            prev_key,
932                                            next_key,
933                                            &managed_state,
934                                        )
935                                        .await?;
936
937                                    cache_manager.sync_clean_cache_entries(
938                                        &filled_rows_to_delete,
939                                        &managed_state,
940                                    );
941
942                                    let mut state_rows_to_delete = Vec::new();
943                                    for (_state_key, state_row) in filled_rows_to_delete {
944                                        state_rows_to_delete.push(state_row.clone());
945
946                                        let output_row =
947                                            ManagedGapFillState::<S>::state_row_to_output_row(
948                                                &state_row,
949                                            );
950                                        if let Some(chunk) =
951                                            chunk_builder.append_row(Op::Delete, &output_row)
952                                        {
953                                            yield Message::Chunk(chunk);
954                                        }
955                                    }
956
957                                    managed_state.batch_delete(state_rows_to_delete);
958                                }
959
960                                // Insert new original row.
961                                let state_row =
962                                    ManagedGapFillState::<S>::row_to_state_row(&row, false);
963                                managed_state.insert(&state_row);
964                                cache_manager.insert(
965                                    cache_key.clone(),
966                                    (&row).into(),
967                                    RowType::Original,
968                                );
969
970                                if let Some(chunk) = chunk_builder.append_row(op, &row) {
971                                    yield Message::Chunk(chunk);
972                                }
973
974                                // Refill gaps adjacent to the new row.
975                                if let Some((_prev_key, prev_row_data)) = prev_original {
976                                    let prev_row =
977                                        prev_row_data.deserialize(&schema.data_types())?;
978                                    let filled_rows = Self::generate_filled_rows_between_static(
979                                        &prev_row,
980                                        &row,
981                                        &interval,
982                                        time_column_index,
983                                        &fill_columns,
984                                        &metrics,
985                                    )?;
986
987                                    for filled_row in filled_rows {
988                                        let fill_cache_key =
989                                            managed_state.serialize_time_to_cache_key(&filled_row);
990                                        let state_row = ManagedGapFillState::<S>::row_to_state_row(
991                                            &filled_row,
992                                            true,
993                                        );
994                                        managed_state.insert(&state_row);
995                                        cache_manager.insert(
996                                            fill_cache_key,
997                                            (&filled_row).into(),
998                                            RowType::Filled,
999                                        );
1000                                        if let Some(chunk) =
1001                                            chunk_builder.append_row(Op::Insert, &filled_row)
1002                                        {
1003                                            yield Message::Chunk(chunk);
1004                                        }
1005                                    }
1006                                }
1007
1008                                if let Some((_next_key, next_row_data)) = next_original {
1009                                    let next_row =
1010                                        next_row_data.deserialize(&schema.data_types())?;
1011                                    let filled_rows = Self::generate_filled_rows_between_static(
1012                                        &row,
1013                                        &next_row,
1014                                        &interval,
1015                                        time_column_index,
1016                                        &fill_columns,
1017                                        &metrics,
1018                                    )?;
1019
1020                                    for filled_row in filled_rows {
1021                                        let fill_cache_key =
1022                                            managed_state.serialize_time_to_cache_key(&filled_row);
1023                                        let state_row = ManagedGapFillState::<S>::row_to_state_row(
1024                                            &filled_row,
1025                                            true,
1026                                        );
1027                                        managed_state.insert(&state_row);
1028                                        cache_manager.insert(
1029                                            fill_cache_key,
1030                                            (&filled_row).into(),
1031                                            RowType::Filled,
1032                                        );
1033                                        if let Some(chunk) =
1034                                            chunk_builder.append_row(Op::Insert, &filled_row)
1035                                        {
1036                                            yield Message::Chunk(chunk);
1037                                        }
1038                                    }
1039                                }
1040                            }
1041                            Op::Delete | Op::UpdateDelete => {
1042                                let cache_key = managed_state.serialize_time_to_cache_key(&row);
1043
1044                                // Find previous and next original row neighbors before deletion.
1045                                let prev_original = cache_manager
1046                                    .find_prev_original(&cache_key, &managed_state)
1047                                    .await?;
1048                                let next_original = cache_manager
1049                                    .find_next_original(&cache_key, &managed_state)
1050                                    .await?;
1051
1052                                // Delete fill rows on both sides of the row to be deleted.
1053                                let mut filled_rows_to_delete = Vec::new();
1054
1055                                if let Some((prev_key, _)) = &prev_original {
1056                                    let fills_left = cache_manager
1057                                        .scan_filled_rows_between(
1058                                            prev_key,
1059                                            &cache_key,
1060                                            &managed_state,
1061                                        )
1062                                        .await?;
1063                                    filled_rows_to_delete.extend(fills_left);
1064                                }
1065
1066                                if let Some((next_key, _)) = &next_original {
1067                                    let fills_right = cache_manager
1068                                        .scan_filled_rows_between(
1069                                            &cache_key,
1070                                            next_key,
1071                                            &managed_state,
1072                                        )
1073                                        .await?;
1074                                    filled_rows_to_delete.extend(fills_right);
1075                                }
1076
1077                                cache_manager.sync_clean_cache_entries(
1078                                    &filled_rows_to_delete,
1079                                    &managed_state,
1080                                );
1081
1082                                let mut state_rows_to_delete = Vec::new();
1083                                for (_state_key, state_row) in filled_rows_to_delete {
1084                                    state_rows_to_delete.push(state_row.clone());
1085
1086                                    let output_row =
1087                                        ManagedGapFillState::<S>::state_row_to_output_row(
1088                                            &state_row,
1089                                        );
1090                                    if let Some(chunk) =
1091                                        chunk_builder.append_row(Op::Delete, &output_row)
1092                                    {
1093                                        yield Message::Chunk(chunk);
1094                                    }
1095                                }
1096                                managed_state.batch_delete(state_rows_to_delete);
1097
1098                                // Delete the original row.
1099                                let state_row =
1100                                    ManagedGapFillState::<S>::row_to_state_row(&row, false);
1101                                managed_state.delete(&state_row);
1102                                cache_manager.remove(&cache_key);
1103                                if let Some(chunk) = chunk_builder.append_row(op, &row) {
1104                                    yield Message::Chunk(chunk);
1105                                }
1106
1107                                // If both neighbors exist, refill the gap between them.
1108                                if let (Some((_, prev_row_data)), Some((_, next_row_data))) =
1109                                    (prev_original, next_original)
1110                                {
1111                                    let prev_row =
1112                                        prev_row_data.deserialize(&schema.data_types())?;
1113                                    let next_row =
1114                                        next_row_data.deserialize(&schema.data_types())?;
1115                                    let filled_rows = Self::generate_filled_rows_between_static(
1116                                        &prev_row,
1117                                        &next_row,
1118                                        &interval,
1119                                        time_column_index,
1120                                        &fill_columns,
1121                                        &metrics,
1122                                    )?;
1123
1124                                    for filled_row in filled_rows {
1125                                        let fill_cache_key =
1126                                            managed_state.serialize_time_to_cache_key(&filled_row);
1127                                        let state_row = ManagedGapFillState::<S>::row_to_state_row(
1128                                            &filled_row,
1129                                            true,
1130                                        );
1131                                        managed_state.insert(&state_row);
1132                                        cache_manager.insert(
1133                                            fill_cache_key,
1134                                            (&filled_row).into(),
1135                                            RowType::Filled,
1136                                        );
1137                                        if let Some(chunk) =
1138                                            chunk_builder.append_row(Op::Insert, &filled_row)
1139                                        {
1140                                            yield Message::Chunk(chunk);
1141                                        }
1142                                    }
1143                                }
1144                            }
1145                        }
1146                    }
1147
1148                    if let Some(chunk) = chunk_builder.take() {
1149                        yield Message::Chunk(chunk);
1150                    }
1151                }
1152                Message::Watermark(watermark) => {
1153                    yield Message::Watermark(watermark);
1154                }
1155                Message::Barrier(barrier) => {
1156                    let post_commit = managed_state.flush(barrier.epoch).await?;
1157                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(ctx.id);
1158                    yield Message::Barrier(barrier);
1159                    post_commit.post_yield_barrier(update_vnode_bitmap).await?;
1160                }
1161            }
1162        }
1163    }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168    use itertools::Itertools;
1169    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
1170    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
1171    use risingwave_common::types::test_utils::IntervalTestExt;
1172    use risingwave_common::types::{DataType, Interval};
1173    use risingwave_common::util::epoch::test_epoch;
1174    use risingwave_common::util::sort_util::OrderType;
1175    use risingwave_expr::expr::LiteralExpression;
1176    use risingwave_storage::memory::MemoryStateStore;
1177
1178    use super::*;
1179    use crate::common::table::state_table::StateTable;
1180    use crate::common::table::test_utils::gen_pbtable_with_dist_key;
1181    use crate::executor::test_utils::{MessageSender, MockSource};
1182
1183    async fn create_executor(
1184        store: MemoryStateStore,
1185        fill_columns: HashMap<usize, FillStrategy>,
1186        schema: Schema,
1187        gap_interval: Interval,
1188    ) -> (MessageSender, BoxedMessageStream) {
1189        let (tx, source) = MockSource::channel();
1190        let source = source.into_executor(schema.clone(), vec![0]);
1191
1192        let mut table_columns: Vec<ColumnDesc> = schema
1193            .fields
1194            .iter()
1195            .enumerate()
1196            .map(|(i, f)| ColumnDesc::unnamed(ColumnId::new(i as i32), f.data_type.clone()))
1197            .collect();
1198
1199        // Add the is_filled flag column at the end.
1200        let is_filled_column_id = table_columns.len() as i32;
1201        table_columns.push(ColumnDesc::unnamed(
1202            ColumnId::new(is_filled_column_id),
1203            DataType::Boolean,
1204        ));
1205
1206        let table = StateTable::from_table_catalog(
1207            &gen_pbtable_with_dist_key(
1208                TableId::new(0),
1209                table_columns,
1210                vec![OrderType::ascending()],
1211                vec![0],
1212                0,
1213                vec![],
1214            ),
1215            store,
1216            None,
1217        )
1218        .await;
1219
1220        let time_column_index = 0;
1221
1222        let executor = GapFillExecutor::new(GapFillExecutorArgs {
1223            ctx: ActorContext::for_test(123),
1224            input: source,
1225            schema: schema.clone(),
1226            chunk_size: 1024,
1227            time_column_index,
1228            fill_columns,
1229            gap_interval: NonStrictExpression::for_test(LiteralExpression::new(
1230                DataType::Interval,
1231                Some(gap_interval.into()),
1232            )),
1233            state_table: table,
1234        });
1235
1236        (tx, executor.boxed().execute())
1237    }
1238
1239    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1240    async fn test_streaming_gap_fill_locf() {
1241        let store = MemoryStateStore::new();
1242        let schema = Schema::new(vec![
1243            Field::unnamed(DataType::Timestamp),
1244            Field::unnamed(DataType::Int32),
1245            Field::unnamed(DataType::Float64),
1246        ]);
1247        let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Locf)]);
1248        let (mut tx, mut executor) =
1249            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1250
1251        // Init with barrier.
1252        tx.push_barrier(test_epoch(1), false);
1253        executor.next().await.unwrap().unwrap(); // Barrier
1254
1255        // 1. Send an initial chunk with a gap to test basic filling.
1256        tx.push_chunk(StreamChunk::from_pretty(
1257            " TS                  i   F
1258            + 2022-01-01T00:00:00 1   1.0
1259            + 2022-01-01T00:03:00 4   4.0",
1260        ));
1261
1262        let chunk = executor
1263            .next()
1264            .await
1265            .unwrap()
1266            .unwrap()
1267            .into_chunk()
1268            .unwrap();
1269        let expected = StreamChunk::from_pretty(
1270            " TS                  i   F
1271            + 2022-01-01T00:00:00 1   1.0
1272            + 2022-01-01T00:03:00 4   4.0
1273            + 2022-01-01T00:01:00 1   1.0
1274            + 2022-01-01T00:02:00 1   1.0",
1275        );
1276
1277        // Simple comparison since the test utility assumes Int64 keys.
1278        assert_eq!(chunk.ops(), expected.ops());
1279        assert_eq!(chunk.visibility(), expected.visibility());
1280
1281        // Compare each row individually.
1282        let chunk_rows: Vec<_> = chunk.rows().collect();
1283        let expected_rows: Vec<_> = expected.rows().collect();
1284        assert_eq!(chunk_rows.len(), expected_rows.len());
1285
1286        for (i, ((op1, row1), (op2, row2))) in
1287            chunk_rows.iter().zip_eq(expected_rows.iter()).enumerate()
1288        {
1289            assert_eq!(op1, op2, "Row {} operation mismatch", i);
1290            assert_eq!(
1291                row1.to_owned_row(),
1292                row2.to_owned_row(),
1293                "Row {} data mismatch",
1294                i
1295            );
1296        }
1297
1298        // 2. Send a new chunk that arrives out-of-order, landing in the previously filled gap.
1299        // This tests if the executor can correctly retract old filled rows and create new ones.
1300        tx.push_chunk(StreamChunk::from_pretty(
1301            " TS                  i   F
1302            + 2022-01-01T00:02:00 2   2.0",
1303        ));
1304
1305        // Expect a chunk that retracts the old fills, inserts the new row, and adds the new fills.
1306        let chunk2 = executor
1307            .next()
1308            .await
1309            .unwrap()
1310            .unwrap()
1311            .into_chunk()
1312            .unwrap();
1313
1314        let expected2 = StreamChunk::from_pretty(
1315            " TS                  i   F
1316                - 2022-01-01T00:01:00 1   1.0
1317                - 2022-01-01T00:02:00 1   1.0
1318                + 2022-01-01T00:01:00 1   1.0
1319                + 2022-01-01T00:02:00 2   2.0",
1320        );
1321
1322        assert_eq!(chunk2.sort_rows(), expected2.sort_rows());
1323
1324        // 3. Send a delete chunk to remove an original data point.
1325        // This should trigger retraction of old fills and generation of new ones.
1326        tx.push_chunk(StreamChunk::from_pretty(
1327            " TS                  i   F
1328            - 2022-01-01T00:02:00 2   2.0",
1329        ));
1330
1331        let chunk3 = executor
1332            .next()
1333            .await
1334            .unwrap()
1335            .unwrap()
1336            .into_chunk()
1337            .unwrap();
1338        assert_eq!(
1339            chunk3.sort_rows(),
1340            StreamChunk::from_pretty(
1341                " TS                  i   F
1342                - 2022-01-01T00:01:00 1   1.0
1343                - 2022-01-01T00:02:00 2   2.0
1344                + 2022-01-01T00:01:00 1   1.0
1345                + 2022-01-01T00:02:00 1   1.0"
1346            )
1347            .sort_rows()
1348        );
1349
1350        // 4. Send an update chunk to modify an original data point.
1351        // This should also trigger retraction and re-generation of fills.
1352        tx.push_chunk(StreamChunk::from_pretty(
1353            " TS                  i   F
1354            U- 2022-01-01T00:03:00 4   4.0
1355            U+ 2022-01-01T00:03:00 5   5.0",
1356        ));
1357
1358        let chunk4 = executor
1359            .next()
1360            .await
1361            .unwrap()
1362            .unwrap()
1363            .into_chunk()
1364            .unwrap();
1365        // The filled rows' values don't change as they depend on the first row,
1366        // but they are still retracted and re-inserted due to the general path logic.
1367        assert_eq!(
1368            chunk4.sort_rows(),
1369            StreamChunk::from_pretty(
1370                " TS                  i   F
1371                - 2022-01-01T00:01:00 1   1.0
1372                - 2022-01-01T00:02:00 1   1.0
1373                U- 2022-01-01T00:03:00 4   4.0
1374                + 2022-01-01T00:01:00 1   1.0
1375                + 2022-01-01T00:02:00 1   1.0
1376                U+ 2022-01-01T00:03:00 5   5.0"
1377            )
1378            .sort_rows()
1379        );
1380    }
1381
1382    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1383    async fn test_streaming_gap_fill_null() {
1384        let store = MemoryStateStore::new();
1385        let schema = Schema::new(vec![
1386            Field::unnamed(DataType::Timestamp),
1387            Field::unnamed(DataType::Int32),
1388            Field::unnamed(DataType::Float64),
1389        ]);
1390        let fill_columns = HashMap::from([(1, FillStrategy::Null), (2, FillStrategy::Null)]);
1391        let (mut tx, mut executor) =
1392            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1393
1394        // Init with barrier.
1395        tx.push_barrier(test_epoch(1), false);
1396        executor.next().await.unwrap().unwrap(); // Barrier
1397
1398        // 1. Send an initial chunk with a gap to test basic filling.
1399        tx.push_chunk(StreamChunk::from_pretty(
1400            " TS                  i   F
1401            + 2022-01-01T00:00:00 1   1.0
1402            + 2022-01-01T00:03:00 4   4.0",
1403        ));
1404
1405        let chunk = executor
1406            .next()
1407            .await
1408            .unwrap()
1409            .unwrap()
1410            .into_chunk()
1411            .unwrap();
1412        assert_eq!(
1413            chunk.sort_rows(),
1414            StreamChunk::from_pretty(
1415                " TS                  i   F
1416                + 2022-01-01T00:00:00 1   1.0
1417                + 2022-01-01T00:01:00 .   .
1418                + 2022-01-01T00:02:00 .   .
1419                + 2022-01-01T00:03:00 4   4.0"
1420            )
1421            .sort_rows()
1422        );
1423
1424        // 2. Send a new chunk that arrives out-of-order, landing in the previously filled gap.
1425        tx.push_chunk(StreamChunk::from_pretty(
1426            " TS                  i   F
1427            + 2022-01-01T00:02:00 2   2.0",
1428        ));
1429
1430        let chunk2 = executor
1431            .next()
1432            .await
1433            .unwrap()
1434            .unwrap()
1435            .into_chunk()
1436            .unwrap();
1437        assert_eq!(
1438            chunk2.sort_rows(),
1439            StreamChunk::from_pretty(
1440                " TS                  i   F
1441                - 2022-01-01T00:01:00 .   .
1442                - 2022-01-01T00:02:00 .   .
1443                + 2022-01-01T00:01:00 .   .
1444                + 2022-01-01T00:02:00 2   2.0"
1445            )
1446            .sort_rows()
1447        );
1448
1449        // 3. Send a delete chunk to remove an original data point.
1450        tx.push_chunk(StreamChunk::from_pretty(
1451            " TS                  i   F
1452            - 2022-01-01T00:02:00 2   2.0",
1453        ));
1454
1455        let chunk3 = executor
1456            .next()
1457            .await
1458            .unwrap()
1459            .unwrap()
1460            .into_chunk()
1461            .unwrap();
1462        assert_eq!(
1463            chunk3.sort_rows(),
1464            StreamChunk::from_pretty(
1465                " TS                  i   F
1466                - 2022-01-01T00:01:00 .   .
1467                - 2022-01-01T00:02:00 2   2.0
1468                + 2022-01-01T00:01:00 .   .
1469                + 2022-01-01T00:02:00 .   ."
1470            )
1471            .sort_rows()
1472        );
1473
1474        // 4. Send an update chunk to modify an original data point.
1475        tx.push_chunk(StreamChunk::from_pretty(
1476            " TS                  i   F
1477            U- 2022-01-01T00:03:00 4   4.0
1478            U+ 2022-01-01T00:03:00 5   5.0",
1479        ));
1480
1481        let chunk4 = executor
1482            .next()
1483            .await
1484            .unwrap()
1485            .unwrap()
1486            .into_chunk()
1487            .unwrap();
1488        assert_eq!(
1489            chunk4.sort_rows(),
1490            StreamChunk::from_pretty(
1491                " TS                  i   F
1492                - 2022-01-01T00:01:00 .   .
1493                - 2022-01-01T00:02:00 .   .
1494                U- 2022-01-01T00:03:00 4   4.0
1495                + 2022-01-01T00:01:00 .   .
1496                + 2022-01-01T00:02:00 .   .
1497                U+ 2022-01-01T00:03:00 5   5.0"
1498            )
1499            .sort_rows()
1500        );
1501    }
1502
1503    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1504    async fn test_streaming_gap_fill_interpolate() {
1505        let store = MemoryStateStore::new();
1506        let schema = Schema::new(vec![
1507            Field::unnamed(DataType::Timestamp),
1508            Field::unnamed(DataType::Int32),
1509            Field::unnamed(DataType::Float64),
1510        ]);
1511        let fill_columns = HashMap::from([
1512            (1, FillStrategy::Interpolate),
1513            (2, FillStrategy::Interpolate),
1514        ]);
1515        let (mut tx, mut executor) =
1516            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1517
1518        // Init with barrier.
1519        tx.push_barrier(test_epoch(1), false);
1520        executor.next().await.unwrap().unwrap(); // Barrier
1521
1522        // 1. Send an initial chunk with a gap to test basic filling.
1523        tx.push_chunk(StreamChunk::from_pretty(
1524            " TS                  i   F
1525            + 2022-01-01T00:00:00 1   1.0
1526            + 2022-01-01T00:03:00 4   4.0",
1527        ));
1528
1529        let chunk = executor
1530            .next()
1531            .await
1532            .unwrap()
1533            .unwrap()
1534            .into_chunk()
1535            .unwrap();
1536        assert_eq!(
1537            chunk.sort_rows(),
1538            StreamChunk::from_pretty(
1539                " TS                  i   F
1540                + 2022-01-01T00:00:00 1   1.0
1541                + 2022-01-01T00:01:00 2   2.0
1542                + 2022-01-01T00:02:00 3   3.0
1543                + 2022-01-01T00:03:00 4   4.0"
1544            )
1545            .sort_rows()
1546        );
1547
1548        // 2. Send a new chunk that arrives out-of-order, landing in the previously filled gap.
1549        tx.push_chunk(StreamChunk::from_pretty(
1550            " TS                  i   F
1551            + 2022-01-01T00:02:00 10  10.0",
1552        ));
1553
1554        let chunk2 = executor
1555            .next()
1556            .await
1557            .unwrap()
1558            .unwrap()
1559            .into_chunk()
1560            .unwrap();
1561        assert_eq!(
1562            chunk2.sort_rows(),
1563            StreamChunk::from_pretty(
1564                " TS                  i   F
1565                - 2022-01-01T00:01:00 2   2.0
1566                - 2022-01-01T00:02:00 3   3.0
1567                + 2022-01-01T00:01:00 5   5.5
1568                + 2022-01-01T00:02:00 10  10.0"
1569            )
1570            .sort_rows()
1571        );
1572
1573        // 3. Send a delete chunk to remove an original data point.
1574        // This should trigger retraction of old fills and re-calculation of interpolated values.
1575        tx.push_chunk(StreamChunk::from_pretty(
1576            " TS                  i   F
1577            - 2022-01-01T00:02:00 10  10.0",
1578        ));
1579
1580        let chunk3 = executor
1581            .next()
1582            .await
1583            .unwrap()
1584            .unwrap()
1585            .into_chunk()
1586            .unwrap();
1587        assert_eq!(
1588            chunk3.sort_rows(),
1589            StreamChunk::from_pretty(
1590                " TS                  i   F
1591                - 2022-01-01T00:01:00 5   5.5
1592                - 2022-01-01T00:02:00 10  10.0
1593                + 2022-01-01T00:01:00 2   2.0
1594                + 2022-01-01T00:02:00 3   3.0"
1595            )
1596            .sort_rows()
1597        );
1598
1599        // 4. Send an update chunk to modify an original data point.
1600        // This will cause the interpolated values to be re-calculated.
1601        tx.push_chunk(StreamChunk::from_pretty(
1602            " TS                  i   F
1603            U- 2022-01-01T00:03:00 4   4.0
1604            U+ 2022-01-01T00:03:00 10  10.0",
1605        ));
1606
1607        let chunk4 = executor
1608            .next()
1609            .await
1610            .unwrap()
1611            .unwrap()
1612            .into_chunk()
1613            .unwrap();
1614        assert_eq!(
1615            chunk4.sort_rows(),
1616            StreamChunk::from_pretty(
1617                " TS                  i   F
1618                - 2022-01-01T00:01:00 2   2.0
1619                - 2022-01-01T00:02:00 3   3.0
1620                U- 2022-01-01T00:03:00 4   4.0
1621                + 2022-01-01T00:01:00 4   4.0
1622                + 2022-01-01T00:02:00 7   7.0
1623                U+ 2022-01-01T00:03:00 10  10.0"
1624            )
1625            .sort_rows()
1626        );
1627    }
1628
1629    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1630    async fn test_streaming_gap_fill_recovery() {
1631        let store = MemoryStateStore::new();
1632        let schema = Schema::new(vec![
1633            Field::unnamed(DataType::Timestamp),
1634            Field::unnamed(DataType::Int32),
1635            Field::unnamed(DataType::Float64),
1636        ]);
1637        let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Interpolate)]);
1638
1639        // --- First run ---
1640        let (mut tx, mut executor) = create_executor(
1641            store.clone(),
1642            fill_columns.clone(),
1643            schema.clone(),
1644            Interval::from_minutes(1),
1645        )
1646        .await;
1647
1648        // Init with barrier.
1649        tx.push_barrier(test_epoch(1), false);
1650        executor.next().await.unwrap().unwrap(); // Barrier
1651
1652        // Send a chunk and commit.
1653        tx.push_chunk(StreamChunk::from_pretty(
1654            " TS                  i   F
1655            + 2022-01-01T00:00:00 1   1.0
1656            + 2022-01-01T00:03:00 4   4.0",
1657        ));
1658
1659        // Consume the initial filled chunk.
1660        let chunk = executor
1661            .next()
1662            .await
1663            .unwrap()
1664            .unwrap()
1665            .into_chunk()
1666            .unwrap();
1667        assert_eq!(
1668            chunk.sort_rows(),
1669            StreamChunk::from_pretty(
1670                " TS                  i   F
1671                + 2022-01-01T00:00:00 1   1.0
1672                + 2022-01-01T00:01:00 1   2.0
1673                + 2022-01-01T00:02:00 1   3.0
1674                + 2022-01-01T00:03:00 4   4.0"
1675            )
1676            .sort_rows()
1677        );
1678
1679        tx.push_barrier(test_epoch(2), false);
1680        executor.next().await.unwrap().unwrap(); // Barrier to commit.
1681
1682        // --- Second run (after recovery) ---
1683        let (mut tx2, mut executor2) = create_executor(
1684            store.clone(),
1685            fill_columns.clone(),
1686            schema.clone(),
1687            Interval::from_minutes(1),
1688        )
1689        .await;
1690
1691        // Init with barrier, which triggers recovery.
1692        tx2.push_barrier(test_epoch(2), false);
1693        executor2.next().await.unwrap().unwrap(); // Barrier
1694
1695        // After recovery, the executor should not output anything for the loaded state.
1696
1697        // Send a new chunk, which should fill the gap between old and new data.
1698        tx2.push_chunk(StreamChunk::from_pretty(
1699            " TS                  i   F
1700            + 2022-01-01T00:05:00 6   10.0",
1701        ));
1702
1703        let chunk2 = executor2
1704            .next()
1705            .await
1706            .unwrap()
1707            .unwrap()
1708            .into_chunk()
1709            .unwrap();
1710        assert_eq!(
1711            chunk2.sort_rows(),
1712            StreamChunk::from_pretty(
1713                " TS                  i   F
1714                + 2022-01-01T00:04:00 4   7.0
1715                + 2022-01-01T00:05:00 6   10.0"
1716            )
1717            .sort_rows()
1718        );
1719    }
1720
1721    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1722    async fn test_streaming_gap_fill_mixed_strategy() {
1723        let store = MemoryStateStore::new();
1724        let schema = Schema::new(vec![
1725            Field::unnamed(DataType::Timestamp),
1726            Field::unnamed(DataType::Int32),
1727            Field::unnamed(DataType::Int64),
1728            Field::unnamed(DataType::Float32),
1729            Field::unnamed(DataType::Float64),
1730        ]);
1731
1732        let fill_columns = HashMap::from([
1733            (1, FillStrategy::Interpolate),
1734            (2, FillStrategy::Locf),
1735            (3, FillStrategy::Null),
1736            (4, FillStrategy::Interpolate),
1737        ]);
1738        let gap_interval = Interval::from_days(1);
1739        let (mut tx, mut executor) =
1740            create_executor(store, fill_columns, schema, gap_interval).await;
1741
1742        // Init with barrier.
1743        tx.push_barrier(test_epoch(1), false);
1744        executor.next().await.unwrap().unwrap();
1745
1746        // Send an initial chunk with a gap to test mixed filling strategies.
1747        tx.push_chunk(StreamChunk::from_pretty(
1748            " TS                  i   I    f     F
1749            + 2023-04-01T10:00:00 10 100 1.0 100.0
1750            + 2023-04-05T10:00:00 50 200 5.0 200.0",
1751        ));
1752
1753        let chunk = executor
1754            .next()
1755            .await
1756            .unwrap()
1757            .unwrap()
1758            .into_chunk()
1759            .unwrap();
1760        assert_eq!(
1761            chunk.sort_rows(),
1762            StreamChunk::from_pretty(
1763                " TS                  i   I    f     F
1764                + 2023-04-01T10:00:00 10 100 1.0 100.0
1765                + 2023-04-02T10:00:00 20 100 .    125.0
1766                + 2023-04-03T10:00:00 30 100 .    150.0
1767                + 2023-04-04T10:00:00 40 100 .    175.0
1768                + 2023-04-05T10:00:00 50 200 5.0 200.0"
1769            )
1770            .sort_rows()
1771        );
1772
1773        // 2. Send a new chunk that arrives out-of-order, landing in the previously filled gap.
1774        tx.push_chunk(StreamChunk::from_pretty(
1775            " TS                  i   I    f     F
1776            + 2023-04-03T10:00:00 25 150 3.0 160.0",
1777        ));
1778
1779        let chunk2 = executor
1780            .next()
1781            .await
1782            .unwrap()
1783            .unwrap()
1784            .into_chunk()
1785            .unwrap();
1786        assert_eq!(
1787            chunk2.sort_rows(),
1788            StreamChunk::from_pretty(
1789                " TS                  i   I    f     F
1790                - 2023-04-02T10:00:00 20 100 .    125.0
1791                - 2023-04-03T10:00:00 30 100 .    150.0
1792                - 2023-04-04T10:00:00 40 100 .    175.0
1793                + 2023-04-02T10:00:00 17 100 .    130.0
1794                + 2023-04-03T10:00:00 25 150 3.0 160.0
1795                + 2023-04-04T10:00:00 37 150 .    180.0"
1796            )
1797            .sort_rows()
1798        );
1799
1800        // 3. Send a delete chunk to remove an original data point.
1801        tx.push_chunk(StreamChunk::from_pretty(
1802            " TS                  i   I    f     F
1803            - 2023-04-03T10:00:00 25 150 3.0 160.0",
1804        ));
1805        let chunk3 = executor
1806            .next()
1807            .await
1808            .unwrap()
1809            .unwrap()
1810            .into_chunk()
1811            .unwrap();
1812        assert_eq!(
1813            chunk3.sort_rows(),
1814            StreamChunk::from_pretty(
1815                " TS                  i   I    f     F
1816                - 2023-04-02T10:00:00 17 100 .    130.0
1817                - 2023-04-03T10:00:00 25 150 3.0 160.0
1818                - 2023-04-04T10:00:00 37 150 .    180.0
1819                + 2023-04-02T10:00:00 20 100 .    125.0
1820                + 2023-04-03T10:00:00 30 100 .    150.0
1821                + 2023-04-04T10:00:00 40 100 .    175.0"
1822            )
1823            .sort_rows()
1824        );
1825
1826        // 4. Send an update chunk to modify an original data point.
1827        tx.push_chunk(StreamChunk::from_pretty(
1828            " TS                  i   I    f     F
1829            U- 2023-04-05T10:00:00 50 200 5.0 200.0
1830            U+ 2023-04-05T10:00:00 50 200 5.0 300.0",
1831        ));
1832        let chunk4 = executor
1833            .next()
1834            .await
1835            .unwrap()
1836            .unwrap()
1837            .into_chunk()
1838            .unwrap();
1839        assert_eq!(
1840            chunk4.sort_rows(),
1841            StreamChunk::from_pretty(
1842                " TS                  i   I    f     F
1843                - 2023-04-02T10:00:00 20 100 .    125.0
1844                - 2023-04-03T10:00:00 30 100 .    150.0
1845                - 2023-04-04T10:00:00 40 100 .    175.0
1846                U- 2023-04-05T10:00:00 50 200 5.0 200.0
1847                + 2023-04-02T10:00:00 20 100 .    150.0
1848                + 2023-04-03T10:00:00 30 100 .    200.0
1849                + 2023-04-04T10:00:00 40 100 .    250.0
1850                U+ 2023-04-05T10:00:00 50 200 5.0 300.0"
1851            )
1852            .sort_rows()
1853        );
1854    }
1855}