1use std::collections::HashSet;
16use std::ops::Bound;
17
18use bytes::Bytes;
19use futures::future::Either;
20use futures::stream::{self, select_with_strategy};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{
26 ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
27};
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, RowExt};
30use risingwave_common::types::DataType;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::BasicSerde;
33use risingwave_hummock_sdk::HummockReadEpoch;
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_pb::id::{SourceId, SubscriberId};
37use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
38use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
39use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
40use risingwave_storage::table::KeyedRow;
41
42use crate::common::change_buffer::output_kind as cb_kind;
43use crate::common::metrics::MetricsInfo;
44use crate::common::table::state_table::{
45 StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
46};
47use crate::executor::error::ErrorKind;
48use crate::executor::monitor::MaterializeMetrics;
49use crate::executor::mview::RefreshProgressTable;
50use crate::executor::mview::cache::MaterializeCache;
51use crate::executor::prelude::*;
52use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
53use crate::task::LocalBarrierManager;
54
/// State machine driving [`MaterializeExecutor`]'s processing loop.
///
/// Normal operation stays in `NormalIngestion`; the refresh flow moves through
/// `MergingData` -> `CleanUp` -> `RefreshEnd` and back. Every barrier routes
/// through `CommitAndYieldBarrier`, which commits state and records where to
/// resume afterwards.
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Default mode: apply upstream changes to the state table and forward them.
    NormalIngestion,
    /// Refresh: merge-join the staging table against the main table, deleting
    /// stale rows that were not re-loaded.
    MergingData,
    /// Refresh: merge finished; wait for a checkpoint barrier, then report the
    /// refresh as finished to the barrier manager.
    CleanUp,
    /// Commit all tables on `barrier`, yield it downstream, then transition to
    /// `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Refresh: final stage. Wait for the staging table's epoch to be
    /// committed, then return to normal ingestion.
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}
68
/// Materializes the input stream into a state table, handling pk conflicts
/// according to `conflict_behavior`, and forwards the (possibly rewritten)
/// changes downstream.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    input: Executor,

    schema: Schema,

    /// The table this executor materializes into.
    state_table: StateTableInner<S, SD>,

    /// Indices of the stream key columns (may be a superset of the table pk).
    stream_key_indices: Vec<usize>,

    /// Indices of the arrange (pk) key columns.
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// Cache used for conflict handling. `None` disables cached conflict
    /// handling — presumably `MaterializeCache::new` decides based on the
    /// conflict behavior; TODO(review) confirm.
    materialize_cache: Option<MaterializeCache>,

    conflict_behavior: ConflictBehavior,

    /// Whether rows may be cleaned by a TTL watermark; forces the inconsistent
    /// op level (see `get_op_consistency_level`).
    cleaned_by_ttl_watermark: bool,

    /// Columns used to pick the winning row on version conflicts.
    version_column_indices: Vec<u32>,

    /// Whether this MV may have downstream consumers; affects whether old-value
    /// consistency can be skipped for `Overwrite` tables.
    may_have_downstream: bool,

    /// Ids of subscriptions on this MV; non-empty enables the log store.
    subscriber_ids: HashSet<SubscriberId>,

    metrics: MaterializeMetrics,

    /// True for append-only Iceberg tables: chunks are passed through without
    /// being written to the state table.
    is_dummy_table: bool,

    /// Present when the table supports `REFRESH`; holds staging/progress state.
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    /// Used to report refresh completion to the local barrier manager.
    local_barrier_manager: LocalBarrierManager,
}
120
/// State needed by a refreshable materialized table: a staging table that
/// buffers freshly loaded rows and a progress table that persists per-vnode
/// merge positions so a refresh can resume after recovery.
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Catalog of the materialized table itself.
    pub table_catalog: Table,

    /// Catalog of the staging table written to during a refresh.
    pub staging_table_catalog: Table,

    /// Whether a refresh is currently in progress. Set on `RefreshStart`,
    /// cleared at `RefreshEnd`, and recovered from the progress table.
    pub is_refreshing: bool,

    /// Staging table receiving the newly loaded rows during a refresh.
    pub staging_table: StateTableInner<S, SD>,

    /// Per-vnode merge progress, used to resume a refresh after recovery.
    pub progress_table: RefreshProgressTable<S>,

    /// Id of the materialized table (copied from `table_catalog.id`).
    pub table_id: TableId,
}
146
147impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
148 pub async fn new(
150 store: S,
151 table_catalog: &Table,
152 staging_table_catalog: &Table,
153 progress_state_table: &Table,
154 vnodes: Option<Arc<Bitmap>>,
155 ) -> Self {
156 let table_id = table_catalog.id;
157
158 let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
160 staging_table_catalog,
161 store.clone(),
162 vnodes.clone(),
163 )
164 .await;
165
166 let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
167 progress_state_table,
168 store,
169 vnodes,
170 )
171 .await;
172
173 let pk_len = table_catalog.pk.len();
175 let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
176
177 debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
178
179 Self {
180 table_catalog: table_catalog.clone(),
181 staging_table_catalog: staging_table_catalog.clone(),
182 is_refreshing: false,
183 staging_table,
184 progress_table,
185 table_id,
186 }
187 }
188}
189
190fn get_op_consistency_level(
191 cleaned_by_ttl_watermark: bool,
192 conflict_behavior: ConflictBehavior,
193 may_have_downstream: bool,
194 subscriber_ids: &HashSet<SubscriberId>,
195) -> StateTableOpConsistencyLevel {
196 if cleaned_by_ttl_watermark {
197 StateTableOpConsistencyLevel::Inconsistent
201 } else if !subscriber_ids.is_empty() {
202 StateTableOpConsistencyLevel::LogStoreEnabled
203 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
204 StateTableOpConsistencyLevel::Inconsistent
207 } else {
208 StateTableOpConsistencyLevel::ConsistentOldValue
209 }
210}
211
212impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Creates a [`MaterializeExecutor`] from the table catalog, wiring up the
    /// state table, conflict-handling cache, and metrics.
    ///
    /// `refresh_args` is `Some` only for refreshable tables;
    /// `cleaned_by_ttl_watermark` relaxes op consistency (see
    /// `get_op_consistency_level`).
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<u32>,
        metrics: Arc<StreamingMetrics>,
        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
        cleaned_by_ttl_watermark: bool,
        local_barrier_manager: LocalBarrierManager,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // For Postgres CDC tables, record which columns use variable-length
        // types (candidates for TOAST storage) so the cache can treat their
        // values specially; `None` disables this handling entirely.
        let toastable_column_indices = if table_catalog.cdc_table_type()
            == risingwave_pb::catalog::table::CdcTableType::Postgres
        {
            let toastable_indices: Vec<usize> = table_columns
                .iter()
                .enumerate()
                .filter_map(|(index, column)| match &column.data_type {
                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
                        Some(index)
                    }
                    _ => None,
                })
                .collect();

            if toastable_indices.is_empty() {
                None
            } else {
                Some(toastable_indices)
            }
        } else {
            None
        };

        // Serde over the catalog's value columns, used by the conflict cache.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let stream_key_indices: Vec<usize> = table_catalog
            .stream_key
            .iter()
            .map(|idx| *idx as usize)
            .collect();
        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // Initial values; both may be updated later by barrier mutations.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let subscriber_ids = actor_context.initial_subscriber_ids.clone();
        let op_consistency_level = get_op_consistency_level(
            cleaned_by_ttl_watermark,
            conflict_behavior,
            may_have_downstream,
            &subscriber_ids,
        );
        let state_table_metrics = metrics.new_state_table_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );
        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(op_consistency_level)
            .enable_preload_all_rows_by_config(&actor_context.config)
            .enable_vnode_key_stats(
                actor_context
                    .config
                    .developer
                    .enable_vnode_key_stats_for_materialize,
                &actor_context.config,
            )
            .with_metrics(state_table_metrics)
            .build()
            .await;

        let mv_metrics = metrics.new_materialize_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );
        let cache_metrics = metrics.new_materialize_cache_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only Iceberg tables are pass-through: nothing is materialized.
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            stream_key_indices,
            arrange_key_indices,
            actor_context,
            // NOTE(review): the field is `Option<MaterializeCache>`, so
            // `MaterializeCache::new` apparently returns an `Option` — verify.
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_indices.clone(),
                conflict_behavior,
                toastable_column_indices,
                cache_metrics,
            ),
            conflict_behavior,
            cleaned_by_ttl_watermark,
            version_column_indices,
            is_dummy_table,
            may_have_downstream,
            subscriber_ids,
            metrics: mv_metrics,
            refresh_args,
            local_barrier_manager,
        }
    }
