1use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::ops::Bound;
18
19use futures::{StreamExt, pin_mut};
20use risingwave_common::array::Op;
21use risingwave_common::gap_fill::{
22 FillStrategy, apply_interpolation_step, calculate_interpolation_step,
23};
24use risingwave_common::metrics::LabelGuardedIntCounter;
25use risingwave_common::row::{OwnedRow, Row, RowExt};
26use risingwave_common::types::{
27 CheckedAdd, Datum, DefaultOrd, Interval, ScalarImpl, Timestamp, ToOwnedDatum,
28};
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_storage::StateStore;
32use risingwave_storage::store::PrefetchOptions;
33use tracing::warn;
34
35use crate::common::table::state_table::{StateTable, StateTablePostCommit};
36use crate::executor::prelude::*;
37
/// Construction arguments for [`GapFillExecutor`], consumed by [`GapFillExecutor::new`].
pub struct GapFillExecutorArgs<S: StateStore> {
    /// Actor context; provides the actor/fragment ids and the streaming metrics.
    pub ctx: ActorContextRef,
    /// Upstream executor whose messages are consumed.
    pub input: Executor,
    /// Schema whose data types are used to build the output chunks.
    pub schema: Schema,
    /// Chunk size handed to the output `StreamChunkBuilder`.
    pub chunk_size: usize,
    /// Index of the time column; rows with a NULL time are passed through untouched.
    pub time_column_index: usize,
    /// Fill strategy per column index. In generated rows, columns that are neither the
    /// time column, a partition column, nor listed here are set to NULL.
    pub fill_columns: HashMap<usize, FillStrategy>,
    /// Expression evaluated once against an empty row to obtain the gap interval.
    /// It must evaluate to a non-NULL, strictly positive interval.
    pub gap_interval: NonStrictExpression,
    /// State table in which the input rows are stored.
    pub state_table: StateTable<S>,
    /// Column indices projected from a row to form the partition prefix for lookups.
    pub partition_by_indices: Vec<usize>,
    /// Column indices projected from a row to form the key used to locate its
    /// neighbors inside the partition. Must not be empty.
    pub pointer_key_indices: Vec<usize>,
    /// When a single gap produces more rows than this, a warning is logged.
    pub high_gap_fill_amplification_threshold: usize,
}
51
/// Wrapper around the gap fill state table that stores rows and looks up the
/// previous/next stored row of a key within a partition.
pub struct ManagedGapFillState<S: StateStore> {
    /// Backing state table; rows are inserted and deleted as given.
    state_table: StateTable<S>,
    /// Column indices projected from a row to form the partition prefix.
    partition_by_indices: Vec<usize>,
    /// Column indices projected from a row to form the neighbor-lookup key.
    /// Guaranteed non-empty by [`ManagedGapFillState::new`].
    pointer_key_indices: Vec<usize>,
}
61
62impl<S: StateStore> ManagedGapFillState<S> {
63 pub fn new(
64 state_table: StateTable<S>,
65 _schema: &Schema,
66 partition_by_indices: Vec<usize>,
67 pointer_key_indices: Vec<usize>,
68 ) -> Self {
69 assert!(
70 !pointer_key_indices.is_empty(),
71 "gap fill pointer key should not be empty",
72 );
73
74 Self {
75 state_table,
76 partition_by_indices,
77 pointer_key_indices,
78 }
79 }
80
81 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
82 self.state_table.init_epoch(epoch).await
83 }
84
85 pub fn insert(&mut self, value: impl Row) {
86 self.state_table.insert(value);
87 }
88
89 pub fn delete(&mut self, value: impl Row) {
90 self.state_table.delete(value);
91 }
92
93 pub async fn flush(
94 &mut self,
95 epoch: EpochPair,
96 ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
97 self.state_table.commit(epoch).await
98 }
99
100 pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
101 self.state_table.try_flush().await
102 }
103
104 fn state_row_to_output_row(&self, state_row: impl Row) -> OwnedRow {
105 state_row.into_owned_row()
106 }
107
108 async fn find_prev_in_partition(
111 &self,
112 partition_key: impl Row,
113 target_pointer_key: impl Row,
114 ) -> StreamExecutorResult<Option<OwnedRow>> {
115 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(
116 Bound::Unbounded,
117 Bound::Excluded(target_pointer_key.into_owned_row()),
118 );
119
120 let iter = self
121 .state_table
122 .rev_iter_with_prefix(partition_key, sub_range, PrefetchOptions::default())
123 .await?;
124 pin_mut!(iter);
125
126 if let Some(item) = iter.next().await {
127 let state_row = item?.into_owned_row();
128 Ok(Some(state_row))
129 } else {
130 Ok(None)
131 }
132 }
133
134 async fn find_next_in_partition(
136 &self,
137 partition_key: impl Row,
138 target_pointer_key: impl Row,
139 ) -> StreamExecutorResult<Option<OwnedRow>> {
140 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(
141 Bound::Excluded(target_pointer_key.into_owned_row()),
142 Bound::Unbounded,
143 );
144
145 let iter = self
146 .state_table
147 .iter_with_prefix(partition_key, sub_range, PrefetchOptions::default())
148 .await?;
149 pin_mut!(iter);
150
151 if let Some(item) = iter.next().await {
152 let state_row = item?.into_owned_row();
153 Ok(Some(state_row))
154 } else {
155 Ok(None)
156 }
157 }
158}
159
/// Streaming executor that forwards its input rows and additionally emits generated
/// rows for the time gaps between neighboring stored rows of a partition.
pub struct GapFillExecutor<S: StateStore> {
    /// Actor context (ids are used for metrics labels and log fields).
    ctx: ActorContextRef,
    /// Upstream executor.
    input: Executor,
    /// Schema whose data types are used to build the output chunks.
    schema: Schema,
    /// Chunk size handed to the output chunk builder.
    chunk_size: usize,
    /// Index of the time column.
    time_column_index: usize,
    /// Fill strategy per column index.
    fill_columns: HashMap<usize, FillStrategy>,
    /// Expression producing the gap interval; evaluated once at startup.
    gap_interval: NonStrictExpression,
    /// Generated-row count per gap above which a warning is logged.
    high_gap_fill_amplification_threshold: usize,

    /// Stored rows plus neighbor lookups.
    managed_state: ManagedGapFillState<S>,

