1use std::collections::HashSet;
16use std::ops::Bound;
17
18use bytes::Bytes;
19use futures::future::Either;
20use futures::stream::{self, select_with_strategy};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{
26 ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
27};
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, RowExt};
30use risingwave_common::types::DataType;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::BasicSerde;
33use risingwave_hummock_sdk::HummockReadEpoch;
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_pb::id::{SourceId, SubscriberId};
37use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
38use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
39use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
40use risingwave_storage::table::KeyedRow;
41
42use crate::common::change_buffer::output_kind as cb_kind;
43use crate::common::metrics::MetricsInfo;
44use crate::common::table::state_table::{
45 StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
46};
47use crate::executor::error::ErrorKind;
48use crate::executor::monitor::MaterializeMetrics;
49use crate::executor::mview::RefreshProgressTable;
50use crate::executor::mview::cache::MaterializeCache;
51use crate::executor::prelude::*;
52use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
53use crate::task::LocalBarrierManager;
54
/// State machine driving [`MaterializeExecutor::execute_inner`].
///
/// Normal operation stays in `NormalIngestion`. For a refreshable table the
/// machine walks `NormalIngestion -> MergingData -> CleanUp -> RefreshEnd ->
/// NormalIngestion`, passing through `CommitAndYieldBarrier` between stages so
/// that state tables are committed and the pending barrier is forwarded.
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Regular streaming ingestion into the materialized table.
    NormalIngestion,
    /// Merge-sort the main table against the staging table, collecting stale
    /// rows to delete.
    MergingData,
    /// Wait for a checkpoint barrier, then report refresh completion.
    CleanUp,
    /// Commit all state tables, yield `barrier` downstream, then transition to
    /// `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        // Boxed to avoid an infinitely-sized recursive enum.
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Refresh finished at this epoch; wait for the epoch to be committed in
    /// storage before returning to normal ingestion.
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}
68
/// `MaterializeExecutor` writes the input change stream into a state table
/// (the materialized view / table), applying conflict resolution, and forwards
/// the resulting changes downstream.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    input: Executor,

    /// Output schema of this executor.
    schema: Schema,

    /// The state table this executor materializes into.
    state_table: StateTableInner<S, SD>,

    /// Stream key indices of the input stream; may differ from (and be a
    /// superset of) the state table's pk indices.
    stream_key_indices: Vec<usize>,

    /// Column indices of the arrangement key.
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// Row cache used for conflict handling during checked conflict behaviors.
    // NOTE(review): assigned directly from `MaterializeCache::new` in both
    // constructors, so that constructor presumably returns an `Option` — confirm.
    materialize_cache: Option<MaterializeCache>,

    /// How conflicts on the primary key are resolved (e.g. overwrite, no check).
    conflict_behavior: ConflictBehavior,

    /// Whether rows are cleaned by a TTL watermark; when true the table runs in
    /// inconsistent-op mode (see `get_op_consistency_level`).
    cleaned_by_ttl_watermark: bool,

    /// Columns used to decide which row version wins on pk conflict.
    version_column_indices: Vec<u32>,

    /// Whether this MV may have downstream fragments; affects op consistency.
    may_have_downstream: bool,

    /// Subscribers depending on this table; non-empty keeps the log store on.
    subscriber_ids: HashSet<SubscriberId>,

    metrics: MaterializeMetrics,

    /// Append-only Iceberg table: chunks are passed through without being
    /// written to the state table.
    is_dummy_table: bool,

    /// Present iff the table supports `REFRESH`.
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    local_barrier_manager: LocalBarrierManager,
}
120
/// Extra state needed when the materialized table supports `REFRESH`.
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Catalog of the table being refreshed.
    pub table_catalog: Table,

    /// Catalog of the staging table used during refresh.
    pub staging_table_catalog: Table,

    /// Whether a refresh is currently in progress.
    pub is_refreshing: bool,

    /// Staging table recording the pk projection of rows ingested during the
    /// refresh load; merged against the main table afterwards.
    pub staging_table: StateTableInner<S, SD>,

    /// Per-vnode merge progress, persisted so a refresh can resume after
    /// recovery.
    pub progress_table: RefreshProgressTable<S>,

    /// Id of the refreshed table.
    pub table_id: TableId,
}
146
147impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
148 pub async fn new(
150 store: S,
151 table_catalog: &Table,
152 staging_table_catalog: &Table,
153 progress_state_table: &Table,
154 vnodes: Option<Arc<Bitmap>>,
155 ) -> Self {
156 let table_id = table_catalog.id;
157
158 let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
160 staging_table_catalog,
161 store.clone(),
162 vnodes.clone(),
163 )
164 .await;
165
166 let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
167 progress_state_table,
168 store,
169 vnodes,
170 )
171 .await;
172
173 let pk_len = table_catalog.pk.len();
175 let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
176
177 debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
178
179 Self {
180 table_catalog: table_catalog.clone(),
181 staging_table_catalog: staging_table_catalog.clone(),
182 is_refreshing: false,
183 staging_table,
184 progress_table,
185 table_id,
186 }
187 }
188}
189
190fn get_op_consistency_level(
191 cleaned_by_ttl_watermark: bool,
192 conflict_behavior: ConflictBehavior,
193 may_have_downstream: bool,
194 subscriber_ids: &HashSet<SubscriberId>,
195) -> StateTableOpConsistencyLevel {
196 if cleaned_by_ttl_watermark {
197 StateTableOpConsistencyLevel::Inconsistent
201 } else if !subscriber_ids.is_empty() {
202 StateTableOpConsistencyLevel::LogStoreEnabled
203 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
204 StateTableOpConsistencyLevel::Inconsistent
207 } else {
208 StateTableOpConsistencyLevel::ConsistentOldValue
209 }
210}
211
212impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Creates a `MaterializeExecutor`.
    ///
    /// Derives TOAST-able column indices for Postgres CDC tables, computes the
    /// initial op-consistency level from downstream/subscriber information,
    /// and builds the state table, materialize cache and metrics.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<u32>,
        metrics: Arc<StreamingMetrics>,
        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
        cleaned_by_ttl_watermark: bool,
        local_barrier_manager: LocalBarrierManager,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // For Postgres CDC tables, collect indices of columns whose types may
        // carry TOASTed values; `None` for all other tables or when no such
        // column exists.
        let toastable_column_indices = if table_catalog.cdc_table_type()
            == risingwave_pb::catalog::table::CdcTableType::Postgres
        {
            let toastable_indices: Vec<usize> = table_columns
                .iter()
                .enumerate()
                .filter_map(|(index, column)| match &column.data_type {
                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
                        Some(index)
                    }
                    _ => None,
                })
                .collect();

            if toastable_indices.is_empty() {
                None
            } else {
                Some(toastable_indices)
            }
        } else {
            None
        };

        // Serde over the table's value columns, used by the materialize cache.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let stream_key_indices: Vec<usize> = table_catalog
            .stream_key
            .iter()
            .map(|idx| *idx as usize)
            .collect();
        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let subscriber_ids = actor_context.initial_subscriber_ids.clone();
        // Initial consistency level; may be switched later per barrier.
        let op_consistency_level = get_op_consistency_level(
            cleaned_by_ttl_watermark,
            conflict_behavior,
            may_have_downstream,
            &subscriber_ids,
        );
        let state_table_metrics = metrics.new_state_table_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );
        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(op_consistency_level)
            .enable_preload_all_rows_by_config(&actor_context.config)
            .enable_vnode_key_pruning(true)
            .with_metrics(state_table_metrics)
            .build()
            .await;

        let mv_metrics = metrics.new_materialize_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );
        let cache_metrics = metrics.new_materialize_cache_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only Iceberg tables do not materialize rows at all.
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            stream_key_indices,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_indices.clone(),
                conflict_behavior,
                toastable_column_indices,
                cache_metrics,
            ),
            conflict_behavior,
            cleaned_by_ttl_watermark,
            version_column_indices,
            is_dummy_table,
            may_have_downstream,
            subscriber_ids,
            metrics: mv_metrics,
            refresh_args,
            local_barrier_manager,
        }
    }