355
356 #[try_stream(ok = Message, error = StreamExecutorError)]
357 async fn execute_inner(mut self) {
358 let mv_table_id = self.state_table.table_id();
359 let data_types = self.schema.data_types();
360 let mut input = self.input.execute();
361
362 let barrier = expect_first_barrier(&mut input).await?;
363 let first_epoch = barrier.epoch;
364 let _barrier_epoch = barrier.epoch; yield Message::Barrier(barrier);
367 self.state_table.init_epoch(first_epoch).await?;
368
369 let mut inner_state =
371 Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
372 if let Some(ref mut refresh_args) = self.refresh_args {
374 refresh_args.staging_table.init_epoch(first_epoch).await?;
375
376 refresh_args.progress_table.recover(first_epoch).await?;
378
379 let progress_stats = refresh_args.progress_table.get_progress_stats();
381 if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
382 refresh_args.is_refreshing = true;
383 tracing::info!(
384 total_vnodes = progress_stats.total_vnodes,
385 completed_vnodes = progress_stats.completed_vnodes,
386 "Recovered refresh in progress, resuming refresh operation"
387 );
388
389 let incomplete_vnodes: Vec<_> = refresh_args
393 .progress_table
394 .get_all_progress()
395 .iter()
396 .filter(|(_, entry)| !entry.is_completed)
397 .map(|(&vnode, _)| vnode)
398 .collect();
399
400 if !incomplete_vnodes.is_empty() {
401 tracing::info!(
403 incomplete_vnodes = incomplete_vnodes.len(),
404 "Recovery detected incomplete VNodes, resuming refresh operation"
405 );
406 } else {
409 tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
411 }
412 }
413 }
414
415 if let Some(ref refresh_args) = self.refresh_args
417 && refresh_args.is_refreshing
418 {
419 let incomplete_vnodes: Vec<_> = refresh_args
421 .progress_table
422 .get_all_progress()
423 .iter()
424 .filter(|(_, entry)| !entry.is_completed)
425 .map(|(&vnode, _)| vnode)
426 .collect();
427 if !incomplete_vnodes.is_empty() {
428 inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
430 tracing::info!(
431 incomplete_vnodes = incomplete_vnodes.len(),
432 "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
433 );
434 }
435 }
436
437 'main_loop: loop {
439 match *inner_state {
440 MaterializeStreamState::NormalIngestion => {
441 #[for_await]
442 '_normal_ingest: for msg in input.by_ref() {
443 let msg = msg?;
444 if let Some(cache) = &mut self.materialize_cache {
445 cache.evict();
446 }
447
448 match msg {
449 Message::Watermark(w) => {
450 if self.cleaned_by_ttl_watermark
451 && self.state_table.clean_watermark_index == Some(w.col_idx)
452 {
453 self.state_table.update_watermark(w.val.clone());
454 }
455 yield Message::Watermark(w);
456 }
457 Message::Chunk(chunk) if self.is_dummy_table => {
458 self.metrics
459 .materialize_input_row_count
460 .inc_by(chunk.cardinality() as u64);
461 yield Message::Chunk(chunk);
462 }
463 Message::Chunk(chunk) => {
464 self.metrics
465 .materialize_input_row_count
466 .inc_by(chunk.cardinality() as u64);
467
468 let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
475 self.conflict_behavior
476 && !self.state_table.is_consistent_op()
477 && !self.cleaned_by_ttl_watermark
478 && self.version_column_indices.is_empty()
479 {
480 ConflictBehavior::NoCheck
481 } else {
482 self.conflict_behavior
483 };
484
485 match optimized_conflict_behavior {
486 checked_conflict_behaviors!() => {
487 if chunk.cardinality() == 0 {
488 continue;
490 }
491
492 if let Some(ref mut refresh_args) = self.refresh_args
495 && refresh_args.is_refreshing
496 {
497 let key_chunk = chunk
498 .clone()
499 .project(self.state_table.pk_indices());
500 tracing::trace!(
501 staging_chunk = %key_chunk.to_pretty(),
502 input_chunk = %chunk.to_pretty(),
503 "writing to staging table"
504 );
505 if cfg!(debug_assertions) {
506 assert!(
508 key_chunk
509 .ops()
510 .iter()
511 .all(|op| op == &Op::Insert)
512 );
513 }
514 refresh_args
515 .staging_table
516 .write_chunk(key_chunk.clone());
517 refresh_args.staging_table.try_flush().await?;
518 }
519
520 let cache = self.materialize_cache.as_mut().unwrap();
521 let change_buffer =
522 cache.handle_new(chunk, &self.state_table).await?;
523
524 let output_chunk = if self.stream_key_indices
525 == self.state_table.pk_indices()
526 {
527 change_buffer.into_chunk::<{ cb_kind::RETRACT }>(
528 data_types.clone(),
529 )
530 } else {
531 debug_assert!(
534 self.state_table
535 .pk_indices()
536 .iter()
537 .all(|&i| self.stream_key_indices.contains(&i))
538 );
539 change_buffer.into_chunk_with_key(
540 data_types.clone(),
541 &self.stream_key_indices,
542 )
543 };
544
545 match output_chunk {
546 Some(output_chunk) => {
547 self.state_table.write_chunk(output_chunk.clone());
548 self.state_table.try_flush().await?;
549 yield Message::Chunk(output_chunk);
550 }
551 None => continue,
552 }
553 }
554 ConflictBehavior::NoCheck => {
555 self.state_table.write_chunk(chunk.clone());
556 self.state_table.try_flush().await?;
557
558 if let Some(ref mut refresh_args) = self.refresh_args
560 && refresh_args.is_refreshing
561 {
562 let key_chunk = chunk
563 .clone()
564 .project(self.state_table.pk_indices());
565 tracing::trace!(
566 staging_chunk = %key_chunk.to_pretty(),
567 input_chunk = %chunk.to_pretty(),
568 "writing to staging table"
569 );
570 if cfg!(debug_assertions) {
571 assert!(
573 key_chunk
574 .ops()
575 .iter()
576 .all(|op| op == &Op::Insert)
577 );
578 }
579 refresh_args
580 .staging_table
581 .write_chunk(key_chunk.clone());
582 refresh_args.staging_table.try_flush().await?;
583 }
584
585 yield Message::Chunk(chunk);
586 }
587 }
588 }
589 Message::Barrier(barrier) => {
590 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
591 barrier,
592 expect_next_state: Box::new(
593 MaterializeStreamState::NormalIngestion,
594 ),
595 };
596 continue 'main_loop;
597 }
598 }
599 }
600
601 return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
602 anyhow::anyhow!(
603 "Input stream terminated unexpectedly during normal ingestion"
604 ),
605 )));
606 }
607 MaterializeStreamState::MergingData => {
608 let Some(refresh_args) = self.refresh_args.as_mut() else {
609 panic!(
610 "MaterializeExecutor entered CleanUp state without refresh_args configured"
611 );
612 };
613 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
614
615 debug_assert_eq!(
616 self.state_table.vnodes(),
617 refresh_args.staging_table.vnodes()
618 );
619 debug_assert_eq!(
620 refresh_args.staging_table.vnodes(),
621 refresh_args.progress_table.vnodes()
622 );
623
624 let mut rows_to_delete = vec![];
625 let mut merge_complete = false;
626 let mut pending_barrier: Option<Barrier> = None;
627
628 {
630 let left_input = input.by_ref().map(Either::Left);
631 let right_merge_sort = pin!(
632 Self::make_mergesort_stream(
633 &self.state_table,
634 &refresh_args.staging_table,
635 &mut refresh_args.progress_table
636 )
637 .map(Either::Right)
638 );
639
640 let mut merge_stream =
643 select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
644 stream::PollNext::Left
645 });
646
647 #[for_await]
648 'merge_stream: for either in &mut merge_stream {
649 match either {
650 Either::Left(msg) => {
651 let msg = msg?;
652 match msg {
653 Message::Watermark(w) => yield Message::Watermark(w),
654 Message::Chunk(chunk) => {
655 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
656 }
657 Message::Barrier(b) => {
658 pending_barrier = Some(b);
659 break 'merge_stream;
660 }
661 }
662 }
663 Either::Right(result) => {
664 match result? {
665 Some((_vnode, row)) => {
666 rows_to_delete.push(row);
667 }
668 None => {
669 merge_complete = true;
671
672 }
674 }
675 }
676 }
677 }
678 }
679
680 for row in &rows_to_delete {
682 self.state_table.delete(row);
683 }
684 if !rows_to_delete.is_empty() {
685 let to_delete_chunk = StreamChunk::from_rows(
686 &rows_to_delete
687 .iter()
688 .map(|row| (Op::Delete, row))
689 .collect_vec(),
690 &self.schema.data_types(),
691 );
692
693 yield Message::Chunk(to_delete_chunk);
694 }
695
696 assert!(pending_barrier.is_some(), "pending barrier is not set");
698
699 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
700 barrier: pending_barrier.unwrap(),
701 expect_next_state: if merge_complete {
702 Box::new(MaterializeStreamState::CleanUp)
703 } else {
704 Box::new(MaterializeStreamState::MergingData)
705 },
706 };
707 continue 'main_loop;
708 }
709 MaterializeStreamState::CleanUp => {
710 let Some(refresh_args) = self.refresh_args.as_mut() else {
711 panic!(
712 "MaterializeExecutor entered MergingData state without refresh_args configured"
713 );
714 };
715 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
716
717 #[for_await]
718 for msg in input.by_ref() {
719 let msg = msg?;
720 match msg {
721 Message::Watermark(w) => {
722 if self.cleaned_by_ttl_watermark
723 && self.state_table.clean_watermark_index == Some(w.col_idx)
724 {
725 self.state_table.update_watermark(w.val.clone());
726 }
727 yield Message::Watermark(w)
728 }
729 Message::Chunk(chunk) => {
730 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
731 }
732 Message::Barrier(barrier) if !barrier.is_checkpoint() => {
733 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
734 barrier,
735 expect_next_state: Box::new(MaterializeStreamState::CleanUp),
736 };
737 continue 'main_loop;
738 }
739 Message::Barrier(barrier) => {
740 let staging_table_id = refresh_args.staging_table.table_id();
741 let epoch = barrier.epoch;
742 self.local_barrier_manager.report_refresh_finished(
743 epoch,
744 self.actor_context.id,
745 refresh_args.table_id,
746 staging_table_id,
747 );
748 tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
749
750 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
751 barrier,
752 expect_next_state: Box::new(
753 MaterializeStreamState::RefreshEnd {
754 on_complete_epoch: epoch,
755 },
756 ),
757 };
758 continue 'main_loop;
759 }
760 }
761 }
762 }
763 MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
764 let Some(refresh_args) = self.refresh_args.as_mut() else {
765 panic!(
766 "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
767 );
768 };
769 let staging_table_id = refresh_args.staging_table.table_id();
770
771 let staging_store = refresh_args.staging_table.state_store().clone();
773 staging_store
774 .try_wait_epoch(
775 HummockReadEpoch::Committed(on_complete_epoch.prev),
776 TryWaitEpochOptions {
777 table_id: staging_table_id,
778 },
779 )
780 .await?;
781
782 tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
783
784 if let Some(ref mut refresh_args) = self.refresh_args {
785 refresh_args.is_refreshing = false;
786 }
787 *inner_state = MaterializeStreamState::NormalIngestion;
788 continue 'main_loop;
789 }
790 MaterializeStreamState::CommitAndYieldBarrier {
791 barrier,
792 mut expect_next_state,
793 } => {
794 if let Some(ref mut refresh_args) = self.refresh_args {
795 match barrier.mutation.as_deref() {
796 Some(Mutation::RefreshStart {
797 table_id: refresh_table_id,
798 associated_source_id: _,
799 }) if *refresh_table_id == refresh_args.table_id => {
800 debug_assert!(
801 !refresh_args.is_refreshing,
802 "cannot start refresh twice"
803 );
804 refresh_args.is_refreshing = true;
805 tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
806
807 Self::init_refresh_progress(
809 &self.state_table,
810 &mut refresh_args.progress_table,
811 barrier.epoch.curr,
812 )?;
813 }
814 Some(Mutation::LoadFinish {
815 associated_source_id: load_finish_source_id,
816 }) => {
817 let associated_source_id: SourceId = match refresh_args
819 .table_catalog
820 .optional_associated_source_id
821 {
822 Some(id) => id.into(),
823 None => unreachable!("associated_source_id is not set"),
824 };
825
826 if *load_finish_source_id == associated_source_id {
827 tracing::info!(
828 %load_finish_source_id,
829 "LoadFinish received, starting data replacement"
830 );
831 expect_next_state =
832 Box::new(MaterializeStreamState::<_>::MergingData);
833 }
834 }
835 _ => {}
836 }
837 }
838
839 if !self.may_have_downstream
843 && barrier.has_more_downstream_fragments(self.actor_context.id)
844 {
845 self.may_have_downstream = true;
846 }
847 Self::may_update_depended_subscriptions(
848 &mut self.subscriber_ids,
849 &barrier,
850 mv_table_id,
851 );
852 let op_consistency_level = get_op_consistency_level(
853 self.cleaned_by_ttl_watermark,
854 self.conflict_behavior,
855 self.may_have_downstream,
856 &self.subscriber_ids,
857 );
858 let post_commit = self
859 .state_table
860 .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
861 .await?;
862
863 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
864
865 let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
867 {
868 Some((
871 refresh_args.staging_table.commit(barrier.epoch).await?,
872 refresh_args.progress_table.commit(barrier.epoch).await?,
873 ))
874 } else {
875 None
876 };
877
878 let b_epoch = barrier.epoch;
879 yield Message::Barrier(barrier);
880
881 if let Some((_, cache_may_stale)) = post_commit
883 .post_yield_barrier(update_vnode_bitmap.clone())
884 .await?
885 && cache_may_stale
886 && let Some(cache) = &mut self.materialize_cache
887 {
888 cache.clear();
889 }
890
891 if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
893 staging_post_commit
894 .post_yield_barrier(update_vnode_bitmap.clone())
895 .await?;
896 progress_post_commit
897 .post_yield_barrier(update_vnode_bitmap)
898 .await?;
899 }
900
901 self.metrics
902 .materialize_current_epoch
903 .set(b_epoch.curr as i64);
904
905 *inner_state = *expect_next_state;
908 }
909 }
910 }
911 }
912
913 #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
917 async fn make_mergesort_stream<'a>(
918 main_table: &'a StateTableInner<S, SD>,
919 staging_table: &'a StateTableInner<S, SD>,
920 progress_table: &'a mut RefreshProgressTable<S>,
921 ) {
922 for vnode in main_table.vnodes().clone().iter_vnodes() {
923 let mut processed_rows = 0;
924 let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
926 if let Some(current_entry) = progress_table.get_progress(vnode) {
927 if current_entry.is_completed {
929 tracing::debug!(
930 vnode = vnode.to_index(),
931 "Skipping already completed VNode during recovery"
932 );
933 continue;
934 }
935 processed_rows += current_entry.processed_rows;
936 tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
937
938 if let Some(current_state) = ¤t_entry.current_pos {
939 (Bound::Excluded(current_state.clone()), Bound::Unbounded)
940 } else {
941 (Bound::Unbounded, Bound::Unbounded)
942 }
943 } else {
944 (Bound::Unbounded, Bound::Unbounded)
945 };
946
947 let iter_main = main_table
948 .iter_keyed_row_with_vnode(
949 vnode,
950 &pk_range,
951 PrefetchOptions::prefetch_for_large_range_scan(),
952 )
953 .await?;
954 let iter_staging = staging_table
955 .iter_keyed_row_with_vnode(
956 vnode,
957 &pk_range,
958 PrefetchOptions::prefetch_for_large_range_scan(),
959 )
960 .await?;
961
962 pin_mut!(iter_main);
963 pin_mut!(iter_staging);
964
965 let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
967 let mut staging_item: Option<KeyedRow<Bytes>> =
968 iter_staging.next().await.transpose()?;
969
970 while let Some(main_kv) = main_item {
971 let main_key = main_kv.key();
972
973 let mut should_delete = false;
975 while let Some(staging_kv) = &staging_item {
976 let staging_key = staging_kv.key();
977 match main_key.cmp(staging_key) {
978 std::cmp::Ordering::Greater => {
979 staging_item = iter_staging.next().await.transpose()?;
981 }
982 std::cmp::Ordering::Equal => {
983 break;
985 }
986 std::cmp::Ordering::Less => {
987 should_delete = true;
989 break;
990 }
991 }
992 }
993
994 if staging_item.is_none() {
996 should_delete = true;
997 }
998
999 if should_delete {
1000 yield Some((vnode, main_kv.row().clone()));
1001 }
1002
1003 processed_rows += 1;
1005 tracing::debug!(
1006 "set progress table: vnode = {:?}, processed_rows = {:?}",
1007 vnode,
1008 processed_rows
1009 );
1010 progress_table.set_progress(
1011 vnode,
1012 Some(
1013 main_kv
1014 .row()
1015 .project(main_table.pk_indices())
1016 .to_owned_row(),
1017 ),
1018 false,
1019 processed_rows,
1020 )?;
1021 main_item = iter_main.next().await.transpose()?;
1022 }
1023
1024 if let Some(current_entry) = progress_table.get_progress(vnode) {
1026 progress_table.set_progress(
1027 vnode,
1028 current_entry.current_pos.clone(),
1029 true, current_entry.processed_rows,
1031 )?;
1032
1033 tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
1034 }
1035 }
1036
1037 yield None;
1039 }
1040
1041 fn may_update_depended_subscriptions(
1043 depended_subscriptions: &mut HashSet<SubscriberId>,
1044 barrier: &Barrier,
1045 mv_table_id: TableId,
1046 ) {
1047 for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
1048 if !depended_subscriptions.insert(subscriber_id) {
1049 warn!(
1050 ?depended_subscriptions,
1051 %mv_table_id,
1052 %subscriber_id,
1053 "subscription id already exists"
1054 );
1055 }
1056 }
1057
1058 if let Some(subscriptions_to_drop) = barrier.as_subscriptions_to_drop() {
1059 for SubscriptionUpstreamInfo {
1060 subscriber_id,
1061 upstream_mv_table_id,
1062 } in subscriptions_to_drop
1063 {
1064 if *upstream_mv_table_id == mv_table_id
1065 && !depended_subscriptions.remove(subscriber_id)
1066 {
1067 warn!(
1068 ?depended_subscriptions,
1069 %mv_table_id,
1070 %subscriber_id,
1071 "drop non existing subscriber_id id"
1072 );
1073 }
1074 }
1075 }
1076 }
1077
1078 fn init_refresh_progress(
1080 state_table: &StateTableInner<S, SD>,
1081 progress_table: &mut RefreshProgressTable<S>,
1082 _epoch: u64,
1083 ) -> StreamExecutorResult<()> {
1084 debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1085
1086 for vnode in state_table.vnodes().iter_vnodes() {
1088 progress_table.set_progress(
1089 vnode, None, false, 0, )?;
1093 }
1094
1095 tracing::info!(
1096 vnodes_count = state_table.vnodes().count_ones(),
1097 "Initialized refresh progress tracking for all VNodes"
1098 );
1099
1100 Ok(())
1101 }
1102}
1103
1104impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Test-only constructor; the stream key defaults to the arrange key.
    #[cfg(any(test, feature = "test"))]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        // `stream_key = None` makes `for_test_inner` fall back to the arrange key.
        Self::for_test_inner(
            input,
            store,
            table_id,
            keys,
            column_ids,
            watermark_epoch,
            conflict_behavior,
            None,
        )
        .await
    }