    /// Metrics reported by this executor.
    metrics: GapFillMetrics,
}
176
/// Metrics reported by the gap fill executor.
pub struct GapFillMetrics {
    /// Counter increased by the number of rows produced on every fill generation.
    pub gap_fill_generated_rows_count: LabelGuardedIntCounter,
}
180
/// Borrowed reporting context passed to fill generation (metrics and logging only).
struct GapFillGenerationContext<'a> {
    /// Metrics to update with the number of generated rows.
    metrics: &'a GapFillMetrics,
    /// Generated-row count above which a warning is logged.
    high_amplification_threshold: usize,
    /// Actor context whose actor/fragment ids are attached to that warning.
    actor_ctx: &'a ActorContextRef,
}
186
187fn time_scalar_to_timestamp(scalar: ScalarRefImpl<'_>) -> Option<Timestamp> {
189 match scalar {
190 ScalarRefImpl::Timestamp(ts) => Some(ts),
191 ScalarRefImpl::Timestamptz(ts) => Timestamp::with_micros(ts.timestamp_micros()).ok(),
192 _ => None,
193 }
194}
195
196impl<S: StateStore> GapFillExecutor<S> {
197 async fn find_prev_output(
198 managed_state: &ManagedGapFillState<S>,
199 partition_key: impl Row,
200 pointer_key: impl Row,
201 ) -> StreamExecutorResult<Option<OwnedRow>> {
202 Ok(managed_state
203 .find_prev_in_partition(partition_key, pointer_key)
204 .await?
205 .map(|sr| managed_state.state_row_to_output_row(sr)))
206 }
207
208 async fn find_next_output(
209 managed_state: &ManagedGapFillState<S>,
210 partition_key: impl Row,
211 pointer_key: impl Row,
212 ) -> StreamExecutorResult<Option<OwnedRow>> {
213 Ok(managed_state
214 .find_next_in_partition(partition_key, pointer_key)
215 .await?
216 .map(|sr| managed_state.state_row_to_output_row(sr)))
217 }
218
219 pub fn new(args: GapFillExecutorArgs<S>) -> Self {
220 let managed_state = ManagedGapFillState::new(
221 args.state_table,
222 &args.schema,
223 args.partition_by_indices,
224 args.pointer_key_indices,
225 );
226
227 let metrics = args.ctx.streaming_metrics.clone();
228 let actor_id = args.ctx.id.to_string();
229 let fragment_id = args.ctx.fragment_id.to_string();
230 let gap_fill_metrics = GapFillMetrics {
231 gap_fill_generated_rows_count: metrics
232 .gap_fill_generated_rows_count
233 .with_guarded_label_values(&[&actor_id, &fragment_id]),
234 };
235
236 Self {
237 ctx: args.ctx,
238 input: args.input,
239 schema: args.schema,
240 chunk_size: args.chunk_size,
241 time_column_index: args.time_column_index,
242 fill_columns: args.fill_columns,
243 gap_interval: args.gap_interval,
244 high_gap_fill_amplification_threshold: args.high_gap_fill_amplification_threshold,
245 managed_state,
246 metrics: gap_fill_metrics,
247 }
248 }
249
    /// Generates the fill rows whose time lies strictly between the times of
    /// `prev_row` and `curr_row`, stepping by `interval` from `prev_row`'s time.
    ///
    /// Column values of a generated row:
    /// - time column: the fill time, in the same scalar variant as `prev_row`'s time;
    /// - partition columns: copied from `prev_row`;
    /// - columns in `fill_columns`: `Locf` copies `prev_row`, `Null` is NULL,
    ///   `Interpolate` advances a running value by a per-column step;
    /// - every other column: NULL.
    ///
    /// `build_from`, when set, suppresses rows whose fill time is earlier than it. It
    /// must not be combined with an `Interpolate` column (checked in debug builds).
    ///
    /// Returns an empty vector when either time is NULL or not a
    /// timestamp/timestamptz, when `prev_row` is not strictly earlier than `curr_row`,
    /// when the first step overflows, or when the first step already reaches
    /// `curr_row`'s time.
    ///
    /// Side effects: adds the number of returned rows to the generated-rows counter and
    /// logs a warning when that number exceeds the amplification threshold.
    #[expect(clippy::too_many_arguments)]
    fn generate_filled_rows_between_static(
        prev_row: &OwnedRow,
        curr_row: &OwnedRow,
        interval: &risingwave_common::types::Interval,
        time_column_index: usize,
        partition_by_indices: &[usize],
        fill_columns: &HashMap<usize, FillStrategy>,
        generation_context: &GapFillGenerationContext<'_>,
        build_from: Option<Timestamp>,
    ) -> StreamExecutorResult<Vec<OwnedRow>> {
        // Skipping leading rows would also skip their interpolation steps, so the two
        // features are mutually exclusive.
        debug_assert!(
            build_from.is_none()
                || !fill_columns
                    .values()
                    .any(|s| matches!(s, FillStrategy::Interpolate)),
            "build_from must not be set when any column interpolates"
        );
        let mut filled_rows = Vec::new();

        // A NULL time on either side means there is no gap to fill.
        let (Some(prev_time_scalar), Some(curr_time_scalar)) = (
            prev_row.datum_at(time_column_index),
            curr_row.datum_at(time_column_index),
        ) else {
            return Ok(filled_rows);
        };

        let Some(prev_time) = time_scalar_to_timestamp(prev_time_scalar) else {
            warn!(
                "Time column is not a timestamp value: {:?}",
                prev_time_scalar
            );
            return Ok(filled_rows);
        };
        let Some(curr_time) = time_scalar_to_timestamp(curr_time_scalar) else {
            warn!(
                "Time column is not a timestamp value: {:?}",
                curr_time_scalar
            );
            return Ok(filled_rows);
        };

        // Rows must be in strictly ascending time order to have a gap between them.
        if prev_time >= curr_time {
            return Ok(filled_rows);
        }

        // First candidate fill time.
        let mut fill_time = match prev_time.checked_add(*interval) {
            Some(t) => t,
            None => {
                warn!(
                    "Gap fill interval is too large, causing timestamp overflow. \
                     No gap filling will be performed between {:?} and {:?}.",
                    prev_time, curr_time
                );
                return Ok(filled_rows);
            }
        };

        // The two rows are at most one interval apart: nothing to generate.
        if fill_time >= curr_time {
            return Ok(filled_rows);
        }

        // Count the fill points in the gap (ignoring `build_from`); needed up front to
        // derive the interpolation steps.
        let mut row_count = 0;
        let mut temp_time = fill_time;
        while temp_time < curr_time {
            row_count += 1;
            temp_time = match temp_time.checked_add(*interval) {
                Some(t) => t,
                None => break,
            };
        }

        // Both vectors are indexed by column index; non-interpolated columns hold `None`.
        let mut interpolation_steps: Vec<Option<ScalarImpl>> = Vec::new();
        let mut interpolation_states: Vec<Datum> = Vec::new();

        for i in 0..prev_row.len() {
            if let Some(strategy) = fill_columns.get(&i) {
                if matches!(strategy, FillStrategy::Interpolate) {
                    // `row_count + 1` is the number of intervals between the two rows.
                    // NOTE(review): presumably the helper returns the per-row increment
                    // for that many steps — confirm against `calculate_interpolation_step`.
                    let step = calculate_interpolation_step(
                        prev_row.datum_at(i),
                        curr_row.datum_at(i),
                        row_count + 1,
                    );
                    interpolation_steps.push(step.clone());
                    // The running value starts from the previous row's value.
                    interpolation_states.push(prev_row.datum_at(i).to_owned_datum());
                } else {
                    interpolation_steps.push(None);
                    interpolation_states.push(None);
                }
            } else {
                interpolation_steps.push(None);
                interpolation_states.push(None);
            }
        }

        while fill_time < curr_time {
            // Skip fill points before `build_from` without emitting a row.
            if build_from.is_some_and(|from| fill_time < from) {
                fill_time = match fill_time.checked_add(*interval) {
                    Some(t) => t,
                    None => break,
                };
                continue;
            }
            let mut new_row_data = Vec::with_capacity(prev_row.len());

            for col_idx in 0..prev_row.len() {
                let datum = if col_idx == time_column_index {
                    // Emit the fill time in the same scalar variant as the input time.
                    let fill_time_scalar = match prev_time_scalar {
                        ScalarRefImpl::Timestamp(_) => ScalarImpl::Timestamp(fill_time),
                        ScalarRefImpl::Timestamptz(_) => {
                            let micros = fill_time.0.and_utc().timestamp_micros();
                            ScalarImpl::Timestamptz(
                                risingwave_common::types::Timestamptz::from_micros(micros),
                            )
                        }
                        // `time_scalar_to_timestamp` above only succeeds for these two
                        // variants.
                        _ => unreachable!("Time column should be Timestamp or Timestamptz"),
                    };
                    Some(fill_time_scalar)
                } else if partition_by_indices.contains(&col_idx) {
                    // Generated rows stay in the partition of the previous row.
                    prev_row.datum_at(col_idx).to_owned_datum()
                } else if let Some(strategy) = fill_columns.get(&col_idx) {
                    match strategy {
                        FillStrategy::Locf => prev_row.datum_at(col_idx).to_owned_datum(),
                        FillStrategy::Null => None,
                        FillStrategy::Interpolate => {
                            if let Some(step) = &interpolation_steps[col_idx] {
                                // Advance the running value, then emit it.
                                apply_interpolation_step(&mut interpolation_states[col_idx], step);
                                interpolation_states[col_idx].clone()
                            } else {
                                // No step could be computed for this column.
                                None
                            }
                        }
                    }
                } else {
                    // Column without a fill strategy.
                    None
                };
                new_row_data.push(datum);
            }

            let filled_row = OwnedRow::new(new_row_data);
            // A fill row must never collide with either boundary row's time.
            debug_assert_ne!(
                filled_row.datum_at(time_column_index),
                prev_row.datum_at(time_column_index)
            );
            debug_assert_ne!(
                filled_row.datum_at(time_column_index),
                curr_row.datum_at(time_column_index)
            );
            filled_rows.push(filled_row);

            fill_time = match fill_time.checked_add(*interval) {
                Some(t) => t,
                None => {
                    warn!(
                        "Gap fill stopped due to timestamp overflow after generating {} rows.",
                        filled_rows.len()
                    );
                    break;
                }
            };
        }

        generation_context
            .metrics
            .gap_fill_generated_rows_count
            .inc_by(filled_rows.len() as u64);

        // Surface gaps that expand into an unusually large number of rows.
        if filled_rows.len() > generation_context.high_amplification_threshold {
            let partition_key = prev_row.project(partition_by_indices);
            tracing::warn!(target: "high_gap_fill_amplification",
                generated_rows_len = filled_rows.len(),
                prev_time = ?prev_time,
                curr_time = ?curr_time,
                gap_interval = ?interval,
                partition_key = ?partition_key,
                actor_id = %generation_context.actor_ctx.id,
                fragment_id = %generation_context.actor_ctx.fragment_id,
                "large rows generated by gap fill"
            );
        }

        Ok(filled_rows)
    }
472
473 fn diff_fills(
479 old_fills: Vec<OwnedRow>,
480 new_fills: Vec<OwnedRow>,
481 time_column_index: usize,
482 reuse_unchanged: bool,
483 ) -> Vec<(Op, OwnedRow)> {
484 if !reuse_unchanged {
485 return old_fills
486 .into_iter()
487 .map(|row| (Op::Delete, row))
488 .chain(new_fills.into_iter().map(|row| (Op::Insert, row)))
489 .collect();
490 }
491
492 let mut ops = Vec::new();
493 let (mut i, mut j) = (0, 0);
494 while i < old_fills.len() && j < new_fills.len() {
495 match old_fills[i]
496 .datum_at(time_column_index)
497 .default_cmp(&new_fills[j].datum_at(time_column_index))
498 {
499 Ordering::Less => {
500 ops.push((Op::Delete, old_fills[i].clone()));
501 i += 1;
502 }
503 Ordering::Greater => {
504 ops.push((Op::Insert, new_fills[j].clone()));
505 j += 1;
506 }
507 Ordering::Equal => {
508 if old_fills[i] != new_fills[j] {
510 ops.push((Op::Delete, old_fills[i].clone()));
511 ops.push((Op::Insert, new_fills[j].clone()));
512 }
513 i += 1;
514 j += 1;
515 }
516 }
517 }
518 for old_row in &old_fills[i..] {
519 ops.push((Op::Delete, old_row.clone()));
520 }
521 for new_row in &new_fills[j..] {
522 ops.push((Op::Insert, new_row.clone()));
523 }
524 ops
525 }
526}
527
528impl<S: StateStore> Execute for GapFillExecutor<S> {
529 fn execute(self: Box<Self>) -> BoxedMessageStream {
530 self.execute_inner().boxed()
531 }
532}
533
impl<S: StateStore> GapFillExecutor<S> {
    /// Main loop of the executor.
    ///
    /// After forwarding the first barrier and initializing the state table, the gap
    /// interval is evaluated once; a NULL or non-positive interval is an error.
    /// Then, per message:
    /// - chunk: every row is forwarded (updates are normalized to plain
    ///   insert/delete), stored in or removed from the state, and the fill rows around
    ///   it are retracted/emitted according to its previous and next stored neighbors;
    ///   rows with a NULL time are forwarded without touching state or fills;
    /// - watermark: dropped;
    /// - barrier: the state is committed and the barrier forwarded.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self: Box<Self>) {
        let Self {
            mut managed_state,
            schema,
            chunk_size,
            time_column_index,
            fill_columns,
            gap_interval,
            high_gap_fill_amplification_threshold,
            ctx,
            input,
            metrics,
        } = *self;

