use std::collections::HashSet;
use std::ops::Bound;

use bytes::Bytes;
use futures::future::Either;
use futures::stream::{self, select_with_strategy};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{OwnedRow, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::catalog::Table;
use risingwave_pb::catalog::table::Engine;
use risingwave_pb::id::{SourceId, SubscriberId};
use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
use risingwave_storage::table::KeyedRow;

use crate::common::change_buffer::output_kind as cb_kind;
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::{
    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
};
use crate::executor::error::ErrorKind;
use crate::executor::monitor::MaterializeMetrics;
use crate::executor::mview::RefreshProgressTable;
use crate::executor::mview::cache::MaterializeCache;
use crate::executor::prelude::*;
use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
use crate::task::LocalBarrierManager;

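/// State machine driving [`MaterializeExecutor::execute_inner`].
///
/// The typical refresh flow, as implemented below, is:
///
/// ```text
/// NormalIngestion --(LoadFinish)--> MergingData --> CleanUp --> RefreshEnd --> NormalIngestion
/// ```
///
/// Every transition passes through `CommitAndYieldBarrier`, which commits the state tables
/// for the pending barrier, yields the barrier downstream, and then moves on to
/// `expect_next_state`.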
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    NormalIngestion,
    MergingData,
    CleanUp,
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}

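/// Materializes the change stream from its upstream executor into the table's state table,
/// resolving primary-key conflicts according to `conflict_behavior`. For refreshable tables
/// (`refresh_args` is `Some`), it additionally maintains a staging table and a per-vnode
/// progress table used by the refresh state machine.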
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    input: Executor,

    schema: Schema,

    state_table: StateTableInner<S, SD>,

    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    materialize_cache: Option<MaterializeCache>,

    conflict_behavior: ConflictBehavior,

    version_column_indices: Vec<u32>,

    may_have_downstream: bool,

    subscriber_ids: HashSet<SubscriberId>,

    metrics: MaterializeMetrics,

    is_dummy_table: bool,

    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    local_barrier_manager: LocalBarrierManager,
}

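/// Everything the executor needs to run a table refresh: the table and staging-table
/// catalogs, the staging state table that buffers rows ingested during the refresh, and
/// the progress table that records per-vnode merge progress.
///
/// A minimal construction sketch (the catalogs, store and vnode bitmap names are
/// illustrative, not part of this crate):
///
/// ```ignore
/// let refresh_args = RefreshableMaterializeArgs::new(
///     store.clone(),
///     &table_catalog,
///     &staging_table_catalog,
///     &progress_state_table_catalog,
///     Some(vnode_bitmap),
/// )
/// .await;
/// assert!(!refresh_args.is_refreshing);
/// ```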
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    pub table_catalog: Table,

    pub staging_table_catalog: Table,

    pub is_refreshing: bool,

    pub staging_table: StateTableInner<S, SD>,

    pub progress_table: RefreshProgressTable<S>,

    pub table_id: TableId,
}

impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
    pub async fn new(
        store: S,
        table_catalog: &Table,
        staging_table_catalog: &Table,
        progress_state_table: &Table,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        let table_id = table_catalog.id;

        let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
            staging_table_catalog,
            store.clone(),
            vnodes.clone(),
        )
        .await;

        let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
            progress_state_table,
            store,
            vnodes,
        )
        .await;

        let pk_len = table_catalog.pk.len();
        let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);

        debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());

        Self {
            table_catalog: table_catalog.clone(),
            staging_table_catalog: staging_table_catalog.clone(),
            is_refreshing: false,
            staging_table,
            progress_table,
            table_id,
        }
    }
}

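/// Picks the op consistency level for the materialized state table:
///
/// - any subscriber present: `LogStoreEnabled`;
/// - no downstream fragments and `Overwrite` conflict behavior: `Inconsistent`;
/// - otherwise: `ConsistentOldValue`.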
fn get_op_consistency_level(
    conflict_behavior: ConflictBehavior,
    may_have_downstream: bool,
    subscriber_ids: &HashSet<SubscriberId>,
) -> StateTableOpConsistencyLevel {
    if !subscriber_ids.is_empty() {
        StateTableOpConsistencyLevel::LogStoreEnabled
    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
        StateTableOpConsistencyLevel::Inconsistent
    } else {
        StateTableOpConsistencyLevel::ConsistentOldValue
    }
}

impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    #[allow(clippy::too_many_arguments)]
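    /// Builds the executor from the table catalog.
    ///
    /// For Postgres CDC tables, the indices of TOAST-able columns (varchar, list, bytea,
    /// jsonb) are collected and handed to the materialize cache.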
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<u32>,
        metrics: Arc<StreamingMetrics>,
        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
        local_barrier_manager: LocalBarrierManager,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        let toastable_column_indices = if table_catalog.cdc_table_type()
            == risingwave_pb::catalog::table::CdcTableType::Postgres
        {
            let toastable_indices: Vec<usize> = table_columns
                .iter()
                .enumerate()
                .filter_map(|(index, column)| match &column.data_type {
                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
                        Some(index)
                    }
                    _ => None,
                })
                .collect();

            if toastable_indices.is_empty() {
                None
            } else {
                Some(toastable_indices)
            }
        } else {
            None
        };

        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let subscriber_ids = actor_context.initial_subscriber_ids.clone();
        let op_consistency_level =
            get_op_consistency_level(conflict_behavior, may_have_downstream, &subscriber_ids);
        let state_table_metrics = metrics.new_state_table_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );
        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(op_consistency_level)
            .enable_preload_all_rows_by_config(&actor_context.config)
            .enable_vnode_key_pruning(true)
            .with_metrics(state_table_metrics)
            .build()
            .await;

        let mv_metrics = metrics.new_materialize_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );
        let cache_metrics = metrics.new_materialize_cache_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: Some(MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_indices.clone(),
                conflict_behavior,
                toastable_column_indices,
                cache_metrics,
            )),
            conflict_behavior,
            version_column_indices,
            is_dummy_table,
            may_have_downstream,
            subscriber_ids,
            metrics: mv_metrics,
            refresh_args,
            local_barrier_manager,
        }
    }

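    /// Main processing loop: waits for the first barrier, initializes the state tables
    /// (recovering any in-flight refresh from the progress table), then runs the
    /// [`MaterializeStreamState`] state machine.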
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = self.state_table.table_id();
        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        let _barrier_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        let mut inner_state =
            Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
        if let Some(ref mut refresh_args) = self.refresh_args {
            refresh_args.staging_table.init_epoch(first_epoch).await?;

            refresh_args.progress_table.recover(first_epoch).await?;

            let progress_stats = refresh_args.progress_table.get_progress_stats();
            if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
                refresh_args.is_refreshing = true;
                tracing::info!(
                    total_vnodes = progress_stats.total_vnodes,
                    completed_vnodes = progress_stats.completed_vnodes,
                    "Recovered refresh in progress, resuming refresh operation"
                );

                let incomplete_vnodes: Vec<_> = refresh_args
                    .progress_table
                    .get_all_progress()
                    .iter()
                    .filter(|(_, entry)| !entry.is_completed)
                    .map(|(&vnode, _)| vnode)
                    .collect();

                if !incomplete_vnodes.is_empty() {
                    tracing::info!(
                        incomplete_vnodes = incomplete_vnodes.len(),
                        "Recovery detected incomplete VNodes, resuming refresh operation"
                    );
                } else {
                    tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
                }
            }
        }

        if let Some(ref refresh_args) = self.refresh_args
            && refresh_args.is_refreshing
        {
            let incomplete_vnodes: Vec<_> = refresh_args
                .progress_table
                .get_all_progress()
                .iter()
                .filter(|(_, entry)| !entry.is_completed)
                .map(|(&vnode, _)| vnode)
                .collect();
            if !incomplete_vnodes.is_empty() {
                inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
                tracing::info!(
                    incomplete_vnodes = incomplete_vnodes.len(),
                    "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
                );
            }
        }

        'main_loop: loop {
            match *inner_state {
                MaterializeStreamState::NormalIngestion => {
                    #[for_await]
                    '_normal_ingest: for msg in input.by_ref() {
                        let msg = msg?;
                        if let Some(cache) = &mut self.materialize_cache {
                            cache.evict();
                        }

                        match msg {
                            Message::Watermark(w) => {
                                yield Message::Watermark(w);
                            }
                            Message::Chunk(chunk) if self.is_dummy_table => {
                                self.metrics
                                    .materialize_input_row_count
                                    .inc_by(chunk.cardinality() as u64);
                                yield Message::Chunk(chunk);
                            }
                            Message::Chunk(chunk) => {
                                self.metrics
                                    .materialize_input_row_count
                                    .inc_by(chunk.cardinality() as u64);

                                let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
                                    self.conflict_behavior
                                    && !self.state_table.is_consistent_op()
                                    && self.version_column_indices.is_empty()
                                {
                                    ConflictBehavior::NoCheck
                                } else {
                                    self.conflict_behavior
                                };

                                match optimized_conflict_behavior {
                                    checked_conflict_behaviors!() => {
                                        if chunk.cardinality() == 0 {
                                            continue;
                                        }

                                        if let Some(ref mut refresh_args) = self.refresh_args
                                            && refresh_args.is_refreshing
                                        {
                                            let key_chunk = chunk
                                                .clone()
                                                .project(self.state_table.pk_indices());
                                            tracing::trace!(
                                                staging_chunk = %key_chunk.to_pretty(),
                                                input_chunk = %chunk.to_pretty(),
                                                "writing to staging table"
                                            );
                                            if cfg!(debug_assertions) {
                                                assert!(
                                                    key_chunk
                                                        .ops()
                                                        .iter()
                                                        .all(|op| op == &Op::Insert)
                                                );
                                            }
                                            refresh_args
                                                .staging_table
                                                .write_chunk(key_chunk.clone());
                                            refresh_args.staging_table.try_flush().await?;
                                        }

                                        let cache = self.materialize_cache.as_mut().unwrap();
                                        let change_buffer =
                                            cache.handle_new(chunk, &self.state_table).await?;

                                        match change_buffer
                                            .into_chunk::<{ cb_kind::RETRACT }>(data_types.clone())
                                        {
                                            Some(output_chunk) => {
                                                self.state_table.write_chunk(output_chunk.clone());
                                                self.state_table.try_flush().await?;
                                                yield Message::Chunk(output_chunk);
                                            }
                                            None => continue,
                                        }
                                    }
                                    ConflictBehavior::NoCheck => {
                                        self.state_table.write_chunk(chunk.clone());
                                        self.state_table.try_flush().await?;

                                        if let Some(ref mut refresh_args) = self.refresh_args
                                            && refresh_args.is_refreshing
                                        {
                                            let key_chunk = chunk
                                                .clone()
                                                .project(self.state_table.pk_indices());
                                            tracing::trace!(
                                                staging_chunk = %key_chunk.to_pretty(),
                                                input_chunk = %chunk.to_pretty(),
                                                "writing to staging table"
                                            );
                                            if cfg!(debug_assertions) {
                                                assert!(
                                                    key_chunk
                                                        .ops()
                                                        .iter()
                                                        .all(|op| op == &Op::Insert)
                                                );
                                            }
                                            refresh_args
                                                .staging_table
                                                .write_chunk(key_chunk.clone());
                                            refresh_args.staging_table.try_flush().await?;
                                        }

                                        yield Message::Chunk(chunk);
                                    }
                                }
                            }
                            Message::Barrier(barrier) => {
                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
                                    barrier,
                                    expect_next_state: Box::new(
                                        MaterializeStreamState::NormalIngestion,
                                    ),
                                };
                                continue 'main_loop;
                            }
                        }
                    }

                    return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
                        anyhow::anyhow!(
                            "Input stream terminated unexpectedly during normal ingestion"
                        ),
                    )));
                }
                MaterializeStreamState::MergingData => {
                    let Some(refresh_args) = self.refresh_args.as_mut() else {
                        panic!(
                            "MaterializeExecutor entered MergingData state without refresh_args configured"
                        );
                    };
                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");

                    debug_assert_eq!(
                        self.state_table.vnodes(),
                        refresh_args.staging_table.vnodes()
                    );
                    debug_assert_eq!(
                        refresh_args.staging_table.vnodes(),
                        refresh_args.progress_table.vnodes()
                    );

                    let mut rows_to_delete = vec![];
                    let mut merge_complete = false;
                    let mut pending_barrier: Option<Barrier> = None;

                    {
                        let left_input = input.by_ref().map(Either::Left);
                        let right_merge_sort = pin!(
                            Self::make_mergesort_stream(
                                &self.state_table,
                                &refresh_args.staging_table,
                                &mut refresh_args.progress_table
                            )
                            .map(Either::Right)
                        );

                        let mut merge_stream =
                            select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
                                stream::PollNext::Left
                            });

                        #[for_await]
                        'merge_stream: for either in &mut merge_stream {
                            match either {
                                Either::Left(msg) => {
                                    let msg = msg?;
                                    match msg {
                                        Message::Watermark(w) => yield Message::Watermark(w),
                                        Message::Chunk(chunk) => {
                                            tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
                                        }
                                        Message::Barrier(b) => {
                                            pending_barrier = Some(b);
                                            break 'merge_stream;
                                        }
                                    }
                                }
                                Either::Right(result) => {
                                    match result? {
                                        Some((_vnode, row)) => {
                                            rows_to_delete.push(row);
                                        }
                                        None => {
                                            merge_complete = true;
                                        }
                                    }
                                }
                            }
                        }
                    }

                    for row in &rows_to_delete {
                        self.state_table.delete(row);
                    }
                    if !rows_to_delete.is_empty() {
                        let to_delete_chunk = StreamChunk::from_rows(
                            &rows_to_delete
                                .iter()
                                .map(|row| (Op::Delete, row))
                                .collect_vec(),
                            &self.schema.data_types(),
                        );

                        yield Message::Chunk(to_delete_chunk);
                    }

                    assert!(pending_barrier.is_some(), "pending barrier is not set");

                    *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
                        barrier: pending_barrier.unwrap(),
                        expect_next_state: if merge_complete {
                            Box::new(MaterializeStreamState::CleanUp)
                        } else {
                            Box::new(MaterializeStreamState::MergingData)
                        },
                    };
                    continue 'main_loop;
                }
                MaterializeStreamState::CleanUp => {
                    let Some(refresh_args) = self.refresh_args.as_mut() else {
                        panic!(
                            "MaterializeExecutor entered CleanUp state without refresh_args configured"
                        );
                    };
                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");

                    #[for_await]
                    for msg in input.by_ref() {
                        let msg = msg?;
                        match msg {
                            Message::Watermark(w) => yield Message::Watermark(w),
                            Message::Chunk(chunk) => {
                                tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during cleanup phase");
                            }
                            Message::Barrier(barrier) if !barrier.is_checkpoint() => {
                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
                                    barrier,
                                    expect_next_state: Box::new(MaterializeStreamState::CleanUp),
                                };
                                continue 'main_loop;
                            }
                            Message::Barrier(barrier) => {
                                let staging_table_id = refresh_args.staging_table.table_id();
                                let epoch = barrier.epoch;
                                self.local_barrier_manager.report_refresh_finished(
                                    epoch,
                                    self.actor_context.id,
                                    refresh_args.table_id,
                                    staging_table_id,
                                );
                                tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");

                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
                                    barrier,
                                    expect_next_state: Box::new(
                                        MaterializeStreamState::RefreshEnd {
                                            on_complete_epoch: epoch,
                                        },
                                    ),
                                };
                                continue 'main_loop;
                            }
                        }
                    }
                }
                MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
                    let Some(refresh_args) = self.refresh_args.as_mut() else {
                        panic!(
                            "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
                        );
                    };
                    let staging_table_id = refresh_args.staging_table.table_id();

                    let staging_store = refresh_args.staging_table.state_store().clone();
                    staging_store
                        .try_wait_epoch(
                            HummockReadEpoch::Committed(on_complete_epoch.prev),
                            TryWaitEpochOptions {
                                table_id: staging_table_id,
                            },
                        )
                        .await?;

                    tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");

                    if let Some(ref mut refresh_args) = self.refresh_args {
                        refresh_args.is_refreshing = false;
                    }
                    *inner_state = MaterializeStreamState::NormalIngestion;
                    continue 'main_loop;
                }
                MaterializeStreamState::CommitAndYieldBarrier {
                    barrier,
                    mut expect_next_state,
                } => {
                    if let Some(ref mut refresh_args) = self.refresh_args {
                        match barrier.mutation.as_deref() {
                            Some(Mutation::RefreshStart {
                                table_id: refresh_table_id,
                                associated_source_id: _,
                            }) if *refresh_table_id == refresh_args.table_id => {
                                debug_assert!(
                                    !refresh_args.is_refreshing,
                                    "cannot start refresh twice"
                                );
                                refresh_args.is_refreshing = true;
                                tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");

                                Self::init_refresh_progress(
                                    &self.state_table,
                                    &mut refresh_args.progress_table,
                                    barrier.epoch.curr,
                                )?;
                            }
                            Some(Mutation::LoadFinish {
                                associated_source_id: load_finish_source_id,
                            }) => {
                                let associated_source_id: SourceId = match refresh_args
                                    .table_catalog
                                    .optional_associated_source_id
                                {
                                    Some(id) => id.into(),
                                    None => unreachable!("associated_source_id is not set"),
                                };

                                if *load_finish_source_id == associated_source_id {
                                    tracing::info!(
                                        %load_finish_source_id,
                                        "LoadFinish received, starting data replacement"
                                    );
                                    expect_next_state =
                                        Box::new(MaterializeStreamState::<_>::MergingData);
                                }
                            }
                            _ => {}
                        }
                    }

                    if !self.may_have_downstream
                        && barrier.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.subscriber_ids,
                        &barrier,
                        mv_table_id,
                    );
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.subscriber_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
                        .await?;
                    if !post_commit.inner().is_consistent_op() {
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);

                    let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args {
                        Some((
                            refresh_args.staging_table.commit(barrier.epoch).await?,
                            refresh_args.progress_table.commit(barrier.epoch).await?,
                        ))
                    } else {
                        None
                    };

                    let b_epoch = barrier.epoch;
                    yield Message::Barrier(barrier);

                    if let Some((_, cache_may_stale)) = post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?
                        && cache_may_stale
                        && let Some(cache) = &mut self.materialize_cache
                    {
                        cache.clear();
                    }

                    if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
                        staging_post_commit
                            .post_yield_barrier(update_vnode_bitmap.clone())
                            .await?;
                        progress_post_commit
                            .post_yield_barrier(update_vnode_bitmap)
                            .await?;
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    *inner_state = *expect_next_state;
                }
            }
        }
    }

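    /// Streams the rows that exist in the main table but not in the staging table, vnode by
    /// vnode, by merge-joining both tables over their shared primary-key order. Progress is
    /// checkpointed in the progress table so a recovered executor can resume from the last
    /// processed key. Yields `Some((vnode, row))` for every row that should be deleted from
    /// the main table, and a final `None` once all vnodes have been processed.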
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn make_mergesort_stream<'a>(
        main_table: &'a StateTableInner<S, SD>,
        staging_table: &'a StateTableInner<S, SD>,
        progress_table: &'a mut RefreshProgressTable<S>,
    ) {
        for vnode in main_table.vnodes().clone().iter_vnodes() {
            let mut processed_rows = 0;
            let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
                if let Some(current_entry) = progress_table.get_progress(vnode) {
                    if current_entry.is_completed {
                        tracing::debug!(
                            vnode = vnode.to_index(),
                            "Skipping already completed VNode during recovery"
                        );
                        continue;
                    }
                    processed_rows += current_entry.processed_rows;
                    tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");

                    if let Some(current_state) = &current_entry.current_pos {
                        (Bound::Excluded(current_state.clone()), Bound::Unbounded)
                    } else {
                        (Bound::Unbounded, Bound::Unbounded)
                    }
                } else {
                    (Bound::Unbounded, Bound::Unbounded)
                };

            let iter_main = main_table
                .iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::prefetch_for_large_range_scan(),
                )
                .await?;
            let iter_staging = staging_table
                .iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::prefetch_for_large_range_scan(),
                )
                .await?;

            pin_mut!(iter_main);
            pin_mut!(iter_staging);

            let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
            let mut staging_item: Option<KeyedRow<Bytes>> =
                iter_staging.next().await.transpose()?;

            while let Some(main_kv) = main_item {
                let main_key = main_kv.key();

                let mut should_delete = false;
                while let Some(staging_kv) = &staging_item {
                    let staging_key = staging_kv.key();
                    match main_key.cmp(staging_key) {
                        std::cmp::Ordering::Greater => {
                            staging_item = iter_staging.next().await.transpose()?;
                        }
                        std::cmp::Ordering::Equal => {
                            break;
                        }
                        std::cmp::Ordering::Less => {
                            should_delete = true;
                            break;
                        }
                    }
                }

                if staging_item.is_none() {
                    should_delete = true;
                }

                if should_delete {
                    yield Some((vnode, main_kv.row().clone()));
                }

                processed_rows += 1;
                tracing::debug!(
                    "set progress table: vnode = {:?}, processed_rows = {:?}",
                    vnode,
                    processed_rows
                );
                progress_table.set_progress(
                    vnode,
                    Some(
                        main_kv
                            .row()
                            .project(main_table.pk_indices())
                            .to_owned_row(),
                    ),
                    false,
                    processed_rows,
                )?;
                main_item = iter_main.next().await.transpose()?;
            }

            if let Some(current_entry) = progress_table.get_progress(vnode) {
                progress_table.set_progress(
                    vnode,
                    current_entry.current_pos.clone(),
                    true,
                    current_entry.processed_rows,
                )?;

                tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
            }
        }

        yield None;
    }

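    /// Applies subscription changes carried by the barrier to the tracked subscriber set,
    /// warning on duplicate additions or removals of unknown subscribers.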
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<SubscriberId>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    %mv_table_id,
                    %subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(subscriptions_to_drop) = barrier.as_subscriptions_to_drop() {
            for SubscriptionUpstreamInfo {
                subscriber_id,
                upstream_mv_table_id,
            } in subscriptions_to_drop
            {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        %mv_table_id,
                        %subscriber_id,
                        "drop non-existing subscriber id"
                    );
                }
            }
        }
    }

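    /// Resets the refresh progress entry of every vnode owned by this executor to an
    /// unstarted state: no current position, not completed, zero processed rows.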
    fn init_refresh_progress(
        state_table: &StateTableInner<S, SD>,
        progress_table: &mut RefreshProgressTable<S>,
        _epoch: u64,
    ) -> StreamExecutorResult<()> {
        debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());

        for vnode in state_table.vnodes().iter_vnodes() {
            progress_table.set_progress(vnode, None, false, 0)?;
        }

        tracing::info!(
            vnodes_count = state_table.vnodes().count_ones(),
            "Initialized refresh progress tracking for all VNodes"
        );

        Ok(())
    }
}

impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    #[cfg(any(test, feature = "test"))]
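    /// Test-only constructor: builds the executor over a generated table catalog with the
    /// given arrange keys and conflict behavior, backed by the provided store.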
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        use risingwave_common::util::iter_util::ZipEqFast;

        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let unused = StreamingMetrics::unused();
        let metrics = unused.new_materialize_metrics(table_id, 1.into(), 2.into());
        let cache_metrics = unused.new_materialize_cache_metrics(table_id, 1.into(), 2.into());

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: Some(MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                vec![],
                conflict_behavior,
                None,
                cache_metrics,
            )),
            conflict_behavior,
            version_column_indices: vec![],
            is_dummy_table: false,
            may_have_downstream: true,
            subscriber_ids: HashSet::new(),
            metrics,
            refresh_args: None,
            local_barrier_manager: LocalBarrierManager::for_test(),
        }
    }
}

impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}

1118#[cfg(test)]
1119mod tests {
1120
1121 use std::iter;
1122 use std::sync::atomic::AtomicU64;
1123
1124 use rand::rngs::SmallRng;
1125 use rand::{Rng, RngCore, SeedableRng};
1126 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1127 use risingwave_common::catalog::Field;
1128 use risingwave_common::util::epoch::test_epoch;
1129 use risingwave_common::util::sort_util::OrderType;
1130 use risingwave_hummock_sdk::HummockReadEpoch;
1131 use risingwave_storage::memory::MemoryStateStore;
1132 use risingwave_storage::table::batch_table::BatchTable;
1133
1134 use super::*;
1135 use crate::executor::test_utils::*;
1136
1137 #[tokio::test]
1138 async fn test_materialize_executor() {
1139 let memory_state_store = MemoryStateStore::new();
1141 let table_id = TableId::new(1);
1142 let schema = Schema::new(vec![
1144 Field::unnamed(DataType::Int32),
1145 Field::unnamed(DataType::Int32),
1146 ]);
1147 let column_ids = vec![0.into(), 1.into()];
1148
1149 let chunk1 = StreamChunk::from_pretty(
1151 " i i
1152 + 1 4
1153 + 2 5
1154 + 3 6",
1155 );
1156 let chunk2 = StreamChunk::from_pretty(
1157 " i i
1158 + 7 8
1159 - 3 6",
1160 );
1161
1162 let source = MockSource::with_messages(vec![
1164 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1165 Message::Chunk(chunk1),
1166 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1167 Message::Chunk(chunk2),
1168 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1169 ])
1170 .into_executor(schema.clone(), StreamKey::new());
1171
1172 let order_types = vec![OrderType::ascending()];
1173 let column_descs = vec![
1174 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1175 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1176 ];
1177
1178 let table = BatchTable::for_test(
1179 memory_state_store.clone(),
1180 table_id,
1181 column_descs,
1182 order_types,
1183 vec![0],
1184 vec![0, 1],
1185 );
1186
1187 let mut materialize_executor = MaterializeExecutor::for_test(
1188 source,
1189 memory_state_store,
1190 table_id,
1191 vec![ColumnOrder::new(0, OrderType::ascending())],
1192 column_ids,
1193 Arc::new(AtomicU64::new(0)),
1194 ConflictBehavior::NoCheck,
1195 )
1196 .await
1197 .boxed()
1198 .execute();
1199 materialize_executor.next().await.transpose().unwrap();
1200
1201 materialize_executor.next().await.transpose().unwrap();
1202
1203 match materialize_executor.next().await.transpose().unwrap() {
1205 Some(Message::Barrier(_)) => {
1206 let row = table
1207 .get_row(
1208 &OwnedRow::new(vec![Some(3_i32.into())]),
1209 HummockReadEpoch::NoWait(u64::MAX),
1210 )
1211 .await
1212 .unwrap();
1213 assert_eq!(
1214 row,
1215 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1216 );
1217 }
1218 _ => unreachable!(),
1219 }
1220 materialize_executor.next().await.transpose().unwrap();
1221 match materialize_executor.next().await.transpose().unwrap() {
1223 Some(Message::Barrier(_)) => {
1224 let row = table
1225 .get_row(
1226 &OwnedRow::new(vec![Some(7_i32.into())]),
1227 HummockReadEpoch::NoWait(u64::MAX),
1228 )
1229 .await
1230 .unwrap();
1231 assert_eq!(
1232 row,
1233 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1234 );
1235 }
1236 _ => unreachable!(),
1237 }
1238 }
1239
1240 #[tokio::test]
1242 async fn test_upsert_stream() {
1243 let memory_state_store = MemoryStateStore::new();
1245 let table_id = TableId::new(1);
1246 let schema = Schema::new(vec![
1248 Field::unnamed(DataType::Int32),
1249 Field::unnamed(DataType::Int32),
1250 ]);
1251 let column_ids = vec![0.into(), 1.into()];
1252
1253 let chunk1 = StreamChunk::from_pretty(
1255 " i i
1256 + 1 1",
1257 );
1258
1259 let chunk2 = StreamChunk::from_pretty(
1260 " i i
1261 + 1 2
1262 - 1 2",
1263 );
1264
1265 let source = MockSource::with_messages(vec![
1267 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1268 Message::Chunk(chunk1),
1269 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1270 Message::Chunk(chunk2),
1271 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1272 ])
1273 .into_executor(schema.clone(), StreamKey::new());
1274
1275 let order_types = vec![OrderType::ascending()];
1276 let column_descs = vec![
1277 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1278 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1279 ];
1280
1281 let table = BatchTable::for_test(
1282 memory_state_store.clone(),
1283 table_id,
1284 column_descs,
1285 order_types,
1286 vec![0],
1287 vec![0, 1],
1288 );
1289
1290 let mut materialize_executor = MaterializeExecutor::for_test(
1291 source,
1292 memory_state_store,
1293 table_id,
1294 vec![ColumnOrder::new(0, OrderType::ascending())],
1295 column_ids,
1296 Arc::new(AtomicU64::new(0)),
1297 ConflictBehavior::Overwrite,
1298 )
1299 .await
1300 .boxed()
1301 .execute();
1302 materialize_executor.next().await.transpose().unwrap();
1303
1304 materialize_executor.next().await.transpose().unwrap();
1305 materialize_executor.next().await.transpose().unwrap();
1306 materialize_executor.next().await.transpose().unwrap();
1307
1308 match materialize_executor.next().await.transpose().unwrap() {
1309 Some(Message::Barrier(_)) => {
1310 let row = table
1311 .get_row(
1312 &OwnedRow::new(vec![Some(1_i32.into())]),
1313 HummockReadEpoch::NoWait(u64::MAX),
1314 )
1315 .await
1316 .unwrap();
1317 assert!(row.is_none());
1318 }
1319 _ => unreachable!(),
1320 }
1321 }
1322
1323 #[tokio::test]
1324 async fn test_check_insert_conflict() {
1325 let memory_state_store = MemoryStateStore::new();
1327 let table_id = TableId::new(1);
1328 let schema = Schema::new(vec![
1330 Field::unnamed(DataType::Int32),
1331 Field::unnamed(DataType::Int32),
1332 ]);
1333 let column_ids = vec![0.into(), 1.into()];
1334
1335 let chunk1 = StreamChunk::from_pretty(
1337 " i i
1338 + 1 3
1339 + 1 4
1340 + 2 5
1341 + 3 6",
1342 );
1343
1344 let chunk2 = StreamChunk::from_pretty(
1345 " i i
1346 + 1 3
1347 + 2 6",
1348 );
1349
1350 let chunk3 = StreamChunk::from_pretty(
1352 " i i
1353 + 1 4",
1354 );
1355
1356 let source = MockSource::with_messages(vec![
1358 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1359 Message::Chunk(chunk1),
1360 Message::Chunk(chunk2),
1361 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1362 Message::Chunk(chunk3),
1363 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1364 ])
1365 .into_executor(schema.clone(), StreamKey::new());
1366
1367 let order_types = vec![OrderType::ascending()];
1368 let column_descs = vec![
1369 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1370 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1371 ];
1372
1373 let table = BatchTable::for_test(
1374 memory_state_store.clone(),
1375 table_id,
1376 column_descs,
1377 order_types,
1378 vec![0],
1379 vec![0, 1],
1380 );
1381
1382 let mut materialize_executor = MaterializeExecutor::for_test(
1383 source,
1384 memory_state_store,
1385 table_id,
1386 vec![ColumnOrder::new(0, OrderType::ascending())],
1387 column_ids,
1388 Arc::new(AtomicU64::new(0)),
1389 ConflictBehavior::Overwrite,
1390 )
1391 .await
1392 .boxed()
1393 .execute();
1394 materialize_executor.next().await.transpose().unwrap();
1395
1396 materialize_executor.next().await.transpose().unwrap();
1397 materialize_executor.next().await.transpose().unwrap();
1398
1399 match materialize_executor.next().await.transpose().unwrap() {
1401 Some(Message::Barrier(_)) => {
1402 let row = table
1403 .get_row(
1404 &OwnedRow::new(vec![Some(3_i32.into())]),
1405 HummockReadEpoch::NoWait(u64::MAX),
1406 )
1407 .await
1408 .unwrap();
1409 assert_eq!(
1410 row,
1411 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1412 );
1413
1414 let row = table
1415 .get_row(
1416 &OwnedRow::new(vec![Some(1_i32.into())]),
1417 HummockReadEpoch::NoWait(u64::MAX),
1418 )
1419 .await
1420 .unwrap();
1421 assert_eq!(
1422 row,
1423 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
1424 );
1425
1426 let row = table
1427 .get_row(
1428 &OwnedRow::new(vec![Some(2_i32.into())]),
1429 HummockReadEpoch::NoWait(u64::MAX),
1430 )
1431 .await
1432 .unwrap();
1433 assert_eq!(
1434 row,
1435 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
1436 );
1437 }
1438 _ => unreachable!(),
1439 }
1440 }
1441
1442 #[tokio::test]
1443 async fn test_delete_and_update_conflict() {
1444 let memory_state_store = MemoryStateStore::new();
1446 let table_id = TableId::new(1);
1447 let schema = Schema::new(vec![
1449 Field::unnamed(DataType::Int32),
1450 Field::unnamed(DataType::Int32),
1451 ]);
1452 let column_ids = vec![0.into(), 1.into()];
1453
1454 let chunk1 = StreamChunk::from_pretty(
1456 " i i
1457 + 1 4
1458 + 2 5
1459 + 3 6
1460 U- 8 1
1461 U+ 8 2
1462 + 8 3",
1463 );
1464
1465 let chunk2 = StreamChunk::from_pretty(
1467 " i i
1468 + 7 8
1469 - 3 4
1470 - 5 0",
1471 );
1472
1473 let chunk3 = StreamChunk::from_pretty(
1475 " i i
1476 + 1 5
1477 U- 2 4
1478 U+ 2 8
1479 U- 9 0
1480 U+ 9 1",
1481 );
1482
1483 let source = MockSource::with_messages(vec![
1485 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1486 Message::Chunk(chunk1),
1487 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1488 Message::Chunk(chunk2),
1489 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1490 Message::Chunk(chunk3),
1491 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1492 ])
1493 .into_executor(schema.clone(), StreamKey::new());
1494
1495 let order_types = vec![OrderType::ascending()];
1496 let column_descs = vec![
1497 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1498 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1499 ];
1500
1501 let table = BatchTable::for_test(
1502 memory_state_store.clone(),
1503 table_id,
1504 column_descs,
1505 order_types,
1506 vec![0],
1507 vec![0, 1],
1508 );
1509
1510 let mut materialize_executor = MaterializeExecutor::for_test(
1511 source,
1512 memory_state_store,
1513 table_id,
1514 vec![ColumnOrder::new(0, OrderType::ascending())],
1515 column_ids,
1516 Arc::new(AtomicU64::new(0)),
1517 ConflictBehavior::Overwrite,
1518 )
1519 .await
1520 .boxed()
1521 .execute();
1522 materialize_executor.next().await.transpose().unwrap();
1523
1524 materialize_executor.next().await.transpose().unwrap();
1525
1526 match materialize_executor.next().await.transpose().unwrap() {
1528 Some(Message::Barrier(_)) => {
1529 let row = table
1531 .get_row(
1532 &OwnedRow::new(vec![Some(8_i32.into())]),
1533 HummockReadEpoch::NoWait(u64::MAX),
1534 )
1535 .await
1536 .unwrap();
1537 assert_eq!(
1538 row,
1539 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
1540 );
1541 }
1542 _ => unreachable!(),
1543 }
1544 materialize_executor.next().await.transpose().unwrap();
1545
1546 match materialize_executor.next().await.transpose().unwrap() {
1547 Some(Message::Barrier(_)) => {
1548 let row = table
1549 .get_row(
1550 &OwnedRow::new(vec![Some(7_i32.into())]),
1551 HummockReadEpoch::NoWait(u64::MAX),
1552 )
1553 .await
1554 .unwrap();
1555 assert_eq!(
1556 row,
1557 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1558 );
1559
1560 let row = table
1562 .get_row(
1563 &OwnedRow::new(vec![Some(3_i32.into())]),
1564 HummockReadEpoch::NoWait(u64::MAX),
1565 )
1566 .await
1567 .unwrap();
1568 assert_eq!(row, None);
1569
1570 let row = table
1572 .get_row(
1573 &OwnedRow::new(vec![Some(5_i32.into())]),
1574 HummockReadEpoch::NoWait(u64::MAX),
1575 )
1576 .await
1577 .unwrap();
1578 assert_eq!(row, None);
1579 }
1580 _ => unreachable!(),
1581 }
1582
1583 materialize_executor.next().await.transpose().unwrap();
1584 match materialize_executor.next().await.transpose().unwrap() {
1586 Some(Message::Barrier(_)) => {
1587 let row = table
1588 .get_row(
1589 &OwnedRow::new(vec![Some(1_i32.into())]),
1590 HummockReadEpoch::NoWait(u64::MAX),
1591 )
1592 .await
1593 .unwrap();
1594 assert_eq!(
1595 row,
1596 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
1597 );
1598
1599 let row = table
1601 .get_row(
1602 &OwnedRow::new(vec![Some(2_i32.into())]),
1603 HummockReadEpoch::NoWait(u64::MAX),
1604 )
1605 .await
1606 .unwrap();
1607 assert_eq!(
1608 row,
1609 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
1610 );
1611
1612 let row = table
1614 .get_row(
1615 &OwnedRow::new(vec![Some(9_i32.into())]),
1616 HummockReadEpoch::NoWait(u64::MAX),
1617 )
1618 .await
1619 .unwrap();
1620 assert_eq!(
1621 row,
1622 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1623 );
1624 }
1625 _ => unreachable!(),
1626 }
1627 }
1628
1629 #[tokio::test]
1630 async fn test_ignore_insert_conflict() {
1631 let memory_state_store = MemoryStateStore::new();
1633 let table_id = TableId::new(1);
1634 let schema = Schema::new(vec![
1636 Field::unnamed(DataType::Int32),
1637 Field::unnamed(DataType::Int32),
1638 ]);
1639 let column_ids = vec![0.into(), 1.into()];
1640
1641 let chunk1 = StreamChunk::from_pretty(
1643 " i i
1644 + 1 3
1645 + 1 4
1646 + 2 5
1647 + 3 6",
1648 );
1649
1650 let chunk2 = StreamChunk::from_pretty(
1651 " i i
1652 + 1 5
1653 + 2 6",
1654 );
1655
1656 let chunk3 = StreamChunk::from_pretty(
1658 " i i
1659 + 1 6",
1660 );
1661
1662 let source = MockSource::with_messages(vec![
1664 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1665 Message::Chunk(chunk1),
1666 Message::Chunk(chunk2),
1667 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1668 Message::Chunk(chunk3),
1669 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1670 ])
1671 .into_executor(schema.clone(), StreamKey::new());
1672
1673 let order_types = vec![OrderType::ascending()];
1674 let column_descs = vec![
1675 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1676 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1677 ];
1678
1679 let table = BatchTable::for_test(
1680 memory_state_store.clone(),
1681 table_id,
1682 column_descs,
1683 order_types,
1684 vec![0],
1685 vec![0, 1],
1686 );
1687
1688 let mut materialize_executor = MaterializeExecutor::for_test(
1689 source,
1690 memory_state_store,
1691 table_id,
1692 vec![ColumnOrder::new(0, OrderType::ascending())],
1693 column_ids,
1694 Arc::new(AtomicU64::new(0)),
1695 ConflictBehavior::IgnoreConflict,
1696 )
1697 .await
1698 .boxed()
1699 .execute();
1700 materialize_executor.next().await.transpose().unwrap();
1701
1702 materialize_executor.next().await.transpose().unwrap();
1703 materialize_executor.next().await.transpose().unwrap();
1704
1705 match materialize_executor.next().await.transpose().unwrap() {
1707 Some(Message::Barrier(_)) => {
1708 let row = table
1709 .get_row(
1710 &OwnedRow::new(vec![Some(3_i32.into())]),
1711 HummockReadEpoch::NoWait(u64::MAX),
1712 )
1713 .await
1714 .unwrap();
1715 assert_eq!(
1716 row,
1717 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1718 );
1719
1720 let row = table
1721 .get_row(
1722 &OwnedRow::new(vec![Some(1_i32.into())]),
1723 HummockReadEpoch::NoWait(u64::MAX),
1724 )
1725 .await
1726 .unwrap();
1727 assert_eq!(
1728 row,
1729 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
1730 );
1731
1732 let row = table
1733 .get_row(
1734 &OwnedRow::new(vec![Some(2_i32.into())]),
1735 HummockReadEpoch::NoWait(u64::MAX),
1736 )
1737 .await
1738 .unwrap();
1739 assert_eq!(
1740 row,
1741 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
1742 );
1743 }
1744 _ => unreachable!(),
1745 }
1746 }
1747
1748 #[tokio::test]
1749 async fn test_ignore_delete_then_insert() {
1750 let memory_state_store = MemoryStateStore::new();
1752 let table_id = TableId::new(1);
1753 let schema = Schema::new(vec![
1755 Field::unnamed(DataType::Int32),
1756 Field::unnamed(DataType::Int32),
1757 ]);
1758 let column_ids = vec![0.into(), 1.into()];
1759
1760 let chunk1 = StreamChunk::from_pretty(
1762 " i i
1763 + 1 3
1764 - 1 3
1765 + 1 6",
1766 );
1767
1768 let source = MockSource::with_messages(vec![
1770 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1771 Message::Chunk(chunk1),
1772 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1773 ])
1774 .into_executor(schema.clone(), StreamKey::new());
1775
1776 let order_types = vec![OrderType::ascending()];
1777 let column_descs = vec![
1778 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1779 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1780 ];
1781
1782 let table = BatchTable::for_test(
1783 memory_state_store.clone(),
1784 table_id,
1785 column_descs,
1786 order_types,
1787 vec![0],
1788 vec![0, 1],
1789 );
1790
1791 let mut materialize_executor = MaterializeExecutor::for_test(
1792 source,
1793 memory_state_store,
1794 table_id,
1795 vec![ColumnOrder::new(0, OrderType::ascending())],
1796 column_ids,
1797 Arc::new(AtomicU64::new(0)),
1798 ConflictBehavior::IgnoreConflict,
1799 )
1800 .await
1801 .boxed()
1802 .execute();
1803 let _msg1 = materialize_executor
1804 .next()
1805 .await
1806 .transpose()
1807 .unwrap()
1808 .unwrap()
1809 .as_barrier()
1810 .unwrap();
1811 let _msg2 = materialize_executor
1812 .next()
1813 .await
1814 .transpose()
1815 .unwrap()
1816 .unwrap()
1817 .as_chunk()
1818 .unwrap();
1819 let _msg3 = materialize_executor
1820 .next()
1821 .await
1822 .transpose()
1823 .unwrap()
1824 .unwrap()
1825 .as_barrier()
1826 .unwrap();
1827
1828 let row = table
1829 .get_row(
1830 &OwnedRow::new(vec![Some(1_i32.into())]),
1831 HummockReadEpoch::NoWait(u64::MAX),
1832 )
1833 .await
1834 .unwrap();
1835 assert_eq!(
1836 row,
1837 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
1838 );
1839 }
1840
1841 #[tokio::test]
1842 async fn test_ignore_delete_and_update_conflict() {
1843 let memory_state_store = MemoryStateStore::new();
1845 let table_id = TableId::new(1);
1846 let schema = Schema::new(vec![
1848 Field::unnamed(DataType::Int32),
1849 Field::unnamed(DataType::Int32),
1850 ]);
1851 let column_ids = vec![0.into(), 1.into()];
1852
1853 let chunk1 = StreamChunk::from_pretty(
1855 " i i
1856 + 1 4
1857 + 2 5
1858 + 3 6
1859 U- 8 1
1860 U+ 8 2
1861 + 8 3",
1862 );
1863
1864 let chunk2 = StreamChunk::from_pretty(
1866 " i i
1867 + 7 8
1868 - 3 4
1869 - 5 0",
1870 );
1871
1872 let chunk3 = StreamChunk::from_pretty(
1874 " i i
1875 + 1 5
1876 U- 2 4
1877 U+ 2 8
1878 U- 9 0
1879 U+ 9 1",
1880 );
1881
1882 let source = MockSource::with_messages(vec![
1884 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1885 Message::Chunk(chunk1),
1886 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1887 Message::Chunk(chunk2),
1888 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1889 Message::Chunk(chunk3),
1890 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1891 ])
1892 .into_executor(schema.clone(), StreamKey::new());
1893
1894 let order_types = vec![OrderType::ascending()];
1895 let column_descs = vec![
1896 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1897 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1898 ];
1899
1900 let table = BatchTable::for_test(
1901 memory_state_store.clone(),
1902 table_id,
1903 column_descs,
1904 order_types,
1905 vec![0],
1906 vec![0, 1],
1907 );
1908
1909 let mut materialize_executor = MaterializeExecutor::for_test(
1910 source,
1911 memory_state_store,
1912 table_id,
1913 vec![ColumnOrder::new(0, OrderType::ascending())],
1914 column_ids,
1915 Arc::new(AtomicU64::new(0)),
1916 ConflictBehavior::IgnoreConflict,
1917 )
1918 .await
1919 .boxed()
1920 .execute();
1921 materialize_executor.next().await.transpose().unwrap();
1922
1923 materialize_executor.next().await.transpose().unwrap();
1924
1925 match materialize_executor.next().await.transpose().unwrap() {
1927 Some(Message::Barrier(_)) => {
1928 let row = table
1930 .get_row(
1931 &OwnedRow::new(vec![Some(8_i32.into())]),
1932 HummockReadEpoch::NoWait(u64::MAX),
1933 )
1934 .await
1935 .unwrap();
1936 assert_eq!(
1937 row,
1938 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1939 );
1940 }
1941 _ => unreachable!(),
1942 }
1943 materialize_executor.next().await.transpose().unwrap();
1944
1945 match materialize_executor.next().await.transpose().unwrap() {
1946 Some(Message::Barrier(_)) => {
1947 let row = table
1948 .get_row(
1949 &OwnedRow::new(vec![Some(7_i32.into())]),
1950 HummockReadEpoch::NoWait(u64::MAX),
1951 )
1952 .await
1953 .unwrap();
1954 assert_eq!(
1955 row,
1956 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1957 );
1958
1959 let row = table
1961 .get_row(
1962 &OwnedRow::new(vec![Some(3_i32.into())]),
1963 HummockReadEpoch::NoWait(u64::MAX),
1964 )
1965 .await
1966 .unwrap();
1967 assert_eq!(row, None);
1968
1969 let row = table
1971 .get_row(
1972 &OwnedRow::new(vec![Some(5_i32.into())]),
1973 HummockReadEpoch::NoWait(u64::MAX),
1974 )
1975 .await
1976 .unwrap();
1977 assert_eq!(row, None);
1978 }
1979 _ => unreachable!(),
1980 }
1981
1982 materialize_executor.next().await.transpose().unwrap();
1983 match materialize_executor.next().await.transpose().unwrap() {
1986 Some(Message::Barrier(_)) => {
1987 let row = table
1988 .get_row(
1989 &OwnedRow::new(vec![Some(1_i32.into())]),
1990 HummockReadEpoch::NoWait(u64::MAX),
1991 )
1992 .await
1993 .unwrap();
1994 assert_eq!(
1995 row,
1996 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
1997 );
1998
1999 let row = table
2001 .get_row(
2002 &OwnedRow::new(vec![Some(2_i32.into())]),
2003 HummockReadEpoch::NoWait(u64::MAX),
2004 )
2005 .await
2006 .unwrap();
2007 assert_eq!(
2008 row,
2009 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
2010 );
2011
2012 let row = table
2014 .get_row(
2015 &OwnedRow::new(vec![Some(9_i32.into())]),
2016 HummockReadEpoch::NoWait(u64::MAX),
2017 )
2018 .await
2019 .unwrap();
2020 assert_eq!(
2021 row,
2022 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
2023 );
2024 }
2025 _ => unreachable!(),
2026 }
2027 }
2028
2029 #[tokio::test]
2030 async fn test_do_update_if_not_null_conflict() {
2031 let memory_state_store = MemoryStateStore::new();
2033 let table_id = TableId::new(1);
2034 let schema = Schema::new(vec![
2036 Field::unnamed(DataType::Int32),
2037 Field::unnamed(DataType::Int32),
2038 ]);
2039 let column_ids = vec![0.into(), 1.into()];
2040
2041 let chunk1 = StreamChunk::from_pretty(
2043 " i i
2044 + 1 4
2045 + 2 .
2046 + 3 6
2047 U- 8 .
2048 U+ 8 2
2049 + 8 .",
2050 );
2051
2052 let chunk2 = StreamChunk::from_pretty(
2054 " i i
2055 + 7 8
2056 - 3 4
2057 - 5 0",
2058 );
2059
2060 let chunk3 = StreamChunk::from_pretty(
2062 " i i
2063 + 1 5
2064 + 7 .
2065 U- 2 4
2066 U+ 2 .
2067 U- 9 0
2068 U+ 9 1",
2069 );
2070
2071 let source = MockSource::with_messages(vec![
2073 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2074 Message::Chunk(chunk1),
2075 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2076 Message::Chunk(chunk2),
2077 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
2078 Message::Chunk(chunk3),
2079 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
2080 ])
2081 .into_executor(schema.clone(), StreamKey::new());
2082
2083 let order_types = vec![OrderType::ascending()];
2084 let column_descs = vec![
2085 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2086 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2087 ];
2088
2089 let table = BatchTable::for_test(
2090 memory_state_store.clone(),
2091 table_id,
2092 column_descs,
2093 order_types,
2094 vec![0],
2095 vec![0, 1],
2096 );
2097
2098 let mut materialize_executor = MaterializeExecutor::for_test(
2099 source,
2100 memory_state_store,
2101 table_id,
2102 vec![ColumnOrder::new(0, OrderType::ascending())],
2103 column_ids,
2104 Arc::new(AtomicU64::new(0)),
2105 ConflictBehavior::DoUpdateIfNotNull,
2106 )
2107 .await
2108 .boxed()
2109 .execute();
2110 materialize_executor.next().await.transpose().unwrap();
2111
2112 materialize_executor.next().await.transpose().unwrap();
2113
2114 match materialize_executor.next().await.transpose().unwrap() {
2116 Some(Message::Barrier(_)) => {
2117 let row = table
2118 .get_row(
2119 &OwnedRow::new(vec![Some(8_i32.into())]),
2120 HummockReadEpoch::NoWait(u64::MAX),
2121 )
2122 .await
2123 .unwrap();
2124 assert_eq!(
2125 row,
2126 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
2127 );
2128
2129 let row = table
2130 .get_row(
2131 &OwnedRow::new(vec![Some(2_i32.into())]),
2132 HummockReadEpoch::NoWait(u64::MAX),
2133 )
2134 .await
2135 .unwrap();
2136 assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
2137 }
2138 _ => unreachable!(),
2139 }
2140 materialize_executor.next().await.transpose().unwrap();
2141
2142 match materialize_executor.next().await.transpose().unwrap() {
2143 Some(Message::Barrier(_)) => {
2144 let row = table
2145 .get_row(
2146 &OwnedRow::new(vec![Some(7_i32.into())]),
2147 HummockReadEpoch::NoWait(u64::MAX),
2148 )
2149 .await
2150 .unwrap();
2151 assert_eq!(
2152 row,
2153 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
2154 );
2155
2156 let row = table
2158 .get_row(
2159 &OwnedRow::new(vec![Some(3_i32.into())]),
2160 HummockReadEpoch::NoWait(u64::MAX),
2161 )
2162 .await
2163 .unwrap();
2164 assert_eq!(row, None);
2165
2166 let row = table
2168 .get_row(
2169 &OwnedRow::new(vec![Some(5_i32.into())]),
2170 HummockReadEpoch::NoWait(u64::MAX),
2171 )
2172 .await
2173 .unwrap();
2174 assert_eq!(row, None);
2175 }
2176 _ => unreachable!(),
2177 }
2178
2179 materialize_executor.next().await.transpose().unwrap();
2180 match materialize_executor.next().await.transpose().unwrap() {
2183 Some(Message::Barrier(_)) => {
2184 let row = table
2185 .get_row(
2186 &OwnedRow::new(vec![Some(7_i32.into())]),
2187 HummockReadEpoch::NoWait(u64::MAX),
2188 )
2189 .await
2190 .unwrap();
2191 assert_eq!(
2192 row,
2193 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
2194 );
2195
2196 let row = table
2198 .get_row(
2199 &OwnedRow::new(vec![Some(2_i32.into())]),
2200 HummockReadEpoch::NoWait(u64::MAX),
2201 )
2202 .await
2203 .unwrap();
2204 assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
2205
2206 let row = table
2208 .get_row(
2209 &OwnedRow::new(vec![Some(9_i32.into())]),
2210 HummockReadEpoch::NoWait(u64::MAX),
2211 )
2212 .await
2213 .unwrap();
2214 assert_eq!(
2215 row,
2216 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
2217 );
2218 }
2219 _ => unreachable!(),
2220 }
2221 }
2222
2223 fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2224 const KN: u32 = 4;
2225 const SEED: u64 = 998244353;
2226 let mut ret = vec![];
2227 let mut builder =
2228 StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2229 let mut rng = SmallRng::seed_from_u64(SEED);
2230
2231 let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2232 let len = c.data_chunk().capacity();
2233 let mut c = StreamChunkMut::from(c);
2234 for i in 0..len {
2235 c.set_vis(i, rng.random_bool(0.5));
2236 }
2237 c.into()
2238 };
2239 for _ in 0..row_number {
2240 let k = (rng.next_u32() % KN) as i32;
2241 let v = rng.next_u32() as i32;
2242 let op = if rng.random_bool(0.5) {
2243 Op::Insert
2244 } else {
2245 Op::Delete
2246 };
2247 if let Some(c) =
2248 builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2249 {
2250 ret.push(random_vis(c, &mut rng));
2251 }
2252 }
2253 if let Some(c) = builder.take() {
2254 ret.push(random_vis(c, &mut rng));
2255 }
2256 ret
2257 }
2258
2259 async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
2260 const N: usize = 100000;
2261
2262 let memory_state_store = MemoryStateStore::new();
2264 let table_id = TableId::new(1);
2265 let schema = Schema::new(vec![
2267 Field::unnamed(DataType::Int32),
2268 Field::unnamed(DataType::Int32),
2269 ]);
2270 let column_ids = vec![0.into(), 1.into()];
2271
2272 let chunks = gen_fuzz_data(N, 128);
2273 let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
2274 .chain(chunks.into_iter().map(Message::Chunk))
2275 .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
2276 test_epoch(2),
2277 ))))
2278 .collect();
2279 let source =
2281 MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());
2282
2283 let mut materialize_executor = MaterializeExecutor::for_test(
2284 source,
2285 memory_state_store.clone(),
2286 table_id,
2287 vec![ColumnOrder::new(0, OrderType::ascending())],
2288 column_ids,
2289 Arc::new(AtomicU64::new(0)),
2290 conflict_behavior,
2291 )
2292 .await
2293 .boxed()
2294 .execute();
2295 materialize_executor.expect_barrier().await;
2296
2297 let order_types = vec![OrderType::ascending()];
2298 let column_descs = vec![
2299 ColumnDesc::unnamed(0.into(), DataType::Int32),
2300 ColumnDesc::unnamed(1.into(), DataType::Int32),
2301 ];
2302 let pk_indices = vec![0];
2303
2304 let mut table = StateTable::from_table_catalog(
2305 &crate::common::table::test_utils::gen_pbtable(
2306 TableId::from(1002),
2307 column_descs.clone(),
2308 order_types,
2309 pk_indices,
2310 0,
2311 ),
2312 memory_state_store.clone(),
2313 None,
2314 )
2315 .await;
2316
2317 while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
2318 table.write_chunk(c);
2320 }
2321 }
2322
2323 #[tokio::test]
2324 async fn fuzz_test_stream_consistent_upsert() {
2325 fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
2326 }
2327
2328 #[tokio::test]
2329 async fn fuzz_test_stream_consistent_ignore() {
2330 fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
2331 }
2332}