1128
    /// Test-only constructor with an explicit stream key (may differ from the
    /// arrange key).
    #[cfg(any(test, feature = "test"))]
    #[allow(clippy::too_many_arguments)]
    pub async fn for_test_with_stream_key(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        stream_key: Vec<usize>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        Self::for_test_inner(
            input,
            store,
            table_id,
            keys,
            column_ids,
            watermark_epoch,
            conflict_behavior,
            Some(stream_key),
        )
        .await
    }
1153
    /// Shared test-only constructor: builds a minimal catalog and state table
    /// from the given column/key specs, using unused (no-op) metrics.
    #[cfg(any(test, feature = "test"))]
    #[allow(clippy::too_many_arguments)]
    async fn for_test_inner(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        stream_key: Option<Vec<usize>>,
    ) -> Self {
        use risingwave_common::util::iter_util::ZipEqFast;

        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        // Pair each column id with the corresponding input field's data type.
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // All columns are value columns in the test serde.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        // Default the stream key to the arrange key when not given explicitly.
        let stream_key_indices = stream_key.unwrap_or_else(|| arrange_columns.clone());
        let mut table_catalog = crate::common::table::test_utils::gen_pbtable(
            table_id,
            columns,
            arrange_order_types,
            arrange_columns.clone(),
            0,
        );
        table_catalog.stream_key = stream_key_indices.iter().map(|i| *i as i32).collect();
        let state_table = StateTableInner::from_table_catalog(&table_catalog, store, None).await;

        // Metrics are no-ops in tests; actor/fragment ids are arbitrary.
        let unused = StreamingMetrics::unused();
        let metrics = unused.new_materialize_metrics(table_id, 1.into(), 2.into());
        let cache_metrics = unused.new_materialize_cache_metrics(table_id, 1.into(), 2.into());

        Self {
            input,
            schema,
            state_table,
            stream_key_indices,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                vec![],
                conflict_behavior,
                None,
                cache_metrics,
            ),
            conflict_behavior,
            cleaned_by_ttl_watermark: false,
            version_column_indices: vec![],
            is_dummy_table: false,
            // Tests assume a downstream exists so conflict handling stays on.
            may_have_downstream: true,
            subscriber_ids: HashSet::new(),
            metrics,
            refresh_args: None,
            local_barrier_manager: LocalBarrierManager::for_test(),
        }
    }