        let mut input = input.execute();

        // Forward the first barrier, then initialize the state table with its epoch.
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        managed_state.init_epoch(first_epoch).await?;

        // The interval expression is evaluated once, against an empty row.
        let dummy_row = OwnedRow::new(vec![]);
        let interval_datum = gap_interval.eval_row_infallible(&dummy_row).await;
        let interval = interval_datum
            .ok_or_else(|| anyhow::anyhow!("Gap interval expression returned null"))?
            .into_interval();

        if interval <= Interval::from_month_day_usec(0, 0, 0) {
            Err(anyhow::anyhow!("Gap interval must be positive"))?;
        }
        let generation_context = GapFillGenerationContext {
            metrics: &metrics,
            high_amplification_threshold: high_gap_fill_amplification_threshold,
            actor_ctx: &ctx,
        };

        let partition_by_indices = managed_state.partition_by_indices.clone();
        let pointer_key_indices = managed_state.pointer_key_indices.clone();
        // With interpolation, fill values depend on both neighbors, so fills cannot be
        // partially reused when a neighbor changes.
        let has_interpolate = fill_columns
            .values()
            .any(|strategy| matches!(strategy, FillStrategy::Interpolate));

        #[for_await]
        for msg in input {
            match msg? {
                Message::Chunk(chunk) => {
                    let chunk = chunk.compact_vis();
                    let mut chunk_builder =
                        StreamChunkBuilder::new(chunk_size, schema.data_types());

                    for (op, row_ref) in chunk.rows() {
                        let row = row_ref.to_owned_row();
                        // Rows without a time cannot take part in gap filling: forward
                        // them as is and leave the state untouched.
                        if row.datum_at(time_column_index).is_none() {
                            if let Some(chunk) =
                                chunk_builder.append_row(op.normalize_update(), &row)
                            {
                                yield Message::Chunk(chunk);
                            }
                            continue;
                        }
                        let partition_key = (&row).project(&partition_by_indices);
                        let pointer_key = (&row).project(&pointer_key_indices);

                        match op {
                            Op::Insert | Op::UpdateInsert => {
                                let prev_output = Self::find_prev_output(
                                    &managed_state,
                                    &partition_key,
                                    &pointer_key,
                                )
                                .await?;

                                let next_output = Self::find_next_output(
                                    &managed_state,
                                    &partition_key,
                                    &pointer_key,
                                )
                                .await?;

                                // Without interpolation, the fills between `prev` and the
                                // new row keep their values, so only the part of the old
                                // gap starting at the new row's time has to be diffed.
                                let split_time = (!has_interpolate
                                    && prev_output.is_some()
                                    && next_output.is_some())
                                .then(|| {
                                    row.datum_at(time_column_index)
                                        .and_then(time_scalar_to_timestamp)
                                })
                                .flatten();

                                // Fills that existed before this insert: the gap
                                // prev -> next (restricted by `split_time` if set).
                                let old_fills = if let (Some(prev_out), Some(next_out)) =
                                    (&prev_output, &next_output)
                                {
                                    Self::generate_filled_rows_between_static(
                                        prev_out,
                                        next_out,
                                        &interval,
                                        time_column_index,
                                        &managed_state.partition_by_indices,
                                        &fill_columns,
                                        &generation_context,
                                        split_time,
                                    )?
                                } else {
                                    vec![]
                                };
                                // Fills after this insert: prev -> row (unless reused via
                                // `split_time`) followed by row -> next.
                                let mut new_fills = vec![];
                                if split_time.is_none()
                                    && let Some(prev_out) = &prev_output
                                {
                                    new_fills.extend(Self::generate_filled_rows_between_static(
                                        prev_out,
                                        &row,
                                        &interval,
                                        time_column_index,
                                        &managed_state.partition_by_indices,
                                        &fill_columns,
                                        &generation_context,
                                        None,
                                    )?);
                                }
                                if let Some(next_out) = &next_output {
                                    new_fills.extend(Self::generate_filled_rows_between_static(
                                        &row,
                                        next_out,
                                        &interval,
                                        time_column_index,
                                        &managed_state.partition_by_indices,
                                        &fill_columns,
                                        &generation_context,
                                        None,
                                    )?);
                                }

                                // Emit the fill changes before the inserted row itself.
                                for (fill_op, filled_row) in Self::diff_fills(
                                    old_fills,
                                    new_fills,
                                    time_column_index,
                                    !has_interpolate,
                                ) {
                                    if let Some(chunk) =
                                        chunk_builder.append_row(fill_op, &filled_row)
                                    {
                                        yield Message::Chunk(chunk);
                                    }
                                }

                                managed_state.insert(&row);
                                if let Some(chunk) =
                                    chunk_builder.append_row(op.normalize_update(), &row)
                                {
                                    yield Message::Chunk(chunk);
                                }
                            }
                            Op::Delete | Op::UpdateDelete => {
                                let prev_output = Self::find_prev_output(
                                    &managed_state,
                                    &partition_key,
                                    &pointer_key,
                                )
                                .await?;

                                let next_output = Self::find_next_output(
                                    &managed_state,
                                    &partition_key,
                                    &pointer_key,
                                )
                                .await?;

                                // Mirror of the insert case: without interpolation the
                                // fills between `prev` and the deleted row are kept.
                                let split_time = (!has_interpolate
                                    && prev_output.is_some()
                                    && next_output.is_some())
                                .then(|| {
                                    row.datum_at(time_column_index)
                                        .and_then(time_scalar_to_timestamp)
                                })
                                .flatten();

                                // Fills that existed while the row was present:
                                // prev -> row (unless reused) followed by row -> next.
                                let mut old_fills = vec![];
                                if split_time.is_none()
                                    && let Some(prev_out) = &prev_output
                                {
                                    old_fills.extend(Self::generate_filled_rows_between_static(
                                        prev_out,
                                        &row,
                                        &interval,
                                        time_column_index,
                                        &managed_state.partition_by_indices,
                                        &fill_columns,
                                        &generation_context,
                                        None,
                                    )?);
                                }
                                if let Some(next_out) = &next_output {
                                    old_fills.extend(Self::generate_filled_rows_between_static(
                                        &row,
                                        next_out,
                                        &interval,
                                        time_column_index,
                                        &managed_state.partition_by_indices,
                                        &fill_columns,
                                        &generation_context,
                                        None,
                                    )?);
                                }
                                // Fills once the row is gone: the merged gap prev -> next
                                // (restricted by `split_time` if set).
                                let new_fills = if let (Some(prev_out), Some(next_out)) =
                                    (&prev_output, &next_output)
                                {
                                    Self::generate_filled_rows_between_static(
                                        prev_out,
                                        next_out,
                                        &interval,
                                        time_column_index,
                                        &managed_state.partition_by_indices,
                                        &fill_columns,
                                        &generation_context,
                                        split_time,
                                    )?
                                } else {
                                    vec![]
                                };

                                // Emit the row's deletion before the fill changes.
                                managed_state.delete(&row);
                                if let Some(chunk) =
                                    chunk_builder.append_row(op.normalize_update(), &row)
                                {
                                    yield Message::Chunk(chunk);
                                }

                                for (fill_op, filled_row) in Self::diff_fills(
                                    old_fills,
                                    new_fills,
                                    time_column_index,
                                    !has_interpolate,
                                ) {
                                    if let Some(chunk) =
                                        chunk_builder.append_row(fill_op, &filled_row)
                                    {
                                        yield Message::Chunk(chunk);
                                    }
                                }
                            }
                        }
                    }

                    // Flush whatever is left in the builder for this input chunk.
                    if let Some(chunk) = chunk_builder.take() {
                        yield Message::Chunk(chunk);
                    }

                    managed_state.try_flush().await?;
                }
                Message::Watermark(_) => {
                    // Watermarks are not forwarded downstream.
                    continue;
                }
                Message::Barrier(barrier) => {
                    // Commit the state, forward the barrier, then run the post-commit
                    // step with the barrier's vnode bitmap update for this actor.
                    let post_commit = managed_state.flush(barrier.epoch).await?;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(ctx.id);
                    yield Message::Barrier(barrier);
                    let _ = post_commit.post_yield_barrier(update_vnode_bitmap).await?;
                }
            }
        }
    }
}
815
816#[cfg(test)]
817mod tests {
818 use itertools::Itertools;
819 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
820 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
821 use risingwave_common::types::test_utils::IntervalTestExt;
822 use risingwave_common::types::{DataType, Interval, ScalarImpl, Timestamp};
823 use risingwave_common::util::epoch::test_epoch;
824 use risingwave_common::util::sort_util::OrderType;
825 use risingwave_expr::expr::LiteralExpression;
826 use risingwave_storage::memory::MemoryStateStore;
827
828 use super::*;
829 use crate::common::table::state_table::StateTable;
830 use crate::common::table::test_utils::gen_pbtable_with_dist_key;
831 use crate::executor::test_utils::{MessageSender, MockSource};
832
833 async fn create_executor(
834 store: MemoryStateStore,
835 fill_columns: HashMap<usize, FillStrategy>,
836 schema: Schema,
837 gap_interval: Interval,
838 ) -> (MessageSender, BoxedMessageStream) {
839 let (tx, source) = MockSource::channel();
840 let source = source.into_executor(schema.clone(), vec![0]);
841
842 let time_column_index = 0;
843 let partition_by_indices: Vec<usize> = vec![];
844 let pointer_key_indices = vec![0];
847
848 let table_columns: Vec<ColumnDesc> = schema
849 .fields
850 .iter()
851 .enumerate()
852 .map(|(i, f)| ColumnDesc::unnamed(ColumnId::new(i as i32), f.data_type.clone()))
853 .collect();
854
855 let table = StateTable::from_table_catalog(
858 &gen_pbtable_with_dist_key(
859 TableId::new(0),
860 table_columns,
861 vec![OrderType::ascending()],
862 vec![0],
863 0,
864 vec![],
865 ),
866 store,
867 None,
868 )
869 .await;
870
871 let executor = GapFillExecutor::new(GapFillExecutorArgs {
872 ctx: ActorContext::for_test(123),
873 input: source,
874 schema: schema.clone(),
875 chunk_size: 1024,
876 time_column_index,
877 fill_columns,
878 gap_interval: NonStrictExpression::for_test(LiteralExpression::new(
879 DataType::Interval,
880 Some(gap_interval.into()),
881 )),
882 state_table: table,
883 partition_by_indices,
884 pointer_key_indices,
885 high_gap_fill_amplification_threshold: 2048,
886 });
887
888 (tx, executor.boxed().execute())
889 }
890
891 fn test_gap_fill_metrics() -> GapFillMetrics {
892 let ctx = ActorContext::for_test(123);
893 let actor_id = ctx.id.to_string();
894 let fragment_id = ctx.fragment_id.to_string();
895
896 GapFillMetrics {
897 gap_fill_generated_rows_count: ctx
898 .streaming_metrics
899 .gap_fill_generated_rows_count
900 .with_guarded_label_values(&[&actor_id, &fragment_id]),
901 }
902 }
903
904 #[test]
905 fn test_generate_filled_rows_between_static() {
906 let anchor = |minute: &str, locf: i32| {
908 OwnedRow::new(vec![
909 Some(ScalarImpl::Int32(7)),
910 Some(ScalarImpl::Timestamp(minute.parse().unwrap())),
911 Some(ScalarImpl::Int32(locf)),
912 Some(ScalarImpl::Int32(99)),
913 ])
914 };
915 let prev_row = anchor("2023-04-01T10:00:00", 10);
916 let curr_row = anchor("2023-04-01T10:05:00", 40);
917
918 let ctx = ActorContext::for_test(123);
919 let metrics = test_gap_fill_metrics();
920 let generation_context = GapFillGenerationContext {
921 metrics: &metrics,
922 high_amplification_threshold: 2048,
923 actor_ctx: &ctx,
924 };
925 let generate = |build_from: Option<Timestamp>| {
926 GapFillExecutor::<MemoryStateStore>::generate_filled_rows_between_static(
927 &prev_row,
928 &curr_row,
929 &Interval::from_minutes(1),
930 1,
931 &[0],
932 &HashMap::from([(2, FillStrategy::Locf)]),
933 &generation_context,
934 build_from,
935 )
936 .unwrap()
937 };
938
939 let fill = |minute: &str| {
942 OwnedRow::new(vec![
943 Some(ScalarImpl::Int32(7)),
944 Some(ScalarImpl::Timestamp(minute.parse().unwrap())),
945 Some(ScalarImpl::Int32(10)),
946 None,
947 ])
948 };
949 let full = vec![
950 fill("2023-04-01T10:01:00"),
951 fill("2023-04-01T10:02:00"),
952 fill("2023-04-01T10:03:00"),
953 fill("2023-04-01T10:04:00"),
954 ];
955 assert_eq!(generate(None), full);
956
957 assert_eq!(
959 generate(Some("2023-04-01T10:03:00".parse().unwrap())),
960 full[2..]
961 );
962 }
963
    /// End-to-end test of the LOCF strategy: initial fill, insert into a gap, delete
    /// from a gap, and update of the last row.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_streaming_gap_fill_locf() {
        let store = MemoryStateStore::new();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Timestamp),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Float64),
        ]);
        let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Locf)]);
        let (mut tx, mut executor) =
            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;

        tx.push_barrier(test_epoch(1), false);
        // Consume the forwarded first barrier.
        executor.next().await.unwrap().unwrap();
        // Two rows three minutes apart: two fill rows are expected in between.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            + 2022-01-01T00:00:00 1 1.0
            + 2022-01-01T00:03:00 4 4.0",
        ));

        let chunk = next_chunk(&mut executor).await;
        let expected = StreamChunk::from_pretty(
            " TS i F
            + 2022-01-01T00:00:00 1 1.0
            + 2022-01-01T00:01:00 1 1.0
            + 2022-01-01T00:02:00 1 1.0
            + 2022-01-01T00:03:00 4 4.0",
        );

        // Compared without sorting, so the output order is checked as well.
        assert_eq!(chunk.ops(), expected.ops());
        assert_eq!(chunk.visibility(), expected.visibility());

        let chunk_rows: Vec<_> = chunk.rows().collect();
        let expected_rows: Vec<_> = expected.rows().collect();
        assert_eq!(chunk_rows.len(), expected_rows.len());

        for (i, ((op1, row1), (op2, row2))) in
            chunk_rows.iter().zip_eq(expected_rows.iter()).enumerate()
        {
            assert_eq!(op1, op2, "Row {} operation mismatch", i);
            assert_eq!(
                row1.to_owned_row(),
                row2.to_owned_row(),
                "Row {} data mismatch",
                i
            );
        }

        // Insert a real row at the time of an existing fill row: only that fill row is
        // replaced, the fill at 00:01 is reused.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            + 2022-01-01T00:02:00 2 2.0",
        ));

        let chunk2 = next_chunk(&mut executor).await;

        let expected2 = StreamChunk::from_pretty(
            " TS i F
            - 2022-01-01T00:02:00 1 1.0
            + 2022-01-01T00:02:00 2 2.0",
        );

        assert_eq!(chunk2.sort_rows(), expected2.sort_rows());

        // Delete that row again: the fill row comes back.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            - 2022-01-01T00:02:00 2 2.0",
        ));

        let chunk3 = next_chunk(&mut executor).await;
        assert_eq!(
            chunk3.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                - 2022-01-01T00:02:00 2 2.0
                + 2022-01-01T00:02:00 1 1.0"
            )
            .sort_rows()
        );

        // Update the last row: it is handled as a delete followed by an insert, so the
        // fills before it are retracted and emitted again.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            U- 2022-01-01T00:03:00 4 4.0
            U+ 2022-01-01T00:03:00 5 5.0",
        ));

        let chunk4 = next_chunk(&mut executor).await;
        assert_eq!(
            chunk4.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                - 2022-01-01T00:01:00 1 1.0
                - 2022-01-01T00:02:00 1 1.0
                - 2022-01-01T00:03:00 4 4.0
                + 2022-01-01T00:01:00 1 1.0
                + 2022-01-01T00:02:00 1 1.0
                + 2022-01-01T00:03:00 5 5.0"
            )
            .sort_rows()
        );
    }