349
350 #[try_stream(ok = Message, error = StreamExecutorError)]
351 async fn execute_inner(mut self) {
352 let mv_table_id = self.state_table.table_id();
353 let data_types = self.schema.data_types();
354 let mut input = self.input.execute();
355
356 let barrier = expect_first_barrier(&mut input).await?;
357 let first_epoch = barrier.epoch;
358 let _barrier_epoch = barrier.epoch; yield Message::Barrier(barrier);
361 self.state_table.init_epoch(first_epoch).await?;
362
363 let mut inner_state =
365 Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
366 if let Some(ref mut refresh_args) = self.refresh_args {
368 refresh_args.staging_table.init_epoch(first_epoch).await?;
369
370 refresh_args.progress_table.recover(first_epoch).await?;
372
373 let progress_stats = refresh_args.progress_table.get_progress_stats();
375 if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
376 refresh_args.is_refreshing = true;
377 tracing::info!(
378 total_vnodes = progress_stats.total_vnodes,
379 completed_vnodes = progress_stats.completed_vnodes,
380 "Recovered refresh in progress, resuming refresh operation"
381 );
382
383 let incomplete_vnodes: Vec<_> = refresh_args
387 .progress_table
388 .get_all_progress()
389 .iter()
390 .filter(|(_, entry)| !entry.is_completed)
391 .map(|(&vnode, _)| vnode)
392 .collect();
393
394 if !incomplete_vnodes.is_empty() {
395 tracing::info!(
397 incomplete_vnodes = incomplete_vnodes.len(),
398 "Recovery detected incomplete VNodes, resuming refresh operation"
399 );
400 } else {
403 tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
405 }
406 }
407 }
408
409 if let Some(ref refresh_args) = self.refresh_args
411 && refresh_args.is_refreshing
412 {
413 let incomplete_vnodes: Vec<_> = refresh_args
415 .progress_table
416 .get_all_progress()
417 .iter()
418 .filter(|(_, entry)| !entry.is_completed)
419 .map(|(&vnode, _)| vnode)
420 .collect();
421 if !incomplete_vnodes.is_empty() {
422 inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
424 tracing::info!(
425 incomplete_vnodes = incomplete_vnodes.len(),
426 "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
427 );
428 }
429 }
430
431 'main_loop: loop {
433 match *inner_state {
434 MaterializeStreamState::NormalIngestion => {
435 #[for_await]
436 '_normal_ingest: for msg in input.by_ref() {
437 let msg = msg?;
438 if let Some(cache) = &mut self.materialize_cache {
439 cache.evict();
440 }
441
442 match msg {
443 Message::Watermark(w) => {
444 if self.cleaned_by_ttl_watermark
445 && self.state_table.clean_watermark_index == Some(w.col_idx)
446 {
447 self.state_table.update_watermark(w.val.clone());
448 }
449 yield Message::Watermark(w);
450 }
451 Message::Chunk(chunk) if self.is_dummy_table => {
452 self.metrics
453 .materialize_input_row_count
454 .inc_by(chunk.cardinality() as u64);
455 yield Message::Chunk(chunk);
456 }
457 Message::Chunk(chunk) => {
458 self.metrics
459 .materialize_input_row_count
460 .inc_by(chunk.cardinality() as u64);
461
462 let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
469 self.conflict_behavior
470 && !self.state_table.is_consistent_op()
471 && !self.cleaned_by_ttl_watermark
472 && self.version_column_indices.is_empty()
473 {
474 ConflictBehavior::NoCheck
475 } else {
476 self.conflict_behavior
477 };
478
479 match optimized_conflict_behavior {
480 checked_conflict_behaviors!() => {
481 if chunk.cardinality() == 0 {
482 continue;
484 }
485
486 if let Some(ref mut refresh_args) = self.refresh_args
489 && refresh_args.is_refreshing
490 {
491 let key_chunk = chunk
492 .clone()
493 .project(self.state_table.pk_indices());
494 tracing::trace!(
495 staging_chunk = %key_chunk.to_pretty(),
496 input_chunk = %chunk.to_pretty(),
497 "writing to staging table"
498 );
499 if cfg!(debug_assertions) {
500 assert!(
502 key_chunk
503 .ops()
504 .iter()
505 .all(|op| op == &Op::Insert)
506 );
507 }
508 refresh_args
509 .staging_table
510 .write_chunk(key_chunk.clone());
511 refresh_args.staging_table.try_flush().await?;
512 }
513
514 let cache = self.materialize_cache.as_mut().unwrap();
515 let change_buffer =
516 cache.handle_new(chunk, &self.state_table).await?;
517
518 let output_chunk = if self.stream_key_indices
519 == self.state_table.pk_indices()
520 {
521 change_buffer.into_chunk::<{ cb_kind::RETRACT }>(
522 data_types.clone(),
523 )
524 } else {
525 debug_assert!(
528 self.state_table
529 .pk_indices()
530 .iter()
531 .all(|&i| self.stream_key_indices.contains(&i))
532 );
533 change_buffer.into_chunk_with_key(
534 data_types.clone(),
535 &self.stream_key_indices,
536 )
537 };
538
539 match output_chunk {
540 Some(output_chunk) => {
541 self.state_table.write_chunk(output_chunk.clone());
542 self.state_table.try_flush().await?;
543 yield Message::Chunk(output_chunk);
544 }
545 None => continue,
546 }
547 }
548 ConflictBehavior::NoCheck => {
549 self.state_table.write_chunk(chunk.clone());
550 self.state_table.try_flush().await?;
551
552 if let Some(ref mut refresh_args) = self.refresh_args
554 && refresh_args.is_refreshing
555 {
556 let key_chunk = chunk
557 .clone()
558 .project(self.state_table.pk_indices());
559 tracing::trace!(
560 staging_chunk = %key_chunk.to_pretty(),
561 input_chunk = %chunk.to_pretty(),
562 "writing to staging table"
563 );
564 if cfg!(debug_assertions) {
565 assert!(
567 key_chunk
568 .ops()
569 .iter()
570 .all(|op| op == &Op::Insert)
571 );
572 }
573 refresh_args
574 .staging_table
575 .write_chunk(key_chunk.clone());
576 refresh_args.staging_table.try_flush().await?;
577 }
578
579 yield Message::Chunk(chunk);
580 }
581 }
582 }
583 Message::Barrier(barrier) => {
584 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
585 barrier,
586 expect_next_state: Box::new(
587 MaterializeStreamState::NormalIngestion,
588 ),
589 };
590 continue 'main_loop;
591 }
592 }
593 }
594
595 return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
596 anyhow::anyhow!(
597 "Input stream terminated unexpectedly during normal ingestion"
598 ),
599 )));
600 }
601 MaterializeStreamState::MergingData => {
602 let Some(refresh_args) = self.refresh_args.as_mut() else {
603 panic!(
604 "MaterializeExecutor entered CleanUp state without refresh_args configured"
605 );
606 };
607 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
608
609 debug_assert_eq!(
610 self.state_table.vnodes(),
611 refresh_args.staging_table.vnodes()
612 );
613 debug_assert_eq!(
614 refresh_args.staging_table.vnodes(),
615 refresh_args.progress_table.vnodes()
616 );
617
618 let mut rows_to_delete = vec![];
619 let mut merge_complete = false;
620 let mut pending_barrier: Option<Barrier> = None;
621
622 {
624 let left_input = input.by_ref().map(Either::Left);
625 let right_merge_sort = pin!(
626 Self::make_mergesort_stream(
627 &self.state_table,
628 &refresh_args.staging_table,
629 &mut refresh_args.progress_table
630 )
631 .map(Either::Right)
632 );
633
634 let mut merge_stream =
637 select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
638 stream::PollNext::Left
639 });
640
641 #[for_await]
642 'merge_stream: for either in &mut merge_stream {
643 match either {
644 Either::Left(msg) => {
645 let msg = msg?;
646 match msg {
647 Message::Watermark(w) => yield Message::Watermark(w),
648 Message::Chunk(chunk) => {
649 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
650 }
651 Message::Barrier(b) => {
652 pending_barrier = Some(b);
653 break 'merge_stream;
654 }
655 }
656 }
657 Either::Right(result) => {
658 match result? {
659 Some((_vnode, row)) => {
660 rows_to_delete.push(row);
661 }
662 None => {
663 merge_complete = true;
665
666 }
668 }
669 }
670 }
671 }
672 }
673
674 for row in &rows_to_delete {
676 self.state_table.delete(row);
677 }
678 if !rows_to_delete.is_empty() {
679 let to_delete_chunk = StreamChunk::from_rows(
680 &rows_to_delete
681 .iter()
682 .map(|row| (Op::Delete, row))
683 .collect_vec(),
684 &self.schema.data_types(),
685 );
686
687 yield Message::Chunk(to_delete_chunk);
688 }
689
690 assert!(pending_barrier.is_some(), "pending barrier is not set");
692
693 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
694 barrier: pending_barrier.unwrap(),
695 expect_next_state: if merge_complete {
696 Box::new(MaterializeStreamState::CleanUp)
697 } else {
698 Box::new(MaterializeStreamState::MergingData)
699 },
700 };
701 continue 'main_loop;
702 }
703 MaterializeStreamState::CleanUp => {
704 let Some(refresh_args) = self.refresh_args.as_mut() else {
705 panic!(
706 "MaterializeExecutor entered MergingData state without refresh_args configured"
707 );
708 };
709 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
710
711 #[for_await]
712 for msg in input.by_ref() {
713 let msg = msg?;
714 match msg {
715 Message::Watermark(w) => {
716 if self.cleaned_by_ttl_watermark
717 && self.state_table.clean_watermark_index == Some(w.col_idx)
718 {
719 self.state_table.update_watermark(w.val.clone());
720 }
721 yield Message::Watermark(w)
722 }
723 Message::Chunk(chunk) => {
724 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
725 }
726 Message::Barrier(barrier) if !barrier.is_checkpoint() => {
727 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
728 barrier,
729 expect_next_state: Box::new(MaterializeStreamState::CleanUp),
730 };
731 continue 'main_loop;
732 }
733 Message::Barrier(barrier) => {
734 let staging_table_id = refresh_args.staging_table.table_id();
735 let epoch = barrier.epoch;
736 self.local_barrier_manager.report_refresh_finished(
737 epoch,
738 self.actor_context.id,
739 refresh_args.table_id,
740 staging_table_id,
741 );
742 tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
743
744 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
745 barrier,
746 expect_next_state: Box::new(
747 MaterializeStreamState::RefreshEnd {
748 on_complete_epoch: epoch,
749 },
750 ),
751 };
752 continue 'main_loop;
753 }
754 }
755 }
756 }
757 MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
758 let Some(refresh_args) = self.refresh_args.as_mut() else {
759 panic!(
760 "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
761 );
762 };
763 let staging_table_id = refresh_args.staging_table.table_id();
764
765 let staging_store = refresh_args.staging_table.state_store().clone();
767 staging_store
768 .try_wait_epoch(
769 HummockReadEpoch::Committed(on_complete_epoch.prev),
770 TryWaitEpochOptions {
771 table_id: staging_table_id,
772 },
773 )
774 .await?;
775
776 tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
777
778 if let Some(ref mut refresh_args) = self.refresh_args {
779 refresh_args.is_refreshing = false;
780 }
781 *inner_state = MaterializeStreamState::NormalIngestion;
782 continue 'main_loop;
783 }
784 MaterializeStreamState::CommitAndYieldBarrier {
785 barrier,
786 mut expect_next_state,
787 } => {
788 if let Some(ref mut refresh_args) = self.refresh_args {
789 match barrier.mutation.as_deref() {
790 Some(Mutation::RefreshStart {
791 table_id: refresh_table_id,
792 associated_source_id: _,
793 }) if *refresh_table_id == refresh_args.table_id => {
794 debug_assert!(
795 !refresh_args.is_refreshing,
796 "cannot start refresh twice"
797 );
798 refresh_args.is_refreshing = true;
799 tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
800
801 Self::init_refresh_progress(
803 &self.state_table,
804 &mut refresh_args.progress_table,
805 barrier.epoch.curr,
806 )?;
807 }
808 Some(Mutation::LoadFinish {
809 associated_source_id: load_finish_source_id,
810 }) => {
811 let associated_source_id: SourceId = match refresh_args
813 .table_catalog
814 .optional_associated_source_id
815 {
816 Some(id) => id.into(),
817 None => unreachable!("associated_source_id is not set"),
818 };
819
820 if *load_finish_source_id == associated_source_id {
821 tracing::info!(
822 %load_finish_source_id,
823 "LoadFinish received, starting data replacement"
824 );
825 expect_next_state =
826 Box::new(MaterializeStreamState::<_>::MergingData);
827 }
828 }
829 _ => {}
830 }
831 }
832
833 if !self.may_have_downstream
837 && barrier.has_more_downstream_fragments(self.actor_context.id)
838 {
839 self.may_have_downstream = true;
840 }
841 Self::may_update_depended_subscriptions(
842 &mut self.subscriber_ids,
843 &barrier,
844 mv_table_id,
845 );
846 let op_consistency_level = get_op_consistency_level(
847 self.cleaned_by_ttl_watermark,
848 self.conflict_behavior,
849 self.may_have_downstream,
850 &self.subscriber_ids,
851 );
852 let post_commit = self
853 .state_table
854 .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
855 .await?;
856
857 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
858
859 let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
861 {
862 Some((
865 refresh_args.staging_table.commit(barrier.epoch).await?,
866 refresh_args.progress_table.commit(barrier.epoch).await?,
867 ))
868 } else {
869 None
870 };
871
872 let b_epoch = barrier.epoch;
873 yield Message::Barrier(barrier);
874
875 if let Some((_, cache_may_stale)) = post_commit
877 .post_yield_barrier(update_vnode_bitmap.clone())
878 .await?
879 && cache_may_stale
880 && let Some(cache) = &mut self.materialize_cache
881 {
882 cache.clear();
883 }
884
885 if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
887 staging_post_commit
888 .post_yield_barrier(update_vnode_bitmap.clone())
889 .await?;
890 progress_post_commit
891 .post_yield_barrier(update_vnode_bitmap)
892 .await?;
893 }
894
895 self.metrics
896 .materialize_current_epoch
897 .set(b_epoch.curr as i64);
898
899 *inner_state = *expect_next_state;
902 }
903 }
904 }
905 }
906
907 #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
911 async fn make_mergesort_stream<'a>(
912 main_table: &'a StateTableInner<S, SD>,
913 staging_table: &'a StateTableInner<S, SD>,
914 progress_table: &'a mut RefreshProgressTable<S>,
915 ) {
916 for vnode in main_table.vnodes().clone().iter_vnodes() {
917 let mut processed_rows = 0;
918 let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
920 if let Some(current_entry) = progress_table.get_progress(vnode) {
921 if current_entry.is_completed {
923 tracing::debug!(
924 vnode = vnode.to_index(),
925 "Skipping already completed VNode during recovery"
926 );
927 continue;
928 }
929 processed_rows += current_entry.processed_rows;
930 tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
931
932 if let Some(current_state) = ¤t_entry.current_pos {
933 (Bound::Excluded(current_state.clone()), Bound::Unbounded)
934 } else {
935 (Bound::Unbounded, Bound::Unbounded)
936 }
937 } else {
938 (Bound::Unbounded, Bound::Unbounded)
939 };
940
941 let iter_main = main_table
942 .iter_keyed_row_with_vnode(
943 vnode,
944 &pk_range,
945 PrefetchOptions::prefetch_for_large_range_scan(),
946 )
947 .await?;
948 let iter_staging = staging_table
949 .iter_keyed_row_with_vnode(
950 vnode,
951 &pk_range,
952 PrefetchOptions::prefetch_for_large_range_scan(),
953 )
954 .await?;
955
956 pin_mut!(iter_main);
957 pin_mut!(iter_staging);
958
959 let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
961 let mut staging_item: Option<KeyedRow<Bytes>> =
962 iter_staging.next().await.transpose()?;
963
964 while let Some(main_kv) = main_item {
965 let main_key = main_kv.key();
966
967 let mut should_delete = false;
969 while let Some(staging_kv) = &staging_item {
970 let staging_key = staging_kv.key();
971 match main_key.cmp(staging_key) {
972 std::cmp::Ordering::Greater => {
973 staging_item = iter_staging.next().await.transpose()?;
975 }
976 std::cmp::Ordering::Equal => {
977 break;
979 }
980 std::cmp::Ordering::Less => {
981 should_delete = true;
983 break;
984 }
985 }
986 }
987
988 if staging_item.is_none() {
990 should_delete = true;
991 }
992
993 if should_delete {
994 yield Some((vnode, main_kv.row().clone()));
995 }
996
997 processed_rows += 1;
999 tracing::debug!(
1000 "set progress table: vnode = {:?}, processed_rows = {:?}",
1001 vnode,
1002 processed_rows
1003 );
1004 progress_table.set_progress(
1005 vnode,
1006 Some(
1007 main_kv
1008 .row()
1009 .project(main_table.pk_indices())
1010 .to_owned_row(),
1011 ),
1012 false,
1013 processed_rows,
1014 )?;
1015 main_item = iter_main.next().await.transpose()?;
1016 }
1017
1018 if let Some(current_entry) = progress_table.get_progress(vnode) {
1020 progress_table.set_progress(
1021 vnode,
1022 current_entry.current_pos.clone(),
1023 true, current_entry.processed_rows,
1025 )?;
1026
1027 tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
1028 }
1029 }
1030
1031 yield None;
1033 }
1034
1035 fn may_update_depended_subscriptions(
1037 depended_subscriptions: &mut HashSet<SubscriberId>,
1038 barrier: &Barrier,
1039 mv_table_id: TableId,
1040 ) {
1041 for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
1042 if !depended_subscriptions.insert(subscriber_id) {
1043 warn!(
1044 ?depended_subscriptions,
1045 %mv_table_id,
1046 %subscriber_id,
1047 "subscription id already exists"
1048 );
1049 }
1050 }
1051
1052 if let Some(subscriptions_to_drop) = barrier.as_subscriptions_to_drop() {
1053 for SubscriptionUpstreamInfo {
1054 subscriber_id,
1055 upstream_mv_table_id,
1056 } in subscriptions_to_drop
1057 {
1058 if *upstream_mv_table_id == mv_table_id
1059 && !depended_subscriptions.remove(subscriber_id)
1060 {
1061 warn!(
1062 ?depended_subscriptions,
1063 %mv_table_id,
1064 %subscriber_id,
1065 "drop non existing subscriber_id id"
1066 );
1067 }
1068 }
1069 }
1070 }
1071
1072 fn init_refresh_progress(
1074 state_table: &StateTableInner<S, SD>,
1075 progress_table: &mut RefreshProgressTable<S>,
1076 _epoch: u64,
1077 ) -> StreamExecutorResult<()> {
1078 debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1079
1080 for vnode in state_table.vnodes().iter_vnodes() {
1082 progress_table.set_progress(
1083 vnode, None, false, 0, )?;
1087 }
1088
1089 tracing::info!(
1090 vnodes_count = state_table.vnodes().count_ones(),
1091 "Initialized refresh progress tracking for all VNodes"
1092 );
1093
1094 Ok(())
1095 }
1096}
1097
1098impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Test-only constructor; the stream key defaults to the arrange key.
    #[cfg(any(test, feature = "test"))]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        Self::for_test_inner(
            input,
            store,
            table_id,
            keys,
            column_ids,
            watermark_epoch,
            conflict_behavior,
            None,
        )
        .await
    }