1223}
1224
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    /// Boxes the executor's state-machine stream as its message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
1230
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Only the key-index fields are printed, keeping the output concise.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .field("stream_key_indices", &self.stream_key_indices)
            .finish()
    }
}
1239
1240#[cfg(test)]
1241mod tests {
1242
1243 use std::iter;
1244 use std::sync::atomic::AtomicU64;
1245
1246 use rand::rngs::SmallRng;
1247 use rand::{Rng, RngCore, SeedableRng};
1248 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1249 use risingwave_common::catalog::Field;
1250 use risingwave_common::util::epoch::test_epoch;
1251 use risingwave_common::util::sort_util::OrderType;
1252 use risingwave_hummock_sdk::HummockReadEpoch;
1253 use risingwave_storage::memory::MemoryStateStore;
1254 use risingwave_storage::table::batch_table::BatchTable;
1255
1256 use super::*;
1257 use crate::executor::test_utils::*;
1258
    /// `ConflictBehavior::NoCheck`: rows flow straight into the state table.
    /// Feeds two chunks separated by barriers and reads the table back through
    /// a `BatchTable` over the same state store after each barrier commits.
    #[tokio::test]
    async fn test_materialize_executor() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; column 0 is used as the arrange (pk) key below.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Epoch 1 payload: three inserts.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        // Epoch 2 payload: one insert and one delete.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Message order: barrier(1), chunk1, barrier(2), chunk2, barrier(3).
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same state store, used to read back what
        // the materialize executor wrote.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1).
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1 (forwarded downstream).
        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): chunk1 is committed, so key 3 must be readable as (3, 6).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();
        // barrier(3): chunk2 is committed, so the inserted key 7 is readable.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1361
    /// `ConflictBehavior::Overwrite` with an upsert-style input: an insert of
    /// an existing key overwrites it, and a subsequent delete removes the row,
    /// so after epoch 3 key 1 must be gone.
    #[tokio::test]
    async fn test_upsert_stream() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Epoch 1: insert (1, 1).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Epoch 2: insert overwrites key 1 with (1, 2), then delete removes it.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view to read back the committed state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1), chunk1, barrier(2), chunk2 in order.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // barrier(3): the insert+delete pair has been applied, key 1 is absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1444
    /// `ConflictBehavior::Overwrite` with conflicting inserts: when the same
    /// key is inserted repeatedly (within one chunk and across chunks), the
    /// most recent insert wins.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 1 is inserted twice in the same chunk; the later (1, 4) wins
        // within this chunk.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicting inserts across chunks: these overwrite keys 1 and 2.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // chunk3 is queued after the checked barrier and is not consumed here.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view to read back the committed state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1), chunk1, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): last-write-wins results are committed.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // Key 3 was only inserted once.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // Key 1: chunk2's (1, 3) overwrote chunk1's (1, 4).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // Key 2: chunk2's (2, 6) overwrote chunk1's (2, 5).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1563
    /// `ConflictBehavior::Overwrite` with deletes and `U-`/`U+` updates mixed
    /// in: deletes apply by key (even when the old value does not match, or
    /// the row is absent), and updates overwrite the stored row.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Epoch 1: inserts plus an update pair on key 8, followed by another
        // insert of key 8 — the final insert (8, 3) wins under Overwrite.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Epoch 2: `- 3 4` deletes key 3 even though its stored value is 6;
        // `- 5 0` deletes a key that does not exist.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Epoch 3: `+ 1 5` overwrites key 1; updates rewrite keys 2 and 9
        // (key 9 had no prior row).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view to read back the committed state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1) and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): the trailing insert on key 8 overwrote the update.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(3): deletes applied by key; keys 3 and 5 are absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // Deleted even though the delete carried the stale value 4.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // Deleting a non-existent key leaves nothing behind.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // barrier(4): overwrites and updates from chunk3 are visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // Update on a previously-absent key 9 materializes (9, 1).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1750
    /// With a conflict behavior and an explicit stream key, the executor emits
    /// the conflict-resolved changes from its change buffer: overwriting
    /// (1, 4) with (1, 5) must surface downstream as a `- 1 4` / `+ 1 5` pair.
    #[tokio::test]
    async fn test_change_buffer_into_chunk_with_stream_key() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Epoch 1: seed row (1, 4).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );
        // Epoch 2: insert on the same key, resolved by Overwrite.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        // Note: this variant passes the stream key indices `[0, 1]` explicitly.
        let mut materialize_executor = MaterializeExecutor::for_test_with_stream_key(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            vec![0, 1],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();

        // Consume barrier(1), the first output chunk, and barrier(2).
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // The next message is the conflict-resolved output for chunk2:
        // a retraction of the old row plus the new row.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Chunk(chunk)) => {
                assert_eq!(
                    chunk.compact_vis(),
                    StreamChunk::from_pretty(
                        " i i
                        - 1 4
                        + 1 5"
                    )
                );
            }
            other => panic!("expect chunk, got {other:?}"),
        }
    }