1078
    /// End-to-end test of the NULL strategy: initial fill, insert into a gap, delete
    /// from a gap, and update of the last row.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_streaming_gap_fill_null() {
        let store = MemoryStateStore::new();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Timestamp),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Float64),
        ]);
        let fill_columns = HashMap::from([(1, FillStrategy::Null), (2, FillStrategy::Null)]);
        let (mut tx, mut executor) =
            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;

        tx.push_barrier(test_epoch(1), false);
        // Consume the forwarded first barrier.
        executor.next().await.unwrap().unwrap();
        // Two rows three minutes apart: two NULL-filled rows are expected in between.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            + 2022-01-01T00:00:00 1 1.0
            + 2022-01-01T00:03:00 4 4.0",
        ));

        let chunk = next_chunk(&mut executor).await;
        assert_eq!(
            chunk.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                + 2022-01-01T00:00:00 1 1.0
                + 2022-01-01T00:01:00 . .
                + 2022-01-01T00:02:00 . .
                + 2022-01-01T00:03:00 4 4.0"
            )
            .sort_rows()
        );

        // Insert a real row at the time of an existing fill row: only that fill row is
        // replaced.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            + 2022-01-01T00:02:00 2 2.0",
        ));

        let chunk2 = next_chunk(&mut executor).await;
        assert_eq!(
            chunk2.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                - 2022-01-01T00:02:00 . .
                + 2022-01-01T00:02:00 2 2.0"
            )
            .sort_rows()
        );

        // Delete that row again: the NULL fill row comes back.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            - 2022-01-01T00:02:00 2 2.0",
        ));

        let chunk3 = next_chunk(&mut executor).await;
        assert_eq!(
            chunk3.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                - 2022-01-01T00:02:00 2 2.0
                + 2022-01-01T00:02:00 . ."
            )
            .sort_rows()
        );

        // Update the last row: handled as delete + insert, so the fills before it are
        // retracted and emitted again.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            U- 2022-01-01T00:03:00 4 4.0
            U+ 2022-01-01T00:03:00 5 5.0",
        ));

        let chunk4 = next_chunk(&mut executor).await;
        assert_eq!(
            chunk4.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                - 2022-01-01T00:01:00 . .
                - 2022-01-01T00:02:00 . .
                - 2022-01-01T00:03:00 4 4.0
                + 2022-01-01T00:01:00 . .
                + 2022-01-01T00:02:00 . .
                + 2022-01-01T00:03:00 5 5.0"
            )
            .sort_rows()
        );
    }