1122
    /// Test-only constructor with an explicit stream key (may differ from the
    /// arrange key).
    #[cfg(any(test, feature = "test"))]
    #[allow(clippy::too_many_arguments)]
    pub async fn for_test_with_stream_key(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        stream_key: Vec<usize>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        Self::for_test_inner(
            input,
            store,
            table_id,
            keys,
            column_ids,
            watermark_epoch,
            conflict_behavior,
            Some(stream_key),
        )
        .await
    }
1147
    /// Shared test-only constructor. When `stream_key` is `None`, the stream
    /// key defaults to the arrange key columns. Builds a minimal catalog and
    /// state table, with unused metrics and no refresh support.
    #[cfg(any(test, feature = "test"))]
    #[allow(clippy::too_many_arguments)]
    async fn for_test_inner(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        stream_key: Option<Vec<usize>>,
    ) -> Self {
        use risingwave_common::util::iter_util::ZipEqFast;

        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // In tests, every column is a value column.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let stream_key_indices = stream_key.unwrap_or_else(|| arrange_columns.clone());
        let mut table_catalog = crate::common::table::test_utils::gen_pbtable(
            table_id,
            columns,
            arrange_order_types,
            arrange_columns.clone(),
            0,
        );
        table_catalog.stream_key = stream_key_indices.iter().map(|i| *i as i32).collect();
        let state_table = StateTableInner::from_table_catalog(&table_catalog, store, None).await;

        // Metrics backed by an unused registry; actor/fragment ids are dummies.
        let unused = StreamingMetrics::unused();
        let metrics = unused.new_materialize_metrics(table_id, 1.into(), 2.into());
        let cache_metrics = unused.new_materialize_cache_metrics(table_id, 1.into(), 2.into());

        Self {
            input,
            schema,
            state_table,
            stream_key_indices,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                vec![],
                conflict_behavior,
                None,
                cache_metrics,
            ),
            conflict_behavior,
            cleaned_by_ttl_watermark: false,
            version_column_indices: vec![],
            is_dummy_table: false,
            may_have_downstream: true,
            subscriber_ids: HashSet::new(),
            metrics,
            refresh_args: None,
            local_barrier_manager: LocalBarrierManager::for_test(),
        }
    }