1817
    /// `ConflictBehavior::IgnoreConflict` with conflicting inserts: the FIRST
    /// insert of a key wins and later inserts of the same key are dropped
    /// (opposite of the Overwrite test above).
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 1 inserted twice in one chunk; (1, 3) arrives first and sticks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Both inserts conflict with existing keys and are ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // chunk3 is queued after the checked barrier and is not consumed here.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view to read back the committed state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1), chunk1, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): first-write-wins results are committed.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // Key 3: single insert, unaffected.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // Key 1: the first insert (1, 3) won; later ones were ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // Key 2: chunk2's (2, 6) was ignored; (2, 5) remains.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1936
    /// `ConflictBehavior::IgnoreConflict`: deleting a key and re-inserting it
    /// within the same chunk must not be treated as a conflict — the new
    /// insert after the delete takes effect.
    #[tokio::test]
    async fn test_ignore_delete_then_insert() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Insert, delete, then re-insert the same key in one chunk.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view to read back the committed state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain the full message sequence, asserting each message's kind:
        // barrier(1), the data chunk, barrier(2).
        let _msg1 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();
        let _msg2 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_chunk()
            .unwrap();
        let _msg3 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();

        // The re-insert after the delete is visible: (1, 6), not the original
        // (1, 3) and not an ignored insert.
        let row = table
            .get_row(
                &OwnedRow::new(vec![Some(1_i32.into())]),
                HummockReadEpoch::NoWait(u64::MAX),
            )
            .await
            .unwrap();
        assert_eq!(
            row,
            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
        );
    }