1171
    /// End-to-end test of the INTERPOLATE strategy: initial fill, insert into a gap,
    /// delete from a gap, and update of the last row. With interpolation, fills are
    /// never reused: affected gaps are fully retracted and regenerated.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_streaming_gap_fill_interpolate() {
        let store = MemoryStateStore::new();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Timestamp),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Float64),
        ]);
        let fill_columns = HashMap::from([
            (1, FillStrategy::Interpolate),
            (2, FillStrategy::Interpolate),
        ]);
        let (mut tx, mut executor) =
            create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;

        tx.push_barrier(test_epoch(1), false);
        // Consume the forwarded first barrier.
        executor.next().await.unwrap().unwrap();
        // Two rows three minutes apart: the two fill rows are interpolated linearly.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            + 2022-01-01T00:00:00 1 1.0
            + 2022-01-01T00:03:00 4 4.0",
        ));

        let chunk = next_chunk(&mut executor).await;
        assert_eq!(
            chunk.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                + 2022-01-01T00:00:00 1 1.0
                + 2022-01-01T00:01:00 2 2.0
                + 2022-01-01T00:02:00 3 3.0
                + 2022-01-01T00:03:00 4 4.0"
            )
            .sort_rows()
        );

        // Insert a row inside the gap: both old fills are retracted and the fill at
        // 00:01 is re-interpolated between 00:00 and the new row.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            + 2022-01-01T00:02:00 10 10.0",
        ));

        let chunk2 = next_chunk(&mut executor).await;
        assert_eq!(
            chunk2.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                - 2022-01-01T00:01:00 2 2.0
                - 2022-01-01T00:02:00 3 3.0
                + 2022-01-01T00:01:00 5 5.5
                + 2022-01-01T00:02:00 10 10.0"
            )
            .sort_rows()
        );

        // Delete that row again: the original interpolated fills come back.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            - 2022-01-01T00:02:00 10 10.0",
        ));

        let chunk3 = next_chunk(&mut executor).await;
        assert_eq!(
            chunk3.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                - 2022-01-01T00:01:00 5 5.5
                - 2022-01-01T00:02:00 10 10.0
                + 2022-01-01T00:01:00 2 2.0
                + 2022-01-01T00:02:00 3 3.0"
            )
            .sort_rows()
        );

        // Update the last row: the fills are re-interpolated towards the new value.
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            U- 2022-01-01T00:03:00 4 4.0
            U+ 2022-01-01T00:03:00 10 10.0",
        ));

        let chunk4 = next_chunk(&mut executor).await;
        assert_eq!(
            chunk4.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                - 2022-01-01T00:01:00 2 2.0
                - 2022-01-01T00:02:00 3 3.0
                - 2022-01-01T00:03:00 4 4.0
                + 2022-01-01T00:01:00 4 4.0
                + 2022-01-01T00:02:00 7 7.0
                + 2022-01-01T00:03:00 10 10.0"
            )
            .sort_rows()
        );
    }