1217}
1218
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    /// Boxes the inner state-machine stream as the executor's message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
1224
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Intentionally shows only the key indices; remaining fields are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .field("stream_key_indices", &self.stream_key_indices)
            .finish()
    }
}
1233
1234#[cfg(test)]
1235mod tests {
1236
1237 use std::iter;
1238 use std::sync::atomic::AtomicU64;
1239
1240 use rand::rngs::SmallRng;
1241 use rand::{Rng, RngCore, SeedableRng};
1242 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1243 use risingwave_common::catalog::Field;
1244 use risingwave_common::util::epoch::test_epoch;
1245 use risingwave_common::util::sort_util::OrderType;
1246 use risingwave_hummock_sdk::HummockReadEpoch;
1247 use risingwave_storage::memory::MemoryStateStore;
1248 use risingwave_storage::table::batch_table::BatchTable;
1249
1250 use super::*;
1251 use crate::executor::test_utils::*;
1252
    #[tokio::test]
    async fn test_materialize_executor() {
        // Smoke test for the basic ingest path with `ConflictBehavior::NoCheck`:
        // rows written before a barrier must be readable from the backing batch
        // table once that barrier has been yielded (i.e. the epoch committed).
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two anonymous int32 columns; column 0 doubles as the primary key below.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Epoch 1 inserts three rows.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        // Epoch 2 inserts one row and deletes (3, 6).
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Message sequence: barrier(1), chunk1, barrier(2), chunk2, barrier(3).
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same state store, used to read rows back.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1).
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1 (passed through downstream).
        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): epoch-1 data is now committed and visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();
        // barrier(3): (7, 8) was inserted; (3, 6) was deleted by chunk2.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1355