2029
    /// `ConflictBehavior::IgnoreConflict` with deletes and `U-`/`U+` updates:
    /// updates still rewrite the row, but an insert conflicting with an
    /// existing key is dropped (so (8, 2) survives the later `+ 8 3`, and
    /// key 1 keeps (1, 4) despite chunk3's `+ 1 5`).
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Epoch 1: update pair sets key 8 to (8, 2); the trailing `+ 8 3`
        // conflicts and is ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Epoch 2: deletes apply by key; `- 5 0` targets an absent key.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Epoch 3: `+ 1 5` is ignored (key 1 exists); updates rewrite keys
        // 2 and 9.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view to read back the committed state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1) and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): the conflicting insert on key 8 was ignored, so the
        // update's value (8, 2) stands.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(3): deletes applied; keys 3 and 5 are absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // Deleted even though the delete carried the stale value 4.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // Deleting a non-existent key leaves nothing behind.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // barrier(4): ignored insert vs. applied updates.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // Key 1 keeps its original value — `+ 1 5` was ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                // Updates still apply under IgnoreConflict.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // Update on a previously-absent key 9 materializes (9, 1).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2217
    /// `ConflictBehavior::DoUpdateIfNotNull`: on a key conflict, only the
    /// NON-NULL columns of the incoming row replace the stored values; NULL
    /// columns in the incoming row leave the existing values untouched.
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // `.` is a NULL cell in from_pretty. Key 8 ends as (8, 2): the final
        // `+ 8 .` has a NULL value column, so it does not clobber the 2.
        // Key 2 is a fresh insert with a NULL value → (2, NULL).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // Epoch 2: plain insert plus two deletes (one targets an absent key).
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Epoch 3: `+ 7 .` must not null out (7, 8); `U+ 2 .` keeps key 2's
        // NULL; updates on key 9 materialize (9, 1).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view to read back the committed state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1) and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): NULL columns did not overwrite existing values.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                // Fresh insert with NULL value column is stored as-is.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(3): deletes applied; keys 3 and 5 are absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // barrier(4): NULL cells in conflicting rows left stored values alone.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // `+ 7 .` did not null out the existing (7, 8).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // Key 2 still carries its NULL value column.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // Update on a previously-absent key 9 materializes (9, 1).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2411
    /// Generates `row_number` random insert/delete rows packed into chunks of
    /// `chunk_size`, for the stream-consistency fuzz tests below.
    ///
    /// Keys are drawn from a small domain of `KN` distinct values so that
    /// conflicts (repeated inserts/deletes of the same key) are frequent.
    /// The RNG is seeded with a fixed `SEED`, so the output is deterministic
    /// across runs. Each produced chunk additionally gets a random visibility
    /// bitmap (~50% of rows hidden).
    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
        const KN: u32 = 4;
        const SEED: u64 = 998244353;
        let mut ret = vec![];
        let mut builder =
            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
        let mut rng = SmallRng::seed_from_u64(SEED);

        // Randomly hide each row of the chunk with probability 0.5.
        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
            let len = c.data_chunk().capacity();
            let mut c = StreamChunkMut::from(c);
            for i in 0..len {
                c.set_vis(i, rng.random_bool(0.5));
            }
            c.into()
        };
        for _ in 0..row_number {
            let k = (rng.next_u32() % KN) as i32;
            let v = rng.next_u32() as i32;
            // Insert or delete with equal probability.
            let op = if rng.random_bool(0.5) {
                Op::Insert
            } else {
                Op::Delete
            };
            // The builder yields a chunk whenever it fills up to `chunk_size`.
            if let Some(c) =
                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
            {
                ret.push(random_vis(c, &mut rng));
            }
        }
        // Flush the final, possibly partial chunk.
        if let Some(c) = builder.take() {
            ret.push(random_vis(c, &mut rng));
        }
        ret
    }
2447
    /// Fuzz-checks that the materialize executor's OUTPUT stream is
    /// op-consistent under the given `conflict_behavior`: the output chunks
    /// are replayed into a fresh `StateTable`, and `write_chunk` is expected
    /// to reject inconsistent ops (e.g. inserting a key that already exists).
    /// NOTE(review): the check relies on the state table's default
    /// op-consistency enforcement panicking on violation — confirm against
    /// `StateTable::write_chunk`.
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // One barrier, then N random rows in 128-row chunks, then a final
        // barrier to terminate the stream.
        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier.
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        // A separate state table (different table id, 1002) that the output
        // is replayed into.
        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        // Replay every output chunk; the loop ends at the closing barrier.
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            table.write_chunk(c);
        }
    }
2511
    /// Fuzz the output-stream consistency under `Overwrite` (upsert) conflict
    /// handling.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2516
    /// Fuzz the output-stream consistency under `IgnoreConflict` handling.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2521}