1273
    /// A second executor created over the same store must use the nearest rows
    /// persisted by the first executor as neighbors, not only rows it has seen itself.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_streaming_gap_fill_prefers_nearest_state_neighbor_over_partial_cache() {
        let store = MemoryStateStore::new();
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Timestamp),
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Float64),
        ]);
        let fill_columns = HashMap::from([(1, FillStrategy::Null), (2, FillStrategy::Null)]);

        // First executor: persist three rows.
        let (mut tx, mut executor) = create_executor(
            store.clone(),
            fill_columns.clone(),
            schema.clone(),
            Interval::from_minutes(1),
        )
        .await;

        tx.push_barrier(test_epoch(1), false);
        // Consume the forwarded first barrier.
        executor.next().await.unwrap().unwrap();
        tx.push_chunk(StreamChunk::from_pretty(
            " TS i F
            + 2022-01-01T00:00:00 10 10.0
            + 2022-01-01T00:02:00 12 12.0
            + 2022-01-01T00:05:00 15 15.0",
        ));
        // Consume the output for that chunk.
        executor.next().await.unwrap().unwrap();
        // The barrier commits the state.
        tx.push_barrier(test_epoch(2), false);
        executor.next().await.unwrap().unwrap();

        // Second executor over the same store.
        let (mut tx2, mut executor2) = create_executor(
            store.clone(),
            fill_columns.clone(),
            schema.clone(),
            Interval::from_minutes(1),
        )
        .await;

        tx2.push_barrier(test_epoch(2), false);
        // Consume the forwarded first barrier.
        executor2.next().await.unwrap().unwrap();
        // A row far to the right of everything stored so far.
        tx2.push_chunk(StreamChunk::from_pretty(
            " TS i F
            + 2022-01-01T00:08:00 20 20.0",
        ));
        executor2.next().await.unwrap().unwrap();
        tx2.push_barrier(test_epoch(3), false);
        executor2.next().await.unwrap().unwrap();
        // A row whose nearest neighbors (00:00 and 00:02) were written by the first
        // executor.
        tx2.push_chunk(StreamChunk::from_pretty(
            " TS i F
            + 2022-01-01T00:01:00 11 11.0",
        ));

        let chunk = next_chunk(&mut executor2).await;

        // Only the fill row at 00:01 is replaced by the real row.
        assert_eq!(
            chunk.sort_rows(),
            StreamChunk::from_pretty(
                " TS i F
                - 2022-01-01T00:01:00 . .
                + 2022-01-01T00:01:00 11 11.0"
            )
            .sort_rows()
        );
    }
