1use std::collections::{BTreeMap, HashMap};
16use std::ops::Bound;
17
18use futures::{StreamExt, pin_mut};
19use risingwave_common::array::Op;
20use risingwave_common::gap_fill::{
21 FillStrategy, apply_interpolation_step, calculate_interpolation_step,
22};
23use risingwave_common::metrics::LabelGuardedIntCounter;
24use risingwave_common::row::{self, CompactedRow, OwnedRow, Row, RowExt};
25use risingwave_common::types::{CheckedAdd, Datum, ScalarImpl, ToOwnedDatum};
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::row_serde::OrderedRowSerde;
28use risingwave_common_estimate_size::EstimateSize;
29use risingwave_expr::expr::NonStrictExpression;
30use risingwave_storage::StateStore;
31use risingwave_storage::store::PrefetchOptions;
32use tracing::warn;
33
34use crate::common::table::state_table::{StateTable, StateTablePostCommit};
35use crate::executor::prelude::*;
36
37pub struct GapFillExecutorArgs<S: StateStore> {
38 pub ctx: ActorContextRef,
39 pub input: Executor,
40 pub schema: Schema,
41 pub chunk_size: usize,
42 pub time_column_index: usize,
43 pub fill_columns: HashMap<usize, FillStrategy>,
44 pub gap_interval: NonStrictExpression,
45 pub state_table: StateTable<S>,
46}
47
/// Origin of a row kept by the gap-fill executor.
#[derive(Debug, Clone, PartialEq)]
pub enum RowType {
    /// A row received from the upstream input.
    Original,
    /// A row synthesized by the executor to fill a gap between two original rows.
    Filled,
}
54
/// Memcomparable encoding of a row's time column; byte order equals ascending time order.
pub type GapFillCacheKey = Vec<u8>;
57
58pub type GapFillCache = BTreeMap<GapFillCacheKey, (CompactedRow, RowType)>;
60
/// Default maximum number of entries held by the gap-fill row cache.
const GAPFILL_CACHE_DEFAULT_CAPACITY: usize = 1 << 10;
62
63pub struct ManagedGapFillState<S: StateStore> {
64 state_table: StateTable<S>,
65 time_key_serde: OrderedRowSerde,
66 time_column_index: usize,
67 filled_column_index: usize,
68}
69
70#[derive(Clone, PartialEq, Debug)]
71pub struct GapFillStateRow {
72 pub cache_key: GapFillCacheKey,
73 pub row: OwnedRow,
74 pub row_type: RowType,
75}
76
77impl GapFillStateRow {
78 pub fn new(cache_key: GapFillCacheKey, row: OwnedRow, row_type: RowType) -> Self {
79 Self {
80 cache_key,
81 row,
82 row_type,
83 }
84 }
85}
86
87impl<S: StateStore> ManagedGapFillState<S> {
88 pub fn new(state_table: StateTable<S>, time_column_index: usize, schema: &Schema) -> Self {
89 let time_column_type = schema[time_column_index].data_type();
91 let time_key_serde = OrderedRowSerde::new(
92 vec![time_column_type],
93 vec![risingwave_common::util::sort_util::OrderType::ascending()],
94 );
95
96 let filled_column_index = schema.len();
98
99 Self {
100 state_table,
101 time_key_serde,
102 time_column_index,
103 filled_column_index,
104 }
105 }
106
107 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
108 self.state_table.init_epoch(epoch).await
109 }
110
111 pub fn insert(&mut self, value: impl Row) {
112 self.state_table.insert(value);
113 }
114
115 pub fn delete(&mut self, value: impl Row) {
116 self.state_table.delete(value);
117 }
118
119 pub fn batch_delete(&mut self, rows: Vec<impl Row>) {
121 for row in rows {
122 self.state_table.delete(row);
123 }
124 }
125
126 pub async fn scan_filled_rows_between(
128 &self,
129 start_time: &GapFillCacheKey,
130 end_time: &GapFillCacheKey,
131 ) -> StreamExecutorResult<Vec<(OwnedRow, OwnedRow)>> {
132 let mut filled_rows_to_delete = Vec::new();
133
134 let start_time_row = self.time_key_serde.deserialize(start_time)?;
135 let end_time_row = self.time_key_serde.deserialize(end_time)?;
136
137 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(
138 Bound::Excluded(start_time_row),
139 Bound::Excluded(end_time_row),
140 );
141
142 let state_table_iter = self
143 .state_table
144 .iter_with_prefix(
145 &None::<row::Empty>,
146 sub_range,
147 PrefetchOptions::prefetch_for_large_range_scan(),
148 )
149 .await?;
150
151 pin_mut!(state_table_iter);
152 while let Some(row_result) = state_table_iter.next().await {
153 let row = row_result?.into_owned_row();
154
155 if self.extract_is_filled_flag(&row) {
156 let time_datum = row.datum_at(self.time_column_index);
159 let state_key = OwnedRow::new(vec![time_datum.to_owned_datum()]);
160 filled_rows_to_delete.push((state_key, row));
161 }
162 }
163
164 Ok(filled_rows_to_delete)
165 }
166
167 pub fn serialize_time_to_cache_key(&self, row: impl Row) -> GapFillCacheKey {
169 row.project(&[self.time_column_index])
170 .memcmp_serialize(&self.time_key_serde)
171 }
172
173 pub async fn scan_range_before(
175 &self,
176 end_time: &GapFillCacheKey,
177 limit: usize,
178 ) -> StreamExecutorResult<Vec<(GapFillCacheKey, CompactedRow, RowType)>> {
179 let end_time_row = self.time_key_serde.deserialize(end_time)?;
180
181 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
182 &(Bound::Unbounded, Bound::Excluded(end_time_row));
183
184 let state_table_iter = self
185 .state_table
186 .rev_iter_with_prefix(
187 &None::<row::Empty>,
188 sub_range,
189 PrefetchOptions::prefetch_for_large_range_scan(),
190 )
191 .await?;
192 pin_mut!(state_table_iter);
193
194 let mut results = Vec::new();
195
196 while let Some(item) = state_table_iter.next().await {
198 let state_row = item?.into_owned_row();
199 let gapfill_row = self.get_gapfill_row(state_row);
200 results.push((
201 gapfill_row.cache_key,
202 (&gapfill_row.row).into(),
203 gapfill_row.row_type,
204 ));
205 if results.len() >= limit {
206 break;
207 }
208 }
209
210 results.reverse();
211
212 Ok(results)
213 }
214
215 pub async fn scan_range_after(
217 &self,
218 start_time: &GapFillCacheKey,
219 limit: usize,
220 ) -> StreamExecutorResult<Vec<(GapFillCacheKey, CompactedRow, RowType)>> {
221 let start_time_row = self.time_key_serde.deserialize(start_time)?;
222
223 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
224 &(Bound::Excluded(start_time_row), Bound::Unbounded);
225
226 let state_table_iter = self
227 .state_table
228 .iter_with_prefix(
229 &None::<row::Empty>,
230 sub_range,
231 PrefetchOptions::prefetch_for_large_range_scan(),
232 )
233 .await?;
234 pin_mut!(state_table_iter);
235
236 let mut results = Vec::new();
237 let mut count = 0;
238
239 while let Some(item) = state_table_iter.next().await
240 && count < limit
241 {
242 let state_row = item?.into_owned_row();
243 let gapfill_row = self.get_gapfill_row(state_row);
244 results.push((
245 gapfill_row.cache_key,
246 (&gapfill_row.row).into(),
247 gapfill_row.row_type,
248 ));
249 count += 1;
250 }
251
252 Ok(results)
253 }
254
255 fn get_gapfill_row(&self, state_row: OwnedRow) -> GapFillStateRow {
257 let row_type = self.extract_is_filled(&state_row);
258 let output_row = Self::state_row_to_output_row(&state_row);
259 let cache_key = self.serialize_time_to_cache_key(&output_row);
260
261 GapFillStateRow::new(cache_key, output_row, row_type)
262 }
263
264 pub async fn flush(
265 &mut self,
266 epoch: EpochPair,
267 ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
268 self.state_table.commit(epoch).await
269 }
270
271 fn row_to_state_row(row: &OwnedRow, is_filled: bool) -> OwnedRow {
273 let mut state_row_data = row.as_inner().to_vec();
274 state_row_data.push(Some(ScalarImpl::Bool(is_filled)));
275 OwnedRow::new(state_row_data)
276 }
277
278 fn state_row_to_output_row(state_row: &OwnedRow) -> OwnedRow {
280 let mut output_data = state_row.as_inner().to_vec();
281 output_data.pop();
282 OwnedRow::new(output_data)
283 }
284
285 fn extract_is_filled(&self, state_row: &OwnedRow) -> RowType {
287 let is_filled = matches!(
288 state_row
289 .datum_at(self.filled_column_index)
290 .and_then(|d| d.to_owned_datum()),
291 Some(ScalarImpl::Bool(true))
292 );
293 if is_filled {
294 RowType::Filled
295 } else {
296 RowType::Original
297 }
298 }
299
300 fn extract_is_filled_flag(&self, state_row: &OwnedRow) -> bool {
302 matches!(
303 state_row
304 .datum_at(self.filled_column_index)
305 .and_then(|d| d.to_owned_datum()),
306 Some(ScalarImpl::Bool(true))
307 )
308 }
309}
310
311pub struct GapFillCacheManager {
313 cache: GapFillCache,
314 capacity: usize,
315
316 window_bounds: Option<(GapFillCacheKey, GapFillCacheKey)>,
318}
319
320impl EstimateSize for GapFillCacheManager {
321 fn estimated_heap_size(&self) -> usize {
322 self.cache
324 .iter()
325 .map(|(key, (row, _row_type))| {
326 key.estimated_heap_size()
327 + row.estimated_heap_size()
328 + std::mem::size_of::<RowType>()
329 })
330 .sum()
331 }
332}
333
334impl GapFillCacheManager {
335 pub fn new(capacity: usize) -> Self {
336 Self {
337 cache: GapFillCache::new(),
338 capacity,
339 window_bounds: None,
340 }
341 }
342
343 pub fn contains_time(&self, cache_key: &GapFillCacheKey) -> bool {
345 matches!(&self.window_bounds, Some((earliest, latest)) if cache_key >= earliest && cache_key <= latest)
346 }
347
348 pub fn load_window(&mut self, window_data: Vec<(GapFillCacheKey, CompactedRow, RowType)>) {
350 self.cache.clear();
351 self.window_bounds = None;
352
353 if window_data.is_empty() {
354 return;
355 }
356
357 let mut earliest: Option<GapFillCacheKey> = None;
358 let mut latest: Option<GapFillCacheKey> = None;
359
360 for (cache_key, row, row_type) in window_data {
361 self.cache.insert(cache_key.clone(), (row, row_type));
362
363 if earliest.is_none() || &cache_key < earliest.as_ref().unwrap() {
364 earliest = Some(cache_key.clone());
365 }
366 if latest.is_none() || &cache_key > latest.as_ref().unwrap() {
367 latest = Some(cache_key.clone());
368 }
369 }
370
371 if let (Some(earliest), Some(latest)) = (earliest, latest) {
372 self.window_bounds = Some((earliest, latest));
373 }
374 }
375
376 pub async fn find_prev_original<S: StateStore>(
378 &mut self,
379 target_time: &GapFillCacheKey,
380 managed_state: &ManagedGapFillState<S>,
381 ) -> StreamExecutorResult<Option<(GapFillCacheKey, CompactedRow)>> {
382 if self.contains_time(target_time)
384 && let Some(result) = self.find_prev_original_in_cache(target_time)
385 {
386 return Ok(Some(result));
387 }
388
389 let mut current_search_end_time = target_time.clone();
391
392 loop {
393 let window_rows = managed_state
395 .scan_range_before(¤t_search_end_time, self.capacity)
396 .await?;
397
398 let earliest_key = match window_rows.first() {
399 None => return Ok(None),
400 Some((key, _, _)) => key.clone(),
401 };
402 self.load_window(window_rows);
403 if let Some(result) = self.find_prev_original_in_cache(target_time) {
404 return Ok(Some(result));
405 }
406 current_search_end_time = earliest_key;
407 }
408 }
409
410 pub async fn find_next_original<S: StateStore>(
412 &mut self,
413 target_time: &GapFillCacheKey,
414 managed_state: &ManagedGapFillState<S>,
415 ) -> StreamExecutorResult<Option<(GapFillCacheKey, CompactedRow)>> {
416 if self.contains_time(target_time)
418 && let Some(result) = self.find_next_original_in_cache(target_time)
419 {
420 return Ok(Some(result));
421 }
422
423 let mut current_search_start_time = target_time.clone();
425
426 loop {
427 let window_rows = managed_state
429 .scan_range_after(¤t_search_start_time, self.capacity)
430 .await?;
431
432 let latest_key = match window_rows.last() {
433 None => return Ok(None),
434 Some((key, _, _)) => key.clone(),
435 };
436 self.load_window(window_rows);
437 if let Some(result) = self.find_next_original_in_cache(target_time) {
438 return Ok(Some(result));
439 }
440 current_search_start_time = latest_key;
441 }
442 }
443
444 pub fn insert(&mut self, cache_key: GapFillCacheKey, row: CompactedRow, row_type: RowType) {
446 self.cache.insert(cache_key.clone(), (row, row_type));
447
448 self.update_window_bounds_for_insert(&cache_key);
450
451 while self.cache.len() > self.capacity {
452 if self.cache.pop_first().is_none() {
453 break;
454 }
455 }
456 self.update_window_bounds_after_removal();
457 }
458
459 pub fn remove(&mut self, cache_key: &GapFillCacheKey) -> Option<(CompactedRow, RowType)> {
460 let result = self.cache.remove(cache_key);
461 if result.is_some() {
462 self.update_window_bounds_after_removal();
463 }
464 result
465 }
466
467 pub fn sync_clean_cache_entries(
469 &mut self,
470 state_rows_to_delete: &[(OwnedRow, OwnedRow)],
471 managed_state: &ManagedGapFillState<impl StateStore>,
472 ) {
473 let mut removed_any = false;
474 for (_state_key, data_row) in state_rows_to_delete {
475 let cache_key = managed_state.serialize_time_to_cache_key(data_row);
476 if self.cache.remove(&cache_key).is_some() {
477 removed_any = true;
478 }
479 }
480 if removed_any {
481 self.update_window_bounds_after_removal();
482 }
483 }
484
485 fn find_prev_original_in_cache(
487 &self,
488 cache_key: &GapFillCacheKey,
489 ) -> Option<(GapFillCacheKey, CompactedRow)> {
490 if let Some((earliest, _)) = &self.window_bounds
493 && cache_key <= earliest
494 {
495 return None;
496 }
497
498 self.cache
499 .range::<GapFillCacheKey, _>(..cache_key)
500 .rev()
501 .find(|(_, (_, row_type))| *row_type == RowType::Original)
502 .map(|(key, (row, _))| (key.clone(), row.clone()))
503 }
504
505 fn find_next_original_in_cache(
507 &self,
508 cache_key: &GapFillCacheKey,
509 ) -> Option<(GapFillCacheKey, CompactedRow)> {
510 if let Some((_, latest)) = &self.window_bounds
513 && cache_key >= latest
514 {
515 return None;
516 }
517
518 self.cache
519 .range::<GapFillCacheKey, _>((Bound::Excluded(cache_key), Bound::Unbounded))
520 .find(|(_, (_, row_type))| *row_type == RowType::Original)
521 .map(|(key, (row, _))| (key.clone(), row.clone()))
522 }
523
524 fn update_window_bounds_for_insert(&mut self, new_key: &GapFillCacheKey) {
526 match &mut self.window_bounds {
527 None => {
528 self.window_bounds = Some((new_key.clone(), new_key.clone()));
530 }
531 Some((earliest, latest)) => {
532 if new_key < earliest {
534 *earliest = new_key.clone();
535 }
536 if new_key > latest {
537 *latest = new_key.clone();
538 }
539 }
540 }
541 }
542
543 fn update_window_bounds_after_removal(&mut self) {
545 if self.cache.is_empty() {
546 self.window_bounds = None;
547 } else {
548 let earliest = self.cache.keys().next().cloned();
550 let latest = self.cache.keys().next_back().cloned();
551 if let (Some(earliest), Some(latest)) = (earliest, latest) {
552 self.window_bounds = Some((earliest, latest));
553 }
554 }
555 }
556
557 pub fn scan_filled_rows_between_in_cache<S: StateStore>(
559 &self,
560 start_time: &GapFillCacheKey,
561 end_time: &GapFillCacheKey,
562 managed_state: &ManagedGapFillState<S>,
563 ) -> StreamExecutorResult<(Vec<(OwnedRow, OwnedRow)>, bool)> {
564 let mut filled_rows_in_cache = Vec::new();
565
566 let range_fully_in_cache = self.contains_time(start_time) && self.contains_time(end_time);
568
569 if range_fully_in_cache {
570 for (_cache_key, (compacted_row, row_type)) in self.cache.range::<GapFillCacheKey, _>((
571 std::ops::Bound::Excluded(start_time),
572 std::ops::Bound::Excluded(end_time),
573 )) {
574 if *row_type == RowType::Filled {
575 let data_types = managed_state.state_table.get_data_types();
577 let mut row_data_types = data_types.to_vec();
578 row_data_types.pop();
580
581 let row = compacted_row.deserialize(&row_data_types)?;
582 let time_datum = row.datum_at(managed_state.time_column_index);
583 let state_key = OwnedRow::new(vec![time_datum.to_owned_datum()]);
584 let state_row = ManagedGapFillState::<S>::row_to_state_row(&row, true);
586 filled_rows_in_cache.push((state_key, state_row));
587 }
588 }
589 }
590
591 Ok((filled_rows_in_cache, range_fully_in_cache))
592 }
593
594 pub async fn scan_filled_rows_between<S: StateStore>(
595 &self,
596 start_time: &GapFillCacheKey,
597 end_time: &GapFillCacheKey,
598 managed_state: &ManagedGapFillState<S>,
599 ) -> StreamExecutorResult<Vec<(OwnedRow, OwnedRow)>> {
600 let (filled_rows_cache, range_fully_in_cache) =
602 self.scan_filled_rows_between_in_cache(start_time, end_time, managed_state)?;
603
604 if range_fully_in_cache {
605 Ok(filled_rows_cache)
607 } else {
608 managed_state
610 .scan_filled_rows_between(start_time, end_time)
611 .await
612 }
613 }
614}
615
616pub struct GapFillExecutor<S: StateStore> {
617 ctx: ActorContextRef,
618 input: Executor,
619 schema: Schema,
620 chunk_size: usize,
621 time_column_index: usize,
622 fill_columns: HashMap<usize, FillStrategy>,
623 gap_interval: NonStrictExpression,
624
625 managed_state: ManagedGapFillState<S>,
627 cache_manager: GapFillCacheManager,
628
629 metrics: GapFillMetrics,
631}
632
633pub struct GapFillMetrics {
634 pub gap_fill_generated_rows_count: LabelGuardedIntCounter,
635}
636
637impl<S: StateStore> GapFillExecutor<S> {
638 pub fn new(args: GapFillExecutorArgs<S>) -> Self {
639 let managed_state =
640 ManagedGapFillState::new(args.state_table, args.time_column_index, &args.schema);
641 let cache_manager = GapFillCacheManager::new(GAPFILL_CACHE_DEFAULT_CAPACITY);
642
643 let metrics = args.ctx.streaming_metrics.clone();
644 let actor_id = args.ctx.id.to_string();
645 let fragment_id = args.ctx.fragment_id.to_string();
646 let gap_fill_metrics = GapFillMetrics {
647 gap_fill_generated_rows_count: metrics
648 .gap_fill_generated_rows_count
649 .with_guarded_label_values(&[&actor_id, &fragment_id]),
650 };
651
652 Self {
653 ctx: args.ctx,
654 input: args.input,
655 schema: args.schema,
656 chunk_size: args.chunk_size,
657 time_column_index: args.time_column_index,
658 fill_columns: args.fill_columns,
659 gap_interval: args.gap_interval,
660 managed_state,
661 cache_manager,
662 metrics: gap_fill_metrics,
663 }
664 }
665
666 fn generate_filled_rows_between_static(
684 prev_row: &OwnedRow,
685 curr_row: &OwnedRow,
686 interval: &risingwave_common::types::Interval,
687 time_column_index: usize,
688 fill_columns: &HashMap<usize, FillStrategy>,
689 metrics: &GapFillMetrics,
690 ) -> StreamExecutorResult<Vec<OwnedRow>> {
691 let mut filled_rows = Vec::new();
692
693 let (Some(prev_time_scalar), Some(curr_time_scalar)) = (
694 prev_row.datum_at(time_column_index),
695 curr_row.datum_at(time_column_index),
696 ) else {
697 return Ok(filled_rows);
698 };
699
700 let prev_time = match prev_time_scalar {
701 ScalarRefImpl::Timestamp(ts) => ts,
702 ScalarRefImpl::Timestamptz(ts) => {
703 match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
704 Ok(timestamp) => timestamp,
705 Err(_) => {
706 warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
707 return Ok(filled_rows);
708 }
709 }
710 }
711 _ => {
712 warn!("Time column is not timestamp type: {:?}", prev_time_scalar);
713 return Ok(filled_rows);
714 }
715 };
716
717 let curr_time = match curr_time_scalar {
718 ScalarRefImpl::Timestamp(ts) => ts,
719 ScalarRefImpl::Timestamptz(ts) => {
720 match risingwave_common::types::Timestamp::with_micros(ts.timestamp_micros()) {
721 Ok(timestamp) => timestamp,
722 Err(_) => {
723 warn!("Failed to convert timestamptz to timestamp: {:?}", ts);
724 return Ok(filled_rows);
725 }
726 }
727 }
728 _ => {
729 warn!("Time column is not timestamp type: {:?}", curr_time_scalar);
730 return Ok(filled_rows);
731 }
732 };
733
734 if prev_time >= curr_time {
735 return Ok(filled_rows);
736 }
737
738 let mut fill_time = match prev_time.checked_add(*interval) {
740 Some(t) => t,
741 None => {
742 warn!(
745 "Gap fill interval is too large, causing timestamp overflow. \
746 No gap filling will be performed between {:?} and {:?}.",
747 prev_time, curr_time
748 );
749 return Ok(filled_rows);
750 }
751 };
752
753 if fill_time >= curr_time {
755 return Ok(filled_rows);
756 }
757
758 let mut row_count = 0;
760 let mut temp_time = fill_time;
761 while temp_time < curr_time {
762 row_count += 1;
763 temp_time = match temp_time.checked_add(*interval) {
764 Some(t) => t,
765 None => break,
766 };
767 }
768
769 let mut interpolation_steps: Vec<Option<ScalarImpl>> = Vec::new();
771 let mut interpolation_states: Vec<Datum> = Vec::new();
772
773 for i in 0..prev_row.len() {
774 if let Some(strategy) = fill_columns.get(&i) {
775 if matches!(strategy, FillStrategy::Interpolate) {
776 let step = calculate_interpolation_step(
777 prev_row.datum_at(i),
778 curr_row.datum_at(i),
779 row_count + 1,
780 );
781 interpolation_steps.push(step.clone());
782 interpolation_states.push(prev_row.datum_at(i).to_owned_datum());
783 } else {
784 interpolation_steps.push(None);
785 interpolation_states.push(None);
786 }
787 } else {
788 interpolation_steps.push(None);
789 interpolation_states.push(None);
790 }
791 }
792
793 while fill_time < curr_time {
795 let mut new_row_data = Vec::with_capacity(prev_row.len());
796
797 for col_idx in 0..prev_row.len() {
798 let datum = if col_idx == time_column_index {
799 let fill_time_scalar = match prev_time_scalar {
801 ScalarRefImpl::Timestamp(_) => ScalarImpl::Timestamp(fill_time),
802 ScalarRefImpl::Timestamptz(_) => {
803 let micros = fill_time.0.and_utc().timestamp_micros();
804 ScalarImpl::Timestamptz(
805 risingwave_common::types::Timestamptz::from_micros(micros),
806 )
807 }
808 _ => unreachable!("Time column should be Timestamp or Timestamptz"),
809 };
810 Some(fill_time_scalar)
811 } else if let Some(strategy) = fill_columns.get(&col_idx) {
812 match strategy {
814 FillStrategy::Locf => prev_row.datum_at(col_idx).to_owned_datum(),
815 FillStrategy::Null => None,
816 FillStrategy::Interpolate => {
817 if let Some(step) = &interpolation_steps[col_idx] {
819 apply_interpolation_step(&mut interpolation_states[col_idx], step);
820 interpolation_states[col_idx].clone()
821 } else {
822 None
824 }
825 }
826 }
827 } else {
828 None
830 };
831 new_row_data.push(datum);
832 }
833
834 filled_rows.push(OwnedRow::new(new_row_data));
835
836 fill_time = match fill_time.checked_add(*interval) {
837 Some(t) => t,
838 None => {
839 warn!(
841 "Gap fill stopped due to timestamp overflow after generating {} rows.",
842 filled_rows.len()
843 );
844 break;
845 }
846 };
847 }
848
849 metrics
851 .gap_fill_generated_rows_count
852 .inc_by(filled_rows.len() as u64);
853
854 Ok(filled_rows)
855 }
856}
857
858impl<S: StateStore> Execute for GapFillExecutor<S> {
859 fn execute(self: Box<Self>) -> BoxedMessageStream {
860 self.execute_inner().boxed()
861 }
862}
863
864impl<S: StateStore> GapFillExecutor<S> {
865 #[try_stream(ok = Message, error = StreamExecutorError)]
866 async fn execute_inner(self: Box<Self>) {
867 let Self {
868 mut managed_state,
869 mut cache_manager,
870 schema,
871 chunk_size,
872 time_column_index,
873 fill_columns,
874 gap_interval,
875 ctx,
876 input,
877 metrics,
878 } = *self;
879
880 let mut input = input.execute();
881
882 let barrier = expect_first_barrier(&mut input).await?;
883 let first_epoch = barrier.epoch;
884 yield Message::Barrier(barrier);
885 managed_state.init_epoch(first_epoch).await?;
886
887 let dummy_row = OwnedRow::new(vec![]);
892 let interval_datum = gap_interval.eval_row_infallible(&dummy_row).await;
893 let interval = interval_datum
894 .ok_or_else(|| anyhow::anyhow!("Gap interval expression returned null"))?
895 .into_interval();
896
897 if interval.months() == 0 && interval.days() == 0 && interval.usecs() == 0 {
899 Err(anyhow::anyhow!("Gap interval cannot be zero"))?;
900 }
901
902 #[for_await]
903 for msg in input {
904 match msg? {
905 Message::Chunk(chunk) => {
906 let chunk = chunk.compact_vis();
907 let mut chunk_builder =
908 StreamChunkBuilder::new(chunk_size, schema.data_types());
909
910 for (op, row_ref) in chunk.rows() {
911 let row = row_ref.to_owned_row();
912
913 match op {
914 Op::Insert | Op::UpdateInsert => {
915 let cache_key = managed_state.serialize_time_to_cache_key(&row);
916
917 let prev_original = cache_manager
919 .find_prev_original(&cache_key, &managed_state)
920 .await?;
921 let next_original = cache_manager
922 .find_next_original(&cache_key, &managed_state)
923 .await?;
924
925 if let (Some((prev_key, _)), Some((next_key, _))) =
927 (&prev_original, &next_original)
928 {
929 let filled_rows_to_delete = cache_manager
930 .scan_filled_rows_between(
931 prev_key,
932 next_key,
933 &managed_state,
934 )
935 .await?;
936
937 cache_manager.sync_clean_cache_entries(
938 &filled_rows_to_delete,
939 &managed_state,
940 );
941
942 let mut state_rows_to_delete = Vec::new();
943 for (_state_key, state_row) in filled_rows_to_delete {
944 state_rows_to_delete.push(state_row.clone());
945
946 let output_row =
947 ManagedGapFillState::<S>::state_row_to_output_row(
948 &state_row,
949 );
950 if let Some(chunk) =
951 chunk_builder.append_row(Op::Delete, &output_row)
952 {
953 yield Message::Chunk(chunk);
954 }
955 }
956
957 managed_state.batch_delete(state_rows_to_delete);
958 }
959
960 let state_row =
962 ManagedGapFillState::<S>::row_to_state_row(&row, false);
963 managed_state.insert(&state_row);
964 cache_manager.insert(
965 cache_key.clone(),
966 (&row).into(),
967 RowType::Original,
968 );
969
970 if let Some(chunk) = chunk_builder.append_row(op, &row) {
971 yield Message::Chunk(chunk);
972 }
973
974 if let Some((_prev_key, prev_row_data)) = prev_original {
976 let prev_row =
977 prev_row_data.deserialize(&schema.data_types())?;
978 let filled_rows = Self::generate_filled_rows_between_static(
979 &prev_row,
980 &row,
981 &interval,
982 time_column_index,
983 &fill_columns,
984 &metrics,
985 )?;
986
987 for filled_row in filled_rows {
988 let fill_cache_key =
989 managed_state.serialize_time_to_cache_key(&filled_row);
990 let state_row = ManagedGapFillState::<S>::row_to_state_row(
991 &filled_row,
992 true,
993 );
994 managed_state.insert(&state_row);
995 cache_manager.insert(
996 fill_cache_key,
997 (&filled_row).into(),
998 RowType::Filled,
999 );
1000 if let Some(chunk) =
1001 chunk_builder.append_row(Op::Insert, &filled_row)
1002 {
1003 yield Message::Chunk(chunk);
1004 }
1005 }
1006 }
1007
1008 if let Some((_next_key, next_row_data)) = next_original {
1009 let next_row =
1010 next_row_data.deserialize(&schema.data_types())?;
1011 let filled_rows = Self::generate_filled_rows_between_static(
1012 &row,
1013 &next_row,
1014 &interval,
1015 time_column_index,
1016 &fill_columns,
1017 &metrics,
1018 )?;
1019
1020 for filled_row in filled_rows {
1021 let fill_cache_key =
1022 managed_state.serialize_time_to_cache_key(&filled_row);
1023 let state_row = ManagedGapFillState::<S>::row_to_state_row(
1024 &filled_row,
1025 true,
1026 );
1027 managed_state.insert(&state_row);
1028 cache_manager.insert(
1029 fill_cache_key,
1030 (&filled_row).into(),
1031 RowType::Filled,
1032 );
1033 if let Some(chunk) =
1034 chunk_builder.append_row(Op::Insert, &filled_row)
1035 {
1036 yield Message::Chunk(chunk);
1037 }
1038 }
1039 }
1040 }
1041 Op::Delete | Op::UpdateDelete => {
1042 let cache_key = managed_state.serialize_time_to_cache_key(&row);
1043
1044 let prev_original = cache_manager
1046 .find_prev_original(&cache_key, &managed_state)
1047 .await?;
1048 let next_original = cache_manager
1049 .find_next_original(&cache_key, &managed_state)
1050 .await?;
1051
1052 let mut filled_rows_to_delete = Vec::new();
1054
1055 if let Some((prev_key, _)) = &prev_original {
1056 let fills_left = cache_manager
1057 .scan_filled_rows_between(
1058 prev_key,
1059 &cache_key,
1060 &managed_state,
1061 )
1062 .await?;
1063 filled_rows_to_delete.extend(fills_left);
1064 }
1065
1066 if let Some((next_key, _)) = &next_original {
1067 let fills_right = cache_manager
1068 .scan_filled_rows_between(
1069 &cache_key,
1070 next_key,
1071 &managed_state,
1072 )
1073 .await?;
1074 filled_rows_to_delete.extend(fills_right);
1075 }
1076
1077 cache_manager.sync_clean_cache_entries(
1078 &filled_rows_to_delete,
1079 &managed_state,
1080 );
1081
1082 let mut state_rows_to_delete = Vec::new();
1083 for (_state_key, state_row) in filled_rows_to_delete {
1084 state_rows_to_delete.push(state_row.clone());
1085
1086 let output_row =
1087 ManagedGapFillState::<S>::state_row_to_output_row(
1088 &state_row,
1089 );
1090 if let Some(chunk) =
1091 chunk_builder.append_row(Op::Delete, &output_row)
1092 {
1093 yield Message::Chunk(chunk);
1094 }
1095 }
1096 managed_state.batch_delete(state_rows_to_delete);
1097
1098 let state_row =
1100 ManagedGapFillState::<S>::row_to_state_row(&row, false);
1101 managed_state.delete(&state_row);
1102 cache_manager.remove(&cache_key);
1103 if let Some(chunk) = chunk_builder.append_row(op, &row) {
1104 yield Message::Chunk(chunk);
1105 }
1106
1107 if let (Some((_, prev_row_data)), Some((_, next_row_data))) =
1109 (prev_original, next_original)
1110 {
1111 let prev_row =
1112 prev_row_data.deserialize(&schema.data_types())?;
1113 let next_row =
1114 next_row_data.deserialize(&schema.data_types())?;
1115 let filled_rows = Self::generate_filled_rows_between_static(
1116 &prev_row,
1117 &next_row,
1118 &interval,
1119 time_column_index,
1120 &fill_columns,
1121 &metrics,
1122 )?;
1123
1124 for filled_row in filled_rows {
1125 let fill_cache_key =
1126 managed_state.serialize_time_to_cache_key(&filled_row);
1127 let state_row = ManagedGapFillState::<S>::row_to_state_row(
1128 &filled_row,
1129 true,
1130 );
1131 managed_state.insert(&state_row);
1132 cache_manager.insert(
1133 fill_cache_key,
1134 (&filled_row).into(),
1135 RowType::Filled,
1136 );
1137 if let Some(chunk) =
1138 chunk_builder.append_row(Op::Insert, &filled_row)
1139 {
1140 yield Message::Chunk(chunk);
1141 }
1142 }
1143 }
1144 }
1145 }
1146 }
1147
1148 if let Some(chunk) = chunk_builder.take() {
1149 yield Message::Chunk(chunk);
1150 }
1151 }
1152 Message::Watermark(watermark) => {
1153 yield Message::Watermark(watermark);
1154 }
1155 Message::Barrier(barrier) => {
1156 let post_commit = managed_state.flush(barrier.epoch).await?;
1157 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(ctx.id);
1158 yield Message::Barrier(barrier);
1159 post_commit.post_yield_barrier(update_vnode_bitmap).await?;
1160 }
1161 }
1162 }
1163 }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use itertools::Itertools;
1169 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
1170 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
1171 use risingwave_common::types::test_utils::IntervalTestExt;
1172 use risingwave_common::types::{DataType, Interval};
1173 use risingwave_common::util::epoch::test_epoch;
1174 use risingwave_common::util::sort_util::OrderType;
1175 use risingwave_expr::expr::LiteralExpression;
1176 use risingwave_storage::memory::MemoryStateStore;
1177
1178 use super::*;
1179 use crate::common::table::state_table::StateTable;
1180 use crate::common::table::test_utils::gen_pbtable_with_dist_key;
1181 use crate::executor::test_utils::{MessageSender, MockSource};
1182
1183 async fn create_executor(
1184 store: MemoryStateStore,
1185 fill_columns: HashMap<usize, FillStrategy>,
1186 schema: Schema,
1187 gap_interval: Interval,
1188 ) -> (MessageSender, BoxedMessageStream) {
1189 let (tx, source) = MockSource::channel();
1190 let source = source.into_executor(schema.clone(), vec![0]);
1191
1192 let mut table_columns: Vec<ColumnDesc> = schema
1193 .fields
1194 .iter()
1195 .enumerate()
1196 .map(|(i, f)| ColumnDesc::unnamed(ColumnId::new(i as i32), f.data_type.clone()))
1197 .collect();
1198
1199 let is_filled_column_id = table_columns.len() as i32;
1201 table_columns.push(ColumnDesc::unnamed(
1202 ColumnId::new(is_filled_column_id),
1203 DataType::Boolean,
1204 ));
1205
1206 let table = StateTable::from_table_catalog(
1207 &gen_pbtable_with_dist_key(
1208 TableId::new(0),
1209 table_columns,
1210 vec![OrderType::ascending()],
1211 vec![0],
1212 0,
1213 vec![],
1214 ),
1215 store,
1216 None,
1217 )
1218 .await;
1219
1220 let time_column_index = 0;
1221
1222 let executor = GapFillExecutor::new(GapFillExecutorArgs {
1223 ctx: ActorContext::for_test(123),
1224 input: source,
1225 schema: schema.clone(),
1226 chunk_size: 1024,
1227 time_column_index,
1228 fill_columns,
1229 gap_interval: NonStrictExpression::for_test(LiteralExpression::new(
1230 DataType::Interval,
1231 Some(gap_interval.into()),
1232 )),
1233 state_table: table,
1234 });
1235
1236 (tx, executor.boxed().execute())
1237 }
1238
1239 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1240 async fn test_streaming_gap_fill_locf() {
1241 let store = MemoryStateStore::new();
1242 let schema = Schema::new(vec![
1243 Field::unnamed(DataType::Timestamp),
1244 Field::unnamed(DataType::Int32),
1245 Field::unnamed(DataType::Float64),
1246 ]);
1247 let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Locf)]);
1248 let (mut tx, mut executor) =
1249 create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1250
1251 tx.push_barrier(test_epoch(1), false);
1253 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1257 " TS i F
1258 + 2022-01-01T00:00:00 1 1.0
1259 + 2022-01-01T00:03:00 4 4.0",
1260 ));
1261
1262 let chunk = executor
1263 .next()
1264 .await
1265 .unwrap()
1266 .unwrap()
1267 .into_chunk()
1268 .unwrap();
1269 let expected = StreamChunk::from_pretty(
1270 " TS i F
1271 + 2022-01-01T00:00:00 1 1.0
1272 + 2022-01-01T00:03:00 4 4.0
1273 + 2022-01-01T00:01:00 1 1.0
1274 + 2022-01-01T00:02:00 1 1.0",
1275 );
1276
1277 assert_eq!(chunk.ops(), expected.ops());
1279 assert_eq!(chunk.visibility(), expected.visibility());
1280
1281 let chunk_rows: Vec<_> = chunk.rows().collect();
1283 let expected_rows: Vec<_> = expected.rows().collect();
1284 assert_eq!(chunk_rows.len(), expected_rows.len());
1285
1286 for (i, ((op1, row1), (op2, row2))) in
1287 chunk_rows.iter().zip_eq(expected_rows.iter()).enumerate()
1288 {
1289 assert_eq!(op1, op2, "Row {} operation mismatch", i);
1290 assert_eq!(
1291 row1.to_owned_row(),
1292 row2.to_owned_row(),
1293 "Row {} data mismatch",
1294 i
1295 );
1296 }
1297
1298 tx.push_chunk(StreamChunk::from_pretty(
1301 " TS i F
1302 + 2022-01-01T00:02:00 2 2.0",
1303 ));
1304
1305 let chunk2 = executor
1307 .next()
1308 .await
1309 .unwrap()
1310 .unwrap()
1311 .into_chunk()
1312 .unwrap();
1313
1314 let expected2 = StreamChunk::from_pretty(
1315 " TS i F
1316 - 2022-01-01T00:01:00 1 1.0
1317 - 2022-01-01T00:02:00 1 1.0
1318 + 2022-01-01T00:01:00 1 1.0
1319 + 2022-01-01T00:02:00 2 2.0",
1320 );
1321
1322 assert_eq!(chunk2.sort_rows(), expected2.sort_rows());
1323
1324 tx.push_chunk(StreamChunk::from_pretty(
1327 " TS i F
1328 - 2022-01-01T00:02:00 2 2.0",
1329 ));
1330
1331 let chunk3 = executor
1332 .next()
1333 .await
1334 .unwrap()
1335 .unwrap()
1336 .into_chunk()
1337 .unwrap();
1338 assert_eq!(
1339 chunk3.sort_rows(),
1340 StreamChunk::from_pretty(
1341 " TS i F
1342 - 2022-01-01T00:01:00 1 1.0
1343 - 2022-01-01T00:02:00 2 2.0
1344 + 2022-01-01T00:01:00 1 1.0
1345 + 2022-01-01T00:02:00 1 1.0"
1346 )
1347 .sort_rows()
1348 );
1349
1350 tx.push_chunk(StreamChunk::from_pretty(
1353 " TS i F
1354 U- 2022-01-01T00:03:00 4 4.0
1355 U+ 2022-01-01T00:03:00 5 5.0",
1356 ));
1357
1358 let chunk4 = executor
1359 .next()
1360 .await
1361 .unwrap()
1362 .unwrap()
1363 .into_chunk()
1364 .unwrap();
1365 assert_eq!(
1368 chunk4.sort_rows(),
1369 StreamChunk::from_pretty(
1370 " TS i F
1371 - 2022-01-01T00:01:00 1 1.0
1372 - 2022-01-01T00:02:00 1 1.0
1373 U- 2022-01-01T00:03:00 4 4.0
1374 + 2022-01-01T00:01:00 1 1.0
1375 + 2022-01-01T00:02:00 1 1.0
1376 U+ 2022-01-01T00:03:00 5 5.0"
1377 )
1378 .sort_rows()
1379 );
1380 }
1381
1382 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1383 async fn test_streaming_gap_fill_null() {
1384 let store = MemoryStateStore::new();
1385 let schema = Schema::new(vec![
1386 Field::unnamed(DataType::Timestamp),
1387 Field::unnamed(DataType::Int32),
1388 Field::unnamed(DataType::Float64),
1389 ]);
1390 let fill_columns = HashMap::from([(1, FillStrategy::Null), (2, FillStrategy::Null)]);
1391 let (mut tx, mut executor) =
1392 create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1393
1394 tx.push_barrier(test_epoch(1), false);
1396 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1400 " TS i F
1401 + 2022-01-01T00:00:00 1 1.0
1402 + 2022-01-01T00:03:00 4 4.0",
1403 ));
1404
1405 let chunk = executor
1406 .next()
1407 .await
1408 .unwrap()
1409 .unwrap()
1410 .into_chunk()
1411 .unwrap();
1412 assert_eq!(
1413 chunk.sort_rows(),
1414 StreamChunk::from_pretty(
1415 " TS i F
1416 + 2022-01-01T00:00:00 1 1.0
1417 + 2022-01-01T00:01:00 . .
1418 + 2022-01-01T00:02:00 . .
1419 + 2022-01-01T00:03:00 4 4.0"
1420 )
1421 .sort_rows()
1422 );
1423
1424 tx.push_chunk(StreamChunk::from_pretty(
1426 " TS i F
1427 + 2022-01-01T00:02:00 2 2.0",
1428 ));
1429
1430 let chunk2 = executor
1431 .next()
1432 .await
1433 .unwrap()
1434 .unwrap()
1435 .into_chunk()
1436 .unwrap();
1437 assert_eq!(
1438 chunk2.sort_rows(),
1439 StreamChunk::from_pretty(
1440 " TS i F
1441 - 2022-01-01T00:01:00 . .
1442 - 2022-01-01T00:02:00 . .
1443 + 2022-01-01T00:01:00 . .
1444 + 2022-01-01T00:02:00 2 2.0"
1445 )
1446 .sort_rows()
1447 );
1448
1449 tx.push_chunk(StreamChunk::from_pretty(
1451 " TS i F
1452 - 2022-01-01T00:02:00 2 2.0",
1453 ));
1454
1455 let chunk3 = executor
1456 .next()
1457 .await
1458 .unwrap()
1459 .unwrap()
1460 .into_chunk()
1461 .unwrap();
1462 assert_eq!(
1463 chunk3.sort_rows(),
1464 StreamChunk::from_pretty(
1465 " TS i F
1466 - 2022-01-01T00:01:00 . .
1467 - 2022-01-01T00:02:00 2 2.0
1468 + 2022-01-01T00:01:00 . .
1469 + 2022-01-01T00:02:00 . ."
1470 )
1471 .sort_rows()
1472 );
1473
1474 tx.push_chunk(StreamChunk::from_pretty(
1476 " TS i F
1477 U- 2022-01-01T00:03:00 4 4.0
1478 U+ 2022-01-01T00:03:00 5 5.0",
1479 ));
1480
1481 let chunk4 = executor
1482 .next()
1483 .await
1484 .unwrap()
1485 .unwrap()
1486 .into_chunk()
1487 .unwrap();
1488 assert_eq!(
1489 chunk4.sort_rows(),
1490 StreamChunk::from_pretty(
1491 " TS i F
1492 - 2022-01-01T00:01:00 . .
1493 - 2022-01-01T00:02:00 . .
1494 U- 2022-01-01T00:03:00 4 4.0
1495 + 2022-01-01T00:01:00 . .
1496 + 2022-01-01T00:02:00 . .
1497 U+ 2022-01-01T00:03:00 5 5.0"
1498 )
1499 .sort_rows()
1500 );
1501 }
1502
1503 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1504 async fn test_streaming_gap_fill_interpolate() {
1505 let store = MemoryStateStore::new();
1506 let schema = Schema::new(vec![
1507 Field::unnamed(DataType::Timestamp),
1508 Field::unnamed(DataType::Int32),
1509 Field::unnamed(DataType::Float64),
1510 ]);
1511 let fill_columns = HashMap::from([
1512 (1, FillStrategy::Interpolate),
1513 (2, FillStrategy::Interpolate),
1514 ]);
1515 let (mut tx, mut executor) =
1516 create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1517
1518 tx.push_barrier(test_epoch(1), false);
1520 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1524 " TS i F
1525 + 2022-01-01T00:00:00 1 1.0
1526 + 2022-01-01T00:03:00 4 4.0",
1527 ));
1528
1529 let chunk = executor
1530 .next()
1531 .await
1532 .unwrap()
1533 .unwrap()
1534 .into_chunk()
1535 .unwrap();
1536 assert_eq!(
1537 chunk.sort_rows(),
1538 StreamChunk::from_pretty(
1539 " TS i F
1540 + 2022-01-01T00:00:00 1 1.0
1541 + 2022-01-01T00:01:00 2 2.0
1542 + 2022-01-01T00:02:00 3 3.0
1543 + 2022-01-01T00:03:00 4 4.0"
1544 )
1545 .sort_rows()
1546 );
1547
1548 tx.push_chunk(StreamChunk::from_pretty(
1550 " TS i F
1551 + 2022-01-01T00:02:00 10 10.0",
1552 ));
1553
1554 let chunk2 = executor
1555 .next()
1556 .await
1557 .unwrap()
1558 .unwrap()
1559 .into_chunk()
1560 .unwrap();
1561 assert_eq!(
1562 chunk2.sort_rows(),
1563 StreamChunk::from_pretty(
1564 " TS i F
1565 - 2022-01-01T00:01:00 2 2.0
1566 - 2022-01-01T00:02:00 3 3.0
1567 + 2022-01-01T00:01:00 5 5.5
1568 + 2022-01-01T00:02:00 10 10.0"
1569 )
1570 .sort_rows()
1571 );
1572
1573 tx.push_chunk(StreamChunk::from_pretty(
1576 " TS i F
1577 - 2022-01-01T00:02:00 10 10.0",
1578 ));
1579
1580 let chunk3 = executor
1581 .next()
1582 .await
1583 .unwrap()
1584 .unwrap()
1585 .into_chunk()
1586 .unwrap();
1587 assert_eq!(
1588 chunk3.sort_rows(),
1589 StreamChunk::from_pretty(
1590 " TS i F
1591 - 2022-01-01T00:01:00 5 5.5
1592 - 2022-01-01T00:02:00 10 10.0
1593 + 2022-01-01T00:01:00 2 2.0
1594 + 2022-01-01T00:02:00 3 3.0"
1595 )
1596 .sort_rows()
1597 );
1598
1599 tx.push_chunk(StreamChunk::from_pretty(
1602 " TS i F
1603 U- 2022-01-01T00:03:00 4 4.0
1604 U+ 2022-01-01T00:03:00 10 10.0",
1605 ));
1606
1607 let chunk4 = executor
1608 .next()
1609 .await
1610 .unwrap()
1611 .unwrap()
1612 .into_chunk()
1613 .unwrap();
1614 assert_eq!(
1615 chunk4.sort_rows(),
1616 StreamChunk::from_pretty(
1617 " TS i F
1618 - 2022-01-01T00:01:00 2 2.0
1619 - 2022-01-01T00:02:00 3 3.0
1620 U- 2022-01-01T00:03:00 4 4.0
1621 + 2022-01-01T00:01:00 4 4.0
1622 + 2022-01-01T00:02:00 7 7.0
1623 U+ 2022-01-01T00:03:00 10 10.0"
1624 )
1625 .sort_rows()
1626 );
1627 }
1628
1629 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1630 async fn test_streaming_gap_fill_recovery() {
1631 let store = MemoryStateStore::new();
1632 let schema = Schema::new(vec![
1633 Field::unnamed(DataType::Timestamp),
1634 Field::unnamed(DataType::Int32),
1635 Field::unnamed(DataType::Float64),
1636 ]);
1637 let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Interpolate)]);
1638
1639 let (mut tx, mut executor) = create_executor(
1641 store.clone(),
1642 fill_columns.clone(),
1643 schema.clone(),
1644 Interval::from_minutes(1),
1645 )
1646 .await;
1647
1648 tx.push_barrier(test_epoch(1), false);
1650 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1654 " TS i F
1655 + 2022-01-01T00:00:00 1 1.0
1656 + 2022-01-01T00:03:00 4 4.0",
1657 ));
1658
1659 let chunk = executor
1661 .next()
1662 .await
1663 .unwrap()
1664 .unwrap()
1665 .into_chunk()
1666 .unwrap();
1667 assert_eq!(
1668 chunk.sort_rows(),
1669 StreamChunk::from_pretty(
1670 " TS i F
1671 + 2022-01-01T00:00:00 1 1.0
1672 + 2022-01-01T00:01:00 1 2.0
1673 + 2022-01-01T00:02:00 1 3.0
1674 + 2022-01-01T00:03:00 4 4.0"
1675 )
1676 .sort_rows()
1677 );
1678
1679 tx.push_barrier(test_epoch(2), false);
1680 executor.next().await.unwrap().unwrap(); let (mut tx2, mut executor2) = create_executor(
1684 store.clone(),
1685 fill_columns.clone(),
1686 schema.clone(),
1687 Interval::from_minutes(1),
1688 )
1689 .await;
1690
1691 tx2.push_barrier(test_epoch(2), false);
1693 executor2.next().await.unwrap().unwrap(); tx2.push_chunk(StreamChunk::from_pretty(
1699 " TS i F
1700 + 2022-01-01T00:05:00 6 10.0",
1701 ));
1702
1703 let chunk2 = executor2
1704 .next()
1705 .await
1706 .unwrap()
1707 .unwrap()
1708 .into_chunk()
1709 .unwrap();
1710 assert_eq!(
1711 chunk2.sort_rows(),
1712 StreamChunk::from_pretty(
1713 " TS i F
1714 + 2022-01-01T00:04:00 4 7.0
1715 + 2022-01-01T00:05:00 6 10.0"
1716 )
1717 .sort_rows()
1718 );
1719 }
1720
1721 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1722 async fn test_streaming_gap_fill_mixed_strategy() {
1723 let store = MemoryStateStore::new();
1724 let schema = Schema::new(vec![
1725 Field::unnamed(DataType::Timestamp),
1726 Field::unnamed(DataType::Int32),
1727 Field::unnamed(DataType::Int64),
1728 Field::unnamed(DataType::Float32),
1729 Field::unnamed(DataType::Float64),
1730 ]);
1731
1732 let fill_columns = HashMap::from([
1733 (1, FillStrategy::Interpolate),
1734 (2, FillStrategy::Locf),
1735 (3, FillStrategy::Null),
1736 (4, FillStrategy::Interpolate),
1737 ]);
1738 let gap_interval = Interval::from_days(1);
1739 let (mut tx, mut executor) =
1740 create_executor(store, fill_columns, schema, gap_interval).await;
1741
1742 tx.push_barrier(test_epoch(1), false);
1744 executor.next().await.unwrap().unwrap();
1745
1746 tx.push_chunk(StreamChunk::from_pretty(
1748 " TS i I f F
1749 + 2023-04-01T10:00:00 10 100 1.0 100.0
1750 + 2023-04-05T10:00:00 50 200 5.0 200.0",
1751 ));
1752
1753 let chunk = executor
1754 .next()
1755 .await
1756 .unwrap()
1757 .unwrap()
1758 .into_chunk()
1759 .unwrap();
1760 assert_eq!(
1761 chunk.sort_rows(),
1762 StreamChunk::from_pretty(
1763 " TS i I f F
1764 + 2023-04-01T10:00:00 10 100 1.0 100.0
1765 + 2023-04-02T10:00:00 20 100 . 125.0
1766 + 2023-04-03T10:00:00 30 100 . 150.0
1767 + 2023-04-04T10:00:00 40 100 . 175.0
1768 + 2023-04-05T10:00:00 50 200 5.0 200.0"
1769 )
1770 .sort_rows()
1771 );
1772
1773 tx.push_chunk(StreamChunk::from_pretty(
1775 " TS i I f F
1776 + 2023-04-03T10:00:00 25 150 3.0 160.0",
1777 ));
1778
1779 let chunk2 = executor
1780 .next()
1781 .await
1782 .unwrap()
1783 .unwrap()
1784 .into_chunk()
1785 .unwrap();
1786 assert_eq!(
1787 chunk2.sort_rows(),
1788 StreamChunk::from_pretty(
1789 " TS i I f F
1790 - 2023-04-02T10:00:00 20 100 . 125.0
1791 - 2023-04-03T10:00:00 30 100 . 150.0
1792 - 2023-04-04T10:00:00 40 100 . 175.0
1793 + 2023-04-02T10:00:00 17 100 . 130.0
1794 + 2023-04-03T10:00:00 25 150 3.0 160.0
1795 + 2023-04-04T10:00:00 37 150 . 180.0"
1796 )
1797 .sort_rows()
1798 );
1799
1800 tx.push_chunk(StreamChunk::from_pretty(
1802 " TS i I f F
1803 - 2023-04-03T10:00:00 25 150 3.0 160.0",
1804 ));
1805 let chunk3 = executor
1806 .next()
1807 .await
1808 .unwrap()
1809 .unwrap()
1810 .into_chunk()
1811 .unwrap();
1812 assert_eq!(
1813 chunk3.sort_rows(),
1814 StreamChunk::from_pretty(
1815 " TS i I f F
1816 - 2023-04-02T10:00:00 17 100 . 130.0
1817 - 2023-04-03T10:00:00 25 150 3.0 160.0
1818 - 2023-04-04T10:00:00 37 150 . 180.0
1819 + 2023-04-02T10:00:00 20 100 . 125.0
1820 + 2023-04-03T10:00:00 30 100 . 150.0
1821 + 2023-04-04T10:00:00 40 100 . 175.0"
1822 )
1823 .sort_rows()
1824 );
1825
1826 tx.push_chunk(StreamChunk::from_pretty(
1828 " TS i I f F
1829 U- 2023-04-05T10:00:00 50 200 5.0 200.0
1830 U+ 2023-04-05T10:00:00 50 200 5.0 300.0",
1831 ));
1832 let chunk4 = executor
1833 .next()
1834 .await
1835 .unwrap()
1836 .unwrap()
1837 .into_chunk()
1838 .unwrap();
1839 assert_eq!(
1840 chunk4.sort_rows(),
1841 StreamChunk::from_pretty(
1842 " TS i I f F
1843 - 2023-04-02T10:00:00 20 100 . 125.0
1844 - 2023-04-03T10:00:00 30 100 . 150.0
1845 - 2023-04-04T10:00:00 40 100 . 175.0
1846 U- 2023-04-05T10:00:00 50 200 5.0 200.0
1847 + 2023-04-02T10:00:00 20 100 . 150.0
1848 + 2023-04-03T10:00:00 30 100 . 200.0
1849 + 2023-04-04T10:00:00 40 100 . 250.0
1850 U+ 2023-04-05T10:00:00 50 200 5.0 300.0"
1851 )
1852 .sort_rows()
1853 );
1854 }
1855}