1356 #[tokio::test]
1358 async fn test_upsert_stream() {
1359 let memory_state_store = MemoryStateStore::new();
1361 let table_id = TableId::new(1);
1362 let schema = Schema::new(vec![
1364 Field::unnamed(DataType::Int32),
1365 Field::unnamed(DataType::Int32),
1366 ]);
1367 let column_ids = vec![0.into(), 1.into()];
1368
1369 let chunk1 = StreamChunk::from_pretty(
1371 " i i
1372 + 1 1",
1373 );
1374
1375 let chunk2 = StreamChunk::from_pretty(
1376 " i i
1377 + 1 2
1378 - 1 2",
1379 );
1380
1381 let source = MockSource::with_messages(vec![
1383 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1384 Message::Chunk(chunk1),
1385 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1386 Message::Chunk(chunk2),
1387 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1388 ])
1389 .into_executor(schema.clone(), StreamKey::new());
1390
1391 let order_types = vec![OrderType::ascending()];
1392 let column_descs = vec![
1393 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1394 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1395 ];
1396
1397 let table = BatchTable::for_test(
1398 memory_state_store.clone(),
1399 table_id,
1400 column_descs,
1401 order_types,
1402 vec![0],
1403 vec![0, 1],
1404 );
1405
1406 let mut materialize_executor = MaterializeExecutor::for_test(
1407 source,
1408 memory_state_store,
1409 table_id,
1410 vec![ColumnOrder::new(0, OrderType::ascending())],
1411 column_ids,
1412 Arc::new(AtomicU64::new(0)),
1413 ConflictBehavior::Overwrite,
1414 )
1415 .await
1416 .boxed()
1417 .execute();
1418 materialize_executor.next().await.transpose().unwrap();
1419
1420 materialize_executor.next().await.transpose().unwrap();
1421 materialize_executor.next().await.transpose().unwrap();
1422 materialize_executor.next().await.transpose().unwrap();
1423
1424 match materialize_executor.next().await.transpose().unwrap() {
1425 Some(Message::Barrier(_)) => {
1426 let row = table
1427 .get_row(
1428 &OwnedRow::new(vec![Some(1_i32.into())]),
1429 HummockReadEpoch::NoWait(u64::MAX),
1430 )
1431 .await
1432 .unwrap();
1433 assert!(row.is_none());
1434 }
1435 _ => unreachable!(),
1436 }
1437 }
1438
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // `ConflictBehavior::Overwrite`: when several inserts share a primary
        // key, the latest one wins — both within a single chunk and across
        // chunks in the same epoch.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 1 is inserted twice here: (1, 4) overwrites (1, 3).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicting inserts on keys 1 and 2 in the same epoch: they overwrite.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // Conflicting insert in a later epoch (not asserted below).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view used to read committed rows back.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1).
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): verify the post-conflict values of keys 3, 1 and 2.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // Key 1: (1, 3) from chunk2 overwrote (1, 4) from chunk1.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // Key 2: (2, 6) from chunk2 overwrote (2, 5) from chunk1.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1557
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // `ConflictBehavior::Overwrite` with mixed Insert/Delete/Update ops:
        // update pairs (U-/U+) and conflicting inserts both overwrite, deletes
        // on absent keys are tolerated, and results are checked per epoch.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Epoch 1: plain inserts, an update pair on key 8 (absent before), and
        // a conflicting insert (8, 3) that overwrites the update result.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Epoch 2: insert key 7, delete key 3 (old value in the delete row is
        // stale on purpose), and delete key 5 which never existed.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Epoch 3: conflicting insert on key 1, update pairs on keys 2 and 9
        // (key 9 was absent before its update).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view used to read committed rows back.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1).
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): the trailing insert (8, 3) overwrote the updated (8, 2).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(3): (7, 8) present; keys 3 and 5 both absent after deletes.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // Deleting a key that never existed leaves no trace.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // barrier(4): overwrites and updates from chunk3 are all visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // Update on a previously-absent key behaves like an insert.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1744
1745 #[tokio::test]
1746 async fn test_change_buffer_into_chunk_with_stream_key() {
1747 let memory_state_store = MemoryStateStore::new();
1750 let table_id = TableId::new(1);
1751 let schema = Schema::new(vec![
1752 Field::unnamed(DataType::Int32),
1753 Field::unnamed(DataType::Int32),
1754 ]);
1755 let column_ids = vec![0.into(), 1.into()];
1756
1757 let chunk1 = StreamChunk::from_pretty(
1758 " i i
1759 + 1 4",
1760 );
1761 let chunk2 = StreamChunk::from_pretty(
1762 " i i
1763 + 1 5",
1764 );
1765
1766 let source = MockSource::with_messages(vec![
1767 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1768 Message::Chunk(chunk1),
1769 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1770 Message::Chunk(chunk2),
1771 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1772 ])
1773 .into_executor(schema.clone(), StreamKey::new());
1774
1775 let mut materialize_executor = MaterializeExecutor::for_test_with_stream_key(
1776 source,
1777 memory_state_store,
1778 table_id,
1779 vec![ColumnOrder::new(0, OrderType::ascending())],
1780 vec![0, 1],
1781 column_ids,
1782 Arc::new(AtomicU64::new(0)),
1783 ConflictBehavior::Overwrite,
1784 )
1785 .await
1786 .boxed()
1787 .execute();
1788
1789 materialize_executor.next().await.transpose().unwrap();
1791 materialize_executor.next().await.transpose().unwrap();
1792
1793 materialize_executor.next().await.transpose().unwrap();
1795
1796 match materialize_executor.next().await.transpose().unwrap() {
1798 Some(Message::Chunk(chunk)) => {
1799 assert_eq!(
1800 chunk.compact_vis(),
1801 StreamChunk::from_pretty(
1802 " i i
1803 - 1 4
1804 + 1 5"
1805 )
1806 );
1807 }
1808 other => panic!("expect chunk, got {other:?}"),
1809 }
1810 }
1811
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // `ConflictBehavior::IgnoreConflict`: the FIRST insert for a key wins
        // and later conflicting inserts are dropped (opposite of `Overwrite`).
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // (1, 4) conflicts with the earlier (1, 3) and must be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Both rows conflict with chunk1 and must be ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // Later-epoch conflicting insert (not asserted below).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view used to read committed rows back.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1).
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): all first-writer values survived the conflicts.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // Key 1 keeps its first value (1, 3).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // Key 2 keeps its first value (2, 5).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1930
1931 #[tokio::test]
1932 async fn test_ignore_delete_then_insert() {
1933 let memory_state_store = MemoryStateStore::new();
1935 let table_id = TableId::new(1);
1936 let schema = Schema::new(vec![
1938 Field::unnamed(DataType::Int32),
1939 Field::unnamed(DataType::Int32),
1940 ]);
1941 let column_ids = vec![0.into(), 1.into()];
1942
1943 let chunk1 = StreamChunk::from_pretty(
1945 " i i
1946 + 1 3
1947 - 1 3
1948 + 1 6",
1949 );
1950
1951 let source = MockSource::with_messages(vec![
1953 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1954 Message::Chunk(chunk1),
1955 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1956 ])
1957 .into_executor(schema.clone(), StreamKey::new());
1958
1959 let order_types = vec![OrderType::ascending()];
1960 let column_descs = vec![
1961 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1962 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1963 ];
1964
1965 let table = BatchTable::for_test(
1966 memory_state_store.clone(),
1967 table_id,
1968 column_descs,
1969 order_types,
1970 vec![0],
1971 vec![0, 1],
1972 );
1973
1974 let mut materialize_executor = MaterializeExecutor::for_test(
1975 source,
1976 memory_state_store,
1977 table_id,
1978 vec![ColumnOrder::new(0, OrderType::ascending())],
1979 column_ids,
1980 Arc::new(AtomicU64::new(0)),
1981 ConflictBehavior::IgnoreConflict,
1982 )
1983 .await
1984 .boxed()
1985 .execute();
1986 let _msg1 = materialize_executor
1987 .next()
1988 .await
1989 .transpose()
1990 .unwrap()
1991 .unwrap()
1992 .as_barrier()
1993 .unwrap();
1994 let _msg2 = materialize_executor
1995 .next()
1996 .await
1997 .transpose()
1998 .unwrap()
1999 .unwrap()
2000 .as_chunk()
2001 .unwrap();
2002 let _msg3 = materialize_executor
2003 .next()
2004 .await
2005 .transpose()
2006 .unwrap()
2007 .unwrap()
2008 .as_barrier()
2009 .unwrap();
2010
2011 let row = table
2012 .get_row(
2013 &OwnedRow::new(vec![Some(1_i32.into())]),
2014 HummockReadEpoch::NoWait(u64::MAX),
2015 )
2016 .await
2017 .unwrap();
2018 assert_eq!(
2019 row,
2020 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
2021 );
2022 }
2023
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        // Same input stream as `test_delete_and_update_conflict`, but with
        // `ConflictBehavior::IgnoreConflict`: conflicting INSERTS are dropped
        // while deletes and update pairs still apply.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Epoch 1: the trailing insert (8, 3) conflicts with the update result
        // (8, 2) and must be IGNORED here.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Epoch 2: insert key 7, delete key 3, delete never-existing key 5.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Epoch 3: conflicting insert on key 1 (ignored), update pairs on keys
        // 2 and 9 (applied).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view used to read committed rows back.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1).
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): (8, 2) from the update pair survived; (8, 3) was ignored.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(3): (7, 8) present; keys 3 and 5 absent after deletes.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // Deleting a key that never existed leaves no trace.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // barrier(4): key 1 kept its ORIGINAL value (insert ignored), while the
        // update pairs on keys 2 and 9 were applied.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // Update on a previously-absent key behaves like an insert.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2211
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // `ConflictBehavior::DoUpdateIfNotNull`: on a pk conflict, only the
        // NON-NULL columns of the incoming row overwrite the stored row; NULL
        // columns keep the previously stored value.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 8: U+ sets (8, 2); the later insert (8, NULL) keeps value 2.
        // Key 2 is first inserted with a NULL value column.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // Insert key 7, delete key 3, and delete the never-existing key 5.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // (7, NULL) must keep 8; (2, NULL) via update keeps its stored NULL;
        // key 9 is updated in with a concrete value.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view used to read committed rows back.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1).
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): NULLs in conflicting rows did not clobber stored values.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (8, NULL) kept the previously stored value 2.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                // Key 2 was first inserted with NULL (no conflict) and stays NULL.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(3): plain inserts/deletes behave as usual for this behavior.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // barrier(4): NULL columns in conflicting rows preserved old values.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (7, NULL) kept value 8.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // Key 2's update carried NULL, so the stored NULL persists.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // Key 9 was updated in with a concrete value.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2405
2406 fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2407 const KN: u32 = 4;
2408 const SEED: u64 = 998244353;
2409 let mut ret = vec![];
2410 let mut builder =
2411 StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2412 let mut rng = SmallRng::seed_from_u64(SEED);
2413
2414 let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2415 let len = c.data_chunk().capacity();
2416 let mut c = StreamChunkMut::from(c);
2417 for i in 0..len {
2418 c.set_vis(i, rng.random_bool(0.5));
2419 }
2420 c.into()
2421 };
2422 for _ in 0..row_number {
2423 let k = (rng.next_u32() % KN) as i32;
2424 let v = rng.next_u32() as i32;
2425 let op = if rng.random_bool(0.5) {
2426 Op::Insert
2427 } else {
2428 Op::Delete
2429 };
2430 if let Some(c) =
2431 builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2432 {
2433 ret.push(random_vis(c, &mut rng));
2434 }
2435 }
2436 if let Some(c) = builder.take() {
2437 ret.push(random_vis(c, &mut rng));
2438 }
2439 ret
2440 }
2441
    /// Feeds `N` fuzz-generated rows (random ops and visibility) through a
    /// `MaterializeExecutor` configured with `conflict_behavior`, then replays
    /// the executor's output chunks into a separate `StateTable`.
    /// NOTE(review): the implicit assertion appears to be that the replay does
    /// not trip the `StateTable`'s op consistency checks — confirm that the
    /// default-constructed table enables consistent-op checking.
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // One leading and one trailing barrier around all fuzz chunks.
        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        // A separate state table (different table id, same schema) that the
        // executor's output chunks are applied to in order.
        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        // Apply every output chunk; the loop ends at the trailing barrier.
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            table.write_chunk(c);
        }
    }
2505
2506 #[tokio::test]
2507 async fn fuzz_test_stream_consistent_upsert() {
2508 fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
2509 }
2510
2511 #[tokio::test]
2512 async fn fuzz_test_stream_consistent_ignore() {
2513 fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
2514 }
2515}