1348
1349 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1350 async fn test_streaming_gap_fill_recovery() {
1351 let store = MemoryStateStore::new();
1352 let schema = Schema::new(vec![
1353 Field::unnamed(DataType::Timestamp),
1354 Field::unnamed(DataType::Int32),
1355 Field::unnamed(DataType::Float64),
1356 ]);
1357 let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Interpolate)]);
1358
1359 let (mut tx, mut executor) = create_executor(
1361 store.clone(),
1362 fill_columns.clone(),
1363 schema.clone(),
1364 Interval::from_minutes(1),
1365 )
1366 .await;
1367
1368 tx.push_barrier(test_epoch(1), false);
1370 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1374 " TS i F
1375 + 2022-01-01T00:00:00 1 1.0
1376 + 2022-01-01T00:03:00 4 4.0",
1377 ));
1378
1379 let chunk = next_chunk(&mut executor).await;
1381 assert_eq!(
1382 chunk.sort_rows(),
1383 StreamChunk::from_pretty(
1384 " TS i F
1385 + 2022-01-01T00:00:00 1 1.0
1386 + 2022-01-01T00:01:00 1 2.0
1387 + 2022-01-01T00:02:00 1 3.0
1388 + 2022-01-01T00:03:00 4 4.0"
1389 )
1390 .sort_rows()
1391 );
1392
1393 tx.push_barrier(test_epoch(2), false);
1394 executor.next().await.unwrap().unwrap(); let (mut tx2, mut executor2) = create_executor(
1398 store.clone(),
1399 fill_columns.clone(),
1400 schema.clone(),
1401 Interval::from_minutes(1),
1402 )
1403 .await;
1404
1405 tx2.push_barrier(test_epoch(2), false);
1407 executor2.next().await.unwrap().unwrap(); tx2.push_chunk(StreamChunk::from_pretty(
1413 " TS i F
1414 + 2022-01-01T00:05:00 6 10.0",
1415 ));
1416
1417 let chunk2 = next_chunk(&mut executor2).await;
1418 assert_eq!(
1419 chunk2.sort_rows(),
1420 StreamChunk::from_pretty(
1421 " TS i F
1422 + 2022-01-01T00:04:00 4 7.0
1423 + 2022-01-01T00:05:00 6 10.0"
1424 )
1425 .sort_rows()
1426 );
1427 }
1428
1429 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1430 async fn test_streaming_gap_fill_mixed_strategy() {
1431 let store = MemoryStateStore::new();
1432 let schema = Schema::new(vec![
1433 Field::unnamed(DataType::Timestamp),
1434 Field::unnamed(DataType::Int32),
1435 Field::unnamed(DataType::Int64),
1436 Field::unnamed(DataType::Float32),
1437 Field::unnamed(DataType::Float64),
1438 ]);
1439
1440 let fill_columns = HashMap::from([
1441 (1, FillStrategy::Interpolate),
1442 (2, FillStrategy::Locf),
1443 (3, FillStrategy::Null),
1444 (4, FillStrategy::Interpolate),
1445 ]);
1446 let gap_interval = Interval::from_days(1);
1447 let (mut tx, mut executor) =
1448 create_executor(store, fill_columns, schema, gap_interval).await;
1449
1450 tx.push_barrier(test_epoch(1), false);
1452 executor.next().await.unwrap().unwrap();
1453
1454 tx.push_chunk(StreamChunk::from_pretty(
1456 " TS i I f F
1457 + 2023-04-01T10:00:00 10 100 1.0 100.0
1458 + 2023-04-05T10:00:00 50 200 5.0 200.0",
1459 ));
1460
1461 let chunk = next_chunk(&mut executor).await;
1462 assert_eq!(
1463 chunk.sort_rows(),
1464 StreamChunk::from_pretty(
1465 " TS i I f F
1466 + 2023-04-01T10:00:00 10 100 1.0 100.0
1467 + 2023-04-02T10:00:00 20 100 . 125.0
1468 + 2023-04-03T10:00:00 30 100 . 150.0
1469 + 2023-04-04T10:00:00 40 100 . 175.0
1470 + 2023-04-05T10:00:00 50 200 5.0 200.0"
1471 )
1472 .sort_rows()
1473 );
1474
1475 tx.push_chunk(StreamChunk::from_pretty(
1477 " TS i I f F
1478 + 2023-04-03T10:00:00 25 150 3.0 160.0",
1479 ));
1480
1481 let chunk2 = next_chunk(&mut executor).await;
1482 assert_eq!(
1483 chunk2.sort_rows(),
1484 StreamChunk::from_pretty(
1485 " TS i I f F
1486 - 2023-04-02T10:00:00 20 100 . 125.0
1487 - 2023-04-03T10:00:00 30 100 . 150.0
1488 - 2023-04-04T10:00:00 40 100 . 175.0
1489 + 2023-04-02T10:00:00 17 100 . 130.0
1490 + 2023-04-03T10:00:00 25 150 3.0 160.0
1491 + 2023-04-04T10:00:00 37 150 . 180.0"
1492 )
1493 .sort_rows()
1494 );
1495
1496 tx.push_chunk(StreamChunk::from_pretty(
1498 " TS i I f F
1499 - 2023-04-03T10:00:00 25 150 3.0 160.0",
1500 ));
1501 let chunk3 = next_chunk(&mut executor).await;
1502 assert_eq!(
1503 chunk3.sort_rows(),
1504 StreamChunk::from_pretty(
1505 " TS i I f F
1506 - 2023-04-02T10:00:00 17 100 . 130.0
1507 - 2023-04-03T10:00:00 25 150 3.0 160.0
1508 - 2023-04-04T10:00:00 37 150 . 180.0
1509 + 2023-04-02T10:00:00 20 100 . 125.0
1510 + 2023-04-03T10:00:00 30 100 . 150.0
1511 + 2023-04-04T10:00:00 40 100 . 175.0"
1512 )
1513 .sort_rows()
1514 );
1515
1516 tx.push_chunk(StreamChunk::from_pretty(
1518 " TS i I f F
1519 U- 2023-04-05T10:00:00 50 200 5.0 200.0
1520 U+ 2023-04-05T10:00:00 50 200 5.0 300.0",
1521 ));
1522 let chunk4 = next_chunk(&mut executor).await;
1523 assert_eq!(
1524 chunk4.sort_rows(),
1525 StreamChunk::from_pretty(
1526 " TS i I f F
1527 - 2023-04-02T10:00:00 20 100 . 125.0
1528 - 2023-04-03T10:00:00 30 100 . 150.0
1529 - 2023-04-04T10:00:00 40 100 . 175.0
1530 - 2023-04-05T10:00:00 50 200 5.0 200.0
1531 + 2023-04-02T10:00:00 20 100 . 150.0
1532 + 2023-04-03T10:00:00 30 100 . 200.0
1533 + 2023-04-04T10:00:00 40 100 . 250.0
1534 + 2023-04-05T10:00:00 50 200 5.0 300.0"
1535 )
1536 .sort_rows()
1537 );
1538 }
1539
1540 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1541 async fn test_streaming_gap_fill_out_of_order_keeps_unchanged_prefix() {
1542 let store = MemoryStateStore::new();
1543 let schema = Schema::new(vec![
1544 Field::unnamed(DataType::Timestamp),
1545 Field::unnamed(DataType::Int32),
1546 Field::unnamed(DataType::Float64),
1547 ]);
1548 let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Locf)]);
1549 let (mut tx, mut executor) =
1550 create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1551
1552 tx.push_barrier(test_epoch(1), false);
1553 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1557 " TS i F
1558 + 2022-01-01T00:00:00 1 1.0
1559 + 2022-01-01T00:06:00 7 7.0",
1560 ));
1561 let chunk = next_chunk(&mut executor).await;
1562 assert_eq!(
1563 chunk.sort_rows(),
1564 StreamChunk::from_pretty(
1565 " TS i F
1566 + 2022-01-01T00:00:00 1 1.0
1567 + 2022-01-01T00:01:00 1 1.0
1568 + 2022-01-01T00:02:00 1 1.0
1569 + 2022-01-01T00:03:00 1 1.0
1570 + 2022-01-01T00:04:00 1 1.0
1571 + 2022-01-01T00:05:00 1 1.0
1572 + 2022-01-01T00:06:00 7 7.0"
1573 )
1574 .sort_rows()
1575 );
1576
1577 tx.push_chunk(StreamChunk::from_pretty(
1580 " TS i F
1581 + 2022-01-01T00:05:00 5 5.0",
1582 ));
1583 let chunk2 = next_chunk(&mut executor).await;
1584 assert_eq!(
1585 chunk2.sort_rows(),
1586 StreamChunk::from_pretty(
1587 " TS i F
1588 - 2022-01-01T00:05:00 1 1.0
1589 + 2022-01-01T00:05:00 5 5.0"
1590 )
1591 .sort_rows()
1592 );
1593 }
1594
1595 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1596 async fn test_streaming_gap_fill_null_out_of_order_keeps_unchanged_suffix() {
1597 let store = MemoryStateStore::new();
1598 let schema = Schema::new(vec![
1599 Field::unnamed(DataType::Timestamp),
1600 Field::unnamed(DataType::Int32),
1601 Field::unnamed(DataType::Float64),
1602 ]);
1603 let fill_columns = HashMap::from([(1, FillStrategy::Null), (2, FillStrategy::Null)]);
1604 let (mut tx, mut executor) =
1605 create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1606
1607 tx.push_barrier(test_epoch(1), false);
1608 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1612 " TS i F
1613 + 2022-01-01T00:00:00 1 1.0
1614 + 2022-01-01T00:06:00 7 7.0",
1615 ));
1616 let chunk = next_chunk(&mut executor).await;
1617 assert_eq!(
1618 chunk.sort_rows(),
1619 StreamChunk::from_pretty(
1620 " TS i F
1621 + 2022-01-01T00:00:00 1 1.0
1622 + 2022-01-01T00:01:00 . .
1623 + 2022-01-01T00:02:00 . .
1624 + 2022-01-01T00:03:00 . .
1625 + 2022-01-01T00:04:00 . .
1626 + 2022-01-01T00:05:00 . .
1627 + 2022-01-01T00:06:00 7 7.0"
1628 )
1629 .sort_rows()
1630 );
1631
1632 tx.push_chunk(StreamChunk::from_pretty(
1635 " TS i F
1636 + 2022-01-01T00:03:00 3 3.0",
1637 ));
1638 let chunk2 = next_chunk(&mut executor).await;
1639 assert_eq!(
1640 chunk2.sort_rows(),
1641 StreamChunk::from_pretty(
1642 " TS i F
1643 - 2022-01-01T00:03:00 . .
1644 + 2022-01-01T00:03:00 3 3.0"
1645 )
1646 .sort_rows()
1647 );
1648 }
1649
1650 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1651 async fn test_streaming_gap_fill_out_of_order_retracts_before_reinsert() {
1652 let store = MemoryStateStore::new();
1653 let schema = Schema::new(vec![
1654 Field::unnamed(DataType::Timestamp),
1655 Field::unnamed(DataType::Int32),
1656 Field::unnamed(DataType::Float64),
1657 ]);
1658 let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Locf)]);
1659 let (mut tx, mut executor) =
1660 create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1661
1662 tx.push_barrier(test_epoch(1), false);
1663 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1666 " TS i F
1667 + 2022-01-01T00:00:00 1 1.0
1668 + 2022-01-01T00:04:00 4 4.0",
1669 ));
1670 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1676 " TS i F
1677 + 2022-01-01T00:02:00 2 2.0",
1678 ));
1679 assert_chunk_eq_ordered(
1680 next_chunk(&mut executor).await,
1681 StreamChunk::from_pretty(
1682 " TS i F
1683 - 2022-01-01T00:02:00 1 1.0
1684 - 2022-01-01T00:03:00 1 1.0
1685 + 2022-01-01T00:03:00 2 2.0
1686 + 2022-01-01T00:02:00 2 2.0",
1687 ),
1688 );
1689
1690 tx.push_chunk(StreamChunk::from_pretty(
1693 " TS i F
1694 - 2022-01-01T00:02:00 2 2.0",
1695 ));
1696 assert_chunk_eq_ordered(
1697 next_chunk(&mut executor).await,
1698 StreamChunk::from_pretty(
1699 " TS i F
1700 - 2022-01-01T00:02:00 2 2.0
1701 + 2022-01-01T00:02:00 1 1.0
1702 - 2022-01-01T00:03:00 2 2.0
1703 + 2022-01-01T00:03:00 1 1.0",
1704 ),
1705 );
1706 }
1707
1708 fn assert_chunk_eq_ordered(got: StreamChunk, expected: StreamChunk) {
1710 assert_eq!(got.ops(), expected.ops());
1711 let got_rows: Vec<_> = got.rows().map(|(op, r)| (op, r.to_owned_row())).collect();
1712 let want_rows: Vec<_> = expected
1713 .rows()
1714 .map(|(op, r)| (op, r.to_owned_row()))
1715 .collect();
1716 assert_eq!(got_rows, want_rows);
1717 }
1718
1719 async fn next_chunk(executor: &mut BoxedMessageStream) -> StreamChunk {
1720 executor
1721 .next()
1722 .await
1723 .unwrap()
1724 .unwrap()
1725 .into_chunk()
1726 .unwrap()
1727 }
1728
1729 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1730 async fn test_streaming_gap_fill_off_grid_out_of_order_regrids_suffix() {
1731 let store = MemoryStateStore::new();
1732 let schema = Schema::new(vec![
1733 Field::unnamed(DataType::Timestamp),
1734 Field::unnamed(DataType::Int32),
1735 Field::unnamed(DataType::Float64),
1736 ]);
1737 let fill_columns = HashMap::from([(1, FillStrategy::Locf), (2, FillStrategy::Locf)]);
1738 let (mut tx, mut executor) =
1739 create_executor(store, fill_columns, schema, Interval::from_minutes(1)).await;
1740
1741 tx.push_barrier(test_epoch(1), false);
1742 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1746 " TS i F
1747 + 2022-01-01T00:00:00 1 1.0
1748 + 2022-01-01T00:04:00 4 4.0",
1749 ));
1750 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1756 " TS i F
1757 + 2022-01-01T00:01:30 9 9.0",
1758 ));
1759 let chunk = next_chunk(&mut executor).await;
1760 assert_chunk_eq_ordered(
1761 chunk,
1762 StreamChunk::from_pretty(
1763 " TS i F
1764 - 2022-01-01T00:02:00 1 1.0
1765 + 2022-01-01T00:02:30 9 9.0
1766 - 2022-01-01T00:03:00 1 1.0
1767 + 2022-01-01T00:03:30 9 9.0
1768 + 2022-01-01T00:01:30 9 9.0",
1769 ),
1770 );
1771 }
1772
1773 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1774 async fn test_streaming_gap_fill_month_interval_partial_overlap() {
1775 let store = MemoryStateStore::new();
1776 let schema = Schema::new(vec![
1777 Field::unnamed(DataType::Timestamp),
1778 Field::unnamed(DataType::Int32),
1779 Field::unnamed(DataType::Float64),
1780 ]);
1781 let fill_columns = HashMap::from([(1, FillStrategy::Null), (2, FillStrategy::Null)]);
1782 let (mut tx, mut executor) = create_executor(
1783 store,
1784 fill_columns,
1785 schema,
1786 Interval::from_month_day_usec(1, 0, 0),
1787 )
1788 .await;
1789
1790 tx.push_barrier(test_epoch(1), false);
1791 executor.next().await.unwrap().unwrap(); tx.push_chunk(StreamChunk::from_pretty(
1795 " TS i F
1796 + 2023-12-30T00:00:00 1 1.0
1797 + 2024-03-01T00:00:00 9 9.0",
1798 ));
1799 let chunk = next_chunk(&mut executor).await;
1800 assert_eq!(
1801 chunk.sort_rows(),
1802 StreamChunk::from_pretty(
1803 " TS i F
1804 + 2023-12-30T00:00:00 1 1.0
1805 + 2024-01-30T00:00:00 . .
1806 + 2024-02-29T00:00:00 . .
1807 + 2024-03-01T00:00:00 9 9.0"
1808 )
1809 .sort_rows()
1810 );
1811
1812 tx.push_chunk(StreamChunk::from_pretty(
1815 " TS i F
1816 + 2023-12-31T00:00:00 5 5.0",
1817 ));
1818 let chunk2 = next_chunk(&mut executor).await;
1819 assert_chunk_eq_ordered(
1820 chunk2,
1821 StreamChunk::from_pretty(
1822 " TS i F
1823 - 2024-01-30T00:00:00 . .
1824 + 2024-01-31T00:00:00 . .
1825 + 2023-12-31T00:00:00 5 5.0",
1826 ),
1827 );
1828
1829 tx.push_chunk(StreamChunk::from_pretty(
1831 " TS i F
1832 - 2023-12-31T00:00:00 5 5.0",
1833 ));
1834 let chunk3 = next_chunk(&mut executor).await;
1835 assert_chunk_eq_ordered(
1836 chunk3,
1837 StreamChunk::from_pretty(
1838 " TS i F
1839 - 2023-12-31T00:00:00 5 5.0
1840 + 2024-01-30T00:00:00 . .
1841 - 2024-01-31T00:00:00 . .",
1842 ),
1843 );
1844 }
1845}