1use std::collections::HashSet;
16use std::ops::Bound;
17
18use bytes::Bytes;
19use futures::future::Either;
20use futures::stream::{self, select_with_strategy};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{
26 ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
27};
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, RowExt};
30use risingwave_common::types::DataType;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::BasicSerde;
33use risingwave_hummock_sdk::HummockReadEpoch;
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_pb::id::{SourceId, SubscriberId};
37use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
38use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
39use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
40use risingwave_storage::table::KeyedRow;
41
42use crate::common::change_buffer::output_kind as cb_kind;
43use crate::common::metrics::MetricsInfo;
44use crate::common::table::state_table::{
45 StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
46};
47use crate::executor::error::ErrorKind;
48use crate::executor::monitor::MaterializeMetrics;
49use crate::executor::mview::RefreshProgressTable;
50use crate::executor::mview::cache::MaterializeCache;
51use crate::executor::prelude::*;
52use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
53use crate::task::LocalBarrierManager;
54
/// States of the state machine driven by `MaterializeExecutor::execute_inner`.
///
/// `M` is the barrier mutation type carried by [`BarrierInner`].
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Consume input messages and materialize chunks into the state table.
    NormalIngestion,
    /// Merge the main table against the staging table and delete main-table rows whose keys are
    /// absent from the staging table. Input chunks are dropped (with a warning) in this state.
    MergingData,
    /// Wait for a checkpoint barrier on which the refresh is reported as finished. Input chunks
    /// are dropped (with a warning) in this state.
    CleanUp,
    /// Commit all state tables, yield `barrier` downstream, then switch to `expect_next_state`.
    /// A matching `LoadFinish` mutation on the barrier overrides the next state with
    /// `MergingData`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Wait until `on_complete_epoch.prev` is committed for the staging table, then go back to
    /// `NormalIngestion`.
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}
68
/// Executor that materializes its input stream into a state table and forwards the resulting
/// changes downstream.
///
/// When `refresh_args` is set, it additionally records the keys loaded during a refresh in a
/// staging table. Once loading finishes, it deletes main-table rows that were not reloaded.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor.
    input: Executor,

    /// Output schema; its data types are used to build output chunks.
    schema: Schema,

    /// State table holding the materialized rows.
    state_table: StateTableInner<S, SD>,

    /// Column indices of the stream key (taken from the table catalog's `stream_key`).
    stream_key_indices: Vec<usize>,

    /// Column indices of the arrange key.
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// Cache used by the conflict-checking path. It may be `None`.
    /// NOTE(review): presumably `None` when no conflict check is needed; confirm in
    /// `MaterializeCache::new`.
    materialize_cache: Option<MaterializeCache>,

    /// For Postgres CDC tables, the indices of columns typed `Varchar`, `List`, `Bytea`, `Jsonb`
    /// or `Vector`. `None` for other tables or when no such column exists.
    toastable_column_indices: Option<Vec<usize>>,

    /// Configured behavior when an incoming row conflicts with an existing one.
    conflict_behavior: ConflictBehavior,

    /// Whether the state table is cleaned by TTL watermarks. When set, watermarks on the
    /// clean-watermark column are forwarded to the state table.
    cleaned_by_ttl_watermark: bool,

    /// Indices of version columns, passed to the materialize cache.
    version_column_indices: Vec<u32>,

    /// Whether any downstream may consume this executor's output. It starts as
    /// `initial_dispatch_num != 0` and only ever flips from `false` to `true`.
    may_have_downstream: bool,

    /// Subscribers of this materialized view, kept in sync with barrier mutations.
    subscriber_ids: HashSet<SubscriberId>,

    metrics: MaterializeMetrics,

    /// `true` for append-only tables on the Iceberg engine. Chunks are forwarded without being
    /// written to the state table.
    is_dummy_table: bool,

    /// Refresh-related state (staging and progress tables). `None` disables refresh handling.
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    /// Used to report that a refresh has finished.
    local_barrier_manager: LocalBarrierManager,
}
126
/// State needed by a [`MaterializeExecutor`] to support refreshing its table.
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Catalog of the refreshable table. Its `optional_associated_source_id` is matched against
    /// `LoadFinish` mutations.
    pub table_catalog: Table,

    /// Catalog of the staging table.
    pub staging_table_catalog: Table,

    /// Whether a refresh is currently in progress.
    pub is_refreshing: bool,

    /// Table storing the primary-key projection of rows ingested while a refresh is in progress.
    pub staging_table: StateTableInner<S, SD>,

    /// Per-vnode progress of the merge phase, used to resume merging after recovery.
    pub progress_table: RefreshProgressTable<S>,

    /// Id of the refreshable table (copied from `table_catalog.id`).
    pub table_id: TableId,
}
152
153impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
154 pub async fn new(
156 store: S,
157 table_catalog: &Table,
158 staging_table_catalog: &Table,
159 progress_state_table: &Table,
160 vnodes: Option<Arc<Bitmap>>,
161 ) -> Self {
162 let table_id = table_catalog.id;
163
164 let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
166 staging_table_catalog,
167 store.clone(),
168 vnodes.clone(),
169 )
170 .await;
171
172 let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
173 progress_state_table,
174 store,
175 vnodes,
176 )
177 .await;
178
179 let pk_len = table_catalog.pk.len();
181 let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
182
183 debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
184
185 Self {
186 table_catalog: table_catalog.clone(),
187 staging_table_catalog: staging_table_catalog.clone(),
188 is_refreshing: false,
189 staging_table,
190 progress_table,
191 table_id,
192 }
193 }
194}
195
196fn get_op_consistency_level(
197 cleaned_by_ttl_watermark: bool,
198 conflict_behavior: ConflictBehavior,
199 may_have_downstream: bool,
200 subscriber_ids: &HashSet<SubscriberId>,
201) -> StateTableOpConsistencyLevel {
202 if cleaned_by_ttl_watermark {
203 StateTableOpConsistencyLevel::Inconsistent
207 } else if !subscriber_ids.is_empty() {
208 StateTableOpConsistencyLevel::LogStoreEnabled
209 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
210 StateTableOpConsistencyLevel::Inconsistent
213 } else {
214 StateTableOpConsistencyLevel::ConsistentOldValue
215 }
216}
217
218impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
219 #[expect(clippy::too_many_arguments)]
223 pub async fn new(
224 input: Executor,
225 schema: Schema,
226 store: S,
227 arrange_key: Vec<ColumnOrder>,
228 actor_context: ActorContextRef,
229 vnodes: Option<Arc<Bitmap>>,
230 table_catalog: &Table,
231 watermark_epoch: AtomicU64Ref,
232 conflict_behavior: ConflictBehavior,
233 version_column_indices: Vec<u32>,
234 metrics: Arc<StreamingMetrics>,
235 refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
236 cleaned_by_ttl_watermark: bool,
237 local_barrier_manager: LocalBarrierManager,
238 ) -> Self {
239 let table_columns: Vec<ColumnDesc> = table_catalog
240 .columns
241 .iter()
242 .map(|col| col.column_desc.as_ref().unwrap().into())
243 .collect();
244
245 let toastable_column_indices = if table_catalog.cdc_table_type()
248 == risingwave_pb::catalog::table::CdcTableType::Postgres
249 {
250 let toastable_indices: Vec<usize> = table_columns
251 .iter()
252 .enumerate()
253 .filter_map(|(index, column)| match &column.data_type {
254 DataType::Varchar
264 | DataType::List(_)
265 | DataType::Bytea
266 | DataType::Jsonb
267 | DataType::Vector(_) => Some(index),
268 _ => None,
269 })
270 .collect();
271
272 if toastable_indices.is_empty() {
273 None
274 } else {
275 Some(toastable_indices)
276 }
277 } else {
278 None
279 };
280
281 let row_serde: BasicSerde = BasicSerde::new(
282 Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
283 Arc::from(table_columns.into_boxed_slice()),
284 );
285
286 let stream_key_indices: Vec<usize> = table_catalog
287 .stream_key
288 .iter()
289 .map(|idx| *idx as usize)
290 .collect();
291 let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
292 let may_have_downstream = actor_context.initial_dispatch_num != 0;
293 let subscriber_ids = actor_context.initial_subscriber_ids.clone();
294 let op_consistency_level = get_op_consistency_level(
295 cleaned_by_ttl_watermark,
296 conflict_behavior,
297 may_have_downstream,
298 &subscriber_ids,
299 );
300 let state_table_metrics = metrics.new_state_table_metrics(
301 table_catalog.id,
302 actor_context.id,
303 actor_context.fragment_id,
304 );
305 let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
307 .with_op_consistency_level(op_consistency_level)
308 .enable_preload_all_rows_by_config(&actor_context.config)
309 .enable_vnode_key_stats(
310 actor_context
311 .config
312 .developer
313 .enable_vnode_key_stats_for_materialize,
314 &actor_context.config,
315 )
316 .with_metrics(state_table_metrics)
317 .build()
318 .await;
319
320 let mv_metrics = metrics.new_materialize_metrics(
321 table_catalog.id,
322 actor_context.id,
323 actor_context.fragment_id,
324 );
325 let cache_metrics = metrics.new_materialize_cache_metrics(
326 table_catalog.id,
327 actor_context.id,
328 actor_context.fragment_id,
329 );
330
331 let metrics_info =
332 MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
333
334 let is_dummy_table =
335 table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;
336
337 Self {
338 input,
339 schema,
340 state_table,
341 stream_key_indices,
342 arrange_key_indices,
343 actor_context,
344 materialize_cache: MaterializeCache::new(
345 watermark_epoch,
346 metrics_info,
347 row_serde,
348 version_column_indices.clone(),
349 conflict_behavior,
350 toastable_column_indices.clone(),
351 cache_metrics,
352 ),
353 toastable_column_indices,
354 conflict_behavior,
355 cleaned_by_ttl_watermark,
356 version_column_indices,
357 is_dummy_table,
358 may_have_downstream,
359 subscriber_ids,
360 metrics: mv_metrics,
361 refresh_args,
362 local_barrier_manager,
363 }
364 }
365
366 #[try_stream(ok = Message, error = StreamExecutorError)]
367 async fn execute_inner(mut self) {
368 let mv_table_id = self.state_table.table_id();
369 let data_types = self.schema.data_types();
370 let mut input = self.input.execute();
371
372 let barrier = expect_first_barrier(&mut input).await?;
373 let first_epoch = barrier.epoch;
374 let _barrier_epoch = barrier.epoch; yield Message::Barrier(barrier);
377 self.state_table.init_epoch(first_epoch).await?;
378
379 let mut inner_state =
381 Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
382 if let Some(ref mut refresh_args) = self.refresh_args {
384 refresh_args.staging_table.init_epoch(first_epoch).await?;
385
386 refresh_args.progress_table.recover(first_epoch).await?;
388
389 let progress_stats = refresh_args.progress_table.get_progress_stats();
391 if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
392 refresh_args.is_refreshing = true;
393 tracing::info!(
394 total_vnodes = progress_stats.total_vnodes,
395 completed_vnodes = progress_stats.completed_vnodes,
396 "Recovered refresh in progress, resuming refresh operation"
397 );
398
399 let incomplete_vnodes: Vec<_> = refresh_args
403 .progress_table
404 .get_all_progress()
405 .iter()
406 .filter(|(_, entry)| !entry.is_completed)
407 .map(|(&vnode, _)| vnode)
408 .collect();
409
410 if !incomplete_vnodes.is_empty() {
411 tracing::info!(
413 incomplete_vnodes = incomplete_vnodes.len(),
414 "Recovery detected incomplete VNodes, resuming refresh operation"
415 );
416 } else {
419 tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
421 }
422 }
423 }
424
425 if let Some(ref refresh_args) = self.refresh_args
427 && refresh_args.is_refreshing
428 {
429 let incomplete_vnodes: Vec<_> = refresh_args
431 .progress_table
432 .get_all_progress()
433 .iter()
434 .filter(|(_, entry)| !entry.is_completed)
435 .map(|(&vnode, _)| vnode)
436 .collect();
437 if !incomplete_vnodes.is_empty() {
438 *inner_state = MaterializeStreamState::<_>::MergingData;
440 tracing::info!(
441 incomplete_vnodes = incomplete_vnodes.len(),
442 "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
443 );
444 }
445 }
446
447 'main_loop: loop {
449 match *inner_state {
450 MaterializeStreamState::NormalIngestion => {
451 #[for_await]
452 '_normal_ingest: for msg in input.by_ref() {
453 let msg = msg?;
454 if let Some(cache) = &mut self.materialize_cache {
455 cache.evict();
456 }
457
458 match msg {
459 Message::Watermark(w) => {
460 if self.cleaned_by_ttl_watermark
461 && self.state_table.clean_watermark_index == Some(w.col_idx)
462 {
463 self.state_table.update_watermark(w.val.clone());
464 }
465 yield Message::Watermark(w);
466 }
467 Message::Chunk(chunk) if self.is_dummy_table => {
468 self.metrics
469 .materialize_input_row_count
470 .inc_by(chunk.cardinality() as u64);
471 yield Message::Chunk(chunk);
472 }
473 Message::Chunk(chunk) => {
474 self.metrics
475 .materialize_input_row_count
476 .inc_by(chunk.cardinality() as u64);
477
478 let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
491 self.conflict_behavior
492 && !self.state_table.is_consistent_op()
493 && !self.cleaned_by_ttl_watermark
494 && self.version_column_indices.is_empty()
495 && self.toastable_column_indices.is_none()
496 {
497 ConflictBehavior::NoCheck
498 } else {
499 self.conflict_behavior
500 };
501
502 match optimized_conflict_behavior {
503 checked_conflict_behaviors!() => {
504 if chunk.cardinality() == 0 {
505 continue;
507 }
508
509 if let Some(ref mut refresh_args) = self.refresh_args
512 && refresh_args.is_refreshing
513 {
514 let key_chunk = chunk
515 .clone()
516 .project(self.state_table.pk_indices());
517 tracing::trace!(
518 staging_chunk = %key_chunk.to_pretty(),
519 input_chunk = %chunk.to_pretty(),
520 "writing to staging table"
521 );
522 if cfg!(debug_assertions) {
523 assert!(
525 key_chunk
526 .ops()
527 .iter()
528 .all(|op| op == &Op::Insert)
529 );
530 }
531 refresh_args
532 .staging_table
533 .write_chunk(key_chunk.clone());
534 refresh_args.staging_table.try_flush().await?;
535 }
536
537 let cache = self.materialize_cache.as_mut().unwrap();
538 let change_buffer =
539 cache.handle_new(chunk, &self.state_table).await?;
540
541 let output_chunk = if self.stream_key_indices
542 == self.state_table.pk_indices()
543 {
544 change_buffer.into_chunk::<{ cb_kind::RETRACT }>(
545 data_types.clone(),
546 )
547 } else {
548 debug_assert!(
551 self.state_table
552 .pk_indices()
553 .iter()
554 .all(|&i| self.stream_key_indices.contains(&i))
555 );
556 change_buffer.into_chunk_with_key(
557 data_types.clone(),
558 &self.stream_key_indices,
559 )
560 };
561
562 match output_chunk {
563 Some(output_chunk) => {
564 self.state_table.write_chunk(output_chunk.clone());
565 self.state_table.try_flush().await?;
566 yield Message::Chunk(output_chunk);
567 }
568 None => continue,
569 }
570 }
571 ConflictBehavior::NoCheck => {
572 self.state_table.write_chunk(chunk.clone());
573 self.state_table.try_flush().await?;
574
575 if let Some(ref mut refresh_args) = self.refresh_args
577 && refresh_args.is_refreshing
578 {
579 let key_chunk = chunk
580 .clone()
581 .project(self.state_table.pk_indices());
582 tracing::trace!(
583 staging_chunk = %key_chunk.to_pretty(),
584 input_chunk = %chunk.to_pretty(),
585 "writing to staging table"
586 );
587 if cfg!(debug_assertions) {
588 assert!(
590 key_chunk
591 .ops()
592 .iter()
593 .all(|op| op == &Op::Insert)
594 );
595 }
596 refresh_args
597 .staging_table
598 .write_chunk(key_chunk.clone());
599 refresh_args.staging_table.try_flush().await?;
600 }
601
602 yield Message::Chunk(chunk);
603 }
604 }
605 }
606 Message::Barrier(barrier) => {
607 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
608 barrier,
609 expect_next_state: Box::new(
610 MaterializeStreamState::NormalIngestion,
611 ),
612 };
613 continue 'main_loop;
614 }
615 }
616 }
617
618 return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
619 anyhow::anyhow!(
620 "Input stream terminated unexpectedly during normal ingestion"
621 ),
622 )));
623 }
624 MaterializeStreamState::MergingData => {
625 let Some(refresh_args) = self.refresh_args.as_mut() else {
626 panic!(
627 "MaterializeExecutor entered CleanUp state without refresh_args configured"
628 );
629 };
630 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
631
632 debug_assert_eq!(
633 self.state_table.vnodes(),
634 refresh_args.staging_table.vnodes()
635 );
636 debug_assert_eq!(
637 refresh_args.staging_table.vnodes(),
638 refresh_args.progress_table.vnodes()
639 );
640
641 let mut rows_to_delete = vec![];
642 let mut merge_complete = false;
643 let mut pending_barrier: Option<Barrier> = None;
644
645 {
647 let left_input = input.by_ref().map(Either::Left);
648 let right_merge_sort = pin!(
649 Self::make_mergesort_stream(
650 &self.state_table,
651 &refresh_args.staging_table,
652 &mut refresh_args.progress_table
653 )
654 .map(Either::Right)
655 );
656
657 let mut merge_stream =
660 select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
661 stream::PollNext::Left
662 });
663
664 #[for_await]
665 'merge_stream: for either in &mut merge_stream {
666 match either {
667 Either::Left(msg) => {
668 let msg = msg?;
669 match msg {
670 Message::Watermark(w) => yield Message::Watermark(w),
671 Message::Chunk(chunk) => {
672 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
673 }
674 Message::Barrier(b) => {
675 pending_barrier = Some(b);
676 break 'merge_stream;
677 }
678 }
679 }
680 Either::Right(result) => {
681 match result? {
682 Some((_vnode, row)) => {
683 rows_to_delete.push(row);
684 }
685 None => {
686 merge_complete = true;
688
689 }
691 }
692 }
693 }
694 }
695 }
696
697 for row in &rows_to_delete {
699 self.state_table.delete(row);
700 }
701 if !rows_to_delete.is_empty() {
702 let to_delete_chunk = StreamChunk::from_rows(
703 &rows_to_delete
704 .iter()
705 .map(|row| (Op::Delete, row))
706 .collect_vec(),
707 &self.schema.data_types(),
708 );
709
710 yield Message::Chunk(to_delete_chunk);
711 }
712
713 assert!(pending_barrier.is_some(), "pending barrier is not set");
715
716 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
717 barrier: pending_barrier.unwrap(),
718 expect_next_state: if merge_complete {
719 Box::new(MaterializeStreamState::CleanUp)
720 } else {
721 Box::new(MaterializeStreamState::MergingData)
722 },
723 };
724 continue 'main_loop;
725 }
726 MaterializeStreamState::CleanUp => {
727 let Some(refresh_args) = self.refresh_args.as_mut() else {
728 panic!(
729 "MaterializeExecutor entered MergingData state without refresh_args configured"
730 );
731 };
732 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
733
734 #[for_await]
735 for msg in input.by_ref() {
736 let msg = msg?;
737 match msg {
738 Message::Watermark(w) => {
739 if self.cleaned_by_ttl_watermark
740 && self.state_table.clean_watermark_index == Some(w.col_idx)
741 {
742 self.state_table.update_watermark(w.val.clone());
743 }
744 yield Message::Watermark(w)
745 }
746 Message::Chunk(chunk) => {
747 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
748 }
749 Message::Barrier(barrier) if !barrier.is_checkpoint() => {
750 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
751 barrier,
752 expect_next_state: Box::new(MaterializeStreamState::CleanUp),
753 };
754 continue 'main_loop;
755 }
756 Message::Barrier(barrier) => {
757 let staging_table_id = refresh_args.staging_table.table_id();
758 let epoch = barrier.epoch;
759 self.local_barrier_manager.report_refresh_finished(
760 epoch,
761 self.actor_context.id,
762 refresh_args.table_id,
763 staging_table_id,
764 );
765 tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
766
767 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
768 barrier,
769 expect_next_state: Box::new(
770 MaterializeStreamState::RefreshEnd {
771 on_complete_epoch: epoch,
772 },
773 ),
774 };
775 continue 'main_loop;
776 }
777 }
778 }
779 }
780 MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
781 let Some(refresh_args) = self.refresh_args.as_mut() else {
782 panic!(
783 "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
784 );
785 };
786 let staging_table_id = refresh_args.staging_table.table_id();
787
788 let staging_store = refresh_args.staging_table.state_store().clone();
790 staging_store
791 .try_wait_epoch(
792 HummockReadEpoch::Committed(on_complete_epoch.prev),
793 TryWaitEpochOptions {
794 table_id: staging_table_id,
795 },
796 )
797 .await?;
798
799 tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
800
801 if let Some(ref mut refresh_args) = self.refresh_args {
802 refresh_args.is_refreshing = false;
803 }
804 *inner_state = MaterializeStreamState::NormalIngestion;
805 continue 'main_loop;
806 }
807 MaterializeStreamState::CommitAndYieldBarrier {
808 barrier,
809 mut expect_next_state,
810 } => {
811 if let Some(ref mut refresh_args) = self.refresh_args {
812 match barrier.mutation.as_deref() {
813 Some(Mutation::RefreshStart {
814 table_id: refresh_table_id,
815 associated_source_id: _,
816 }) if *refresh_table_id == refresh_args.table_id => {
817 debug_assert!(
818 !refresh_args.is_refreshing,
819 "cannot start refresh twice"
820 );
821 refresh_args.is_refreshing = true;
822 tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
823
824 Self::init_refresh_progress(
826 &self.state_table,
827 &mut refresh_args.progress_table,
828 barrier.epoch.curr,
829 )?;
830 }
831 Some(Mutation::LoadFinish {
832 associated_source_id: load_finish_source_id,
833 }) => {
834 let associated_source_id: SourceId = match refresh_args
836 .table_catalog
837 .optional_associated_source_id
838 {
839 Some(id) => id.into(),
840 None => unreachable!("associated_source_id is not set"),
841 };
842
843 if *load_finish_source_id == associated_source_id {
844 tracing::info!(
845 %load_finish_source_id,
846 "LoadFinish received, starting data replacement"
847 );
848 *expect_next_state = MaterializeStreamState::<_>::MergingData;
849 }
850 }
851 _ => {}
852 }
853 }
854
855 if !self.may_have_downstream
859 && barrier.has_more_downstream_fragments(self.actor_context.id)
860 {
861 self.may_have_downstream = true;
862 }
863 Self::may_update_depended_subscriptions(
864 &mut self.subscriber_ids,
865 &barrier,
866 mv_table_id,
867 );
868 let op_consistency_level = get_op_consistency_level(
869 self.cleaned_by_ttl_watermark,
870 self.conflict_behavior,
871 self.may_have_downstream,
872 &self.subscriber_ids,
873 );
874 let post_commit = self
875 .state_table
876 .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
877 .await?;
878
879 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
880
881 let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
883 {
884 Some((
887 refresh_args.staging_table.commit(barrier.epoch).await?,
888 refresh_args.progress_table.commit(barrier.epoch).await?,
889 ))
890 } else {
891 None
892 };
893
894 let b_epoch = barrier.epoch;
895 yield Message::Barrier(barrier);
896
897 if let Some((_, cache_may_stale)) = post_commit
899 .post_yield_barrier(update_vnode_bitmap.clone())
900 .await?
901 && cache_may_stale
902 && let Some(cache) = &mut self.materialize_cache
903 {
904 cache.clear();
905 }
906
907 if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
909 staging_post_commit
910 .post_yield_barrier(update_vnode_bitmap.clone())
911 .await?;
912 progress_post_commit
913 .post_yield_barrier(update_vnode_bitmap)
914 .await?;
915 }
916
917 self.metrics
918 .materialize_current_epoch
919 .set(b_epoch.curr as i64);
920
921 *inner_state = *expect_next_state;
924 }
925 }
926 }
927 }
928
929 #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
933 async fn make_mergesort_stream<'a>(
934 main_table: &'a StateTableInner<S, SD>,
935 staging_table: &'a StateTableInner<S, SD>,
936 progress_table: &'a mut RefreshProgressTable<S>,
937 ) {
938 for vnode in main_table.vnodes().clone().iter_vnodes() {
939 let mut processed_rows = 0;
940 let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
942 if let Some(current_entry) = progress_table.get_progress(vnode) {
943 if current_entry.is_completed {
945 tracing::debug!(
946 vnode = vnode.to_index(),
947 "Skipping already completed VNode during recovery"
948 );
949 continue;
950 }
951 processed_rows += current_entry.processed_rows;
952 tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
953
954 if let Some(current_state) = ¤t_entry.current_pos {
955 (Bound::Excluded(current_state.clone()), Bound::Unbounded)
956 } else {
957 (Bound::Unbounded, Bound::Unbounded)
958 }
959 } else {
960 (Bound::Unbounded, Bound::Unbounded)
961 };
962
963 let iter_main = main_table
964 .iter_keyed_row_with_vnode(
965 vnode,
966 &pk_range,
967 PrefetchOptions::prefetch_for_large_range_scan(),
968 )
969 .await?;
970 let iter_staging = staging_table
971 .iter_keyed_row_with_vnode(
972 vnode,
973 &pk_range,
974 PrefetchOptions::prefetch_for_large_range_scan(),
975 )
976 .await?;
977
978 pin_mut!(iter_main);
979 pin_mut!(iter_staging);
980
981 let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
983 let mut staging_item: Option<KeyedRow<Bytes>> =
984 iter_staging.next().await.transpose()?;
985
986 while let Some(main_kv) = main_item {
987 let main_key = main_kv.key();
988
989 let mut should_delete = false;
991 while let Some(staging_kv) = &staging_item {
992 let staging_key = staging_kv.key();
993 match main_key.cmp(staging_key) {
994 std::cmp::Ordering::Greater => {
995 staging_item = iter_staging.next().await.transpose()?;
997 }
998 std::cmp::Ordering::Equal => {
999 break;
1001 }
1002 std::cmp::Ordering::Less => {
1003 should_delete = true;
1005 break;
1006 }
1007 }
1008 }
1009
1010 if staging_item.is_none() {
1012 should_delete = true;
1013 }
1014
1015 if should_delete {
1016 yield Some((vnode, main_kv.row().clone()));
1017 }
1018
1019 processed_rows += 1;
1021 tracing::debug!(
1022 "set progress table: vnode = {:?}, processed_rows = {:?}",
1023 vnode,
1024 processed_rows
1025 );
1026 progress_table.set_progress(
1027 vnode,
1028 Some(
1029 main_kv
1030 .row()
1031 .project(main_table.pk_indices())
1032 .to_owned_row(),
1033 ),
1034 false,
1035 processed_rows,
1036 )?;
1037 main_item = iter_main.next().await.transpose()?;
1038 }
1039
1040 if let Some(current_entry) = progress_table.get_progress(vnode) {
1042 progress_table.set_progress(
1043 vnode,
1044 current_entry.current_pos.clone(),
1045 true, current_entry.processed_rows,
1047 )?;
1048
1049 tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
1050 }
1051 }
1052
1053 yield None;
1055 }
1056
1057 fn may_update_depended_subscriptions(
1059 depended_subscriptions: &mut HashSet<SubscriberId>,
1060 barrier: &Barrier,
1061 mv_table_id: TableId,
1062 ) {
1063 for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
1064 if !depended_subscriptions.insert(subscriber_id) {
1065 warn!(
1066 ?depended_subscriptions,
1067 %mv_table_id,
1068 %subscriber_id,
1069 "subscription id already exists"
1070 );
1071 }
1072 }
1073
1074 if let Some(subscriptions_to_drop) = barrier.as_subscriptions_to_drop() {
1075 for SubscriptionUpstreamInfo {
1076 subscriber_id,
1077 upstream_mv_table_id,
1078 } in subscriptions_to_drop
1079 {
1080 if *upstream_mv_table_id == mv_table_id
1081 && !depended_subscriptions.remove(subscriber_id)
1082 {
1083 warn!(
1084 ?depended_subscriptions,
1085 %mv_table_id,
1086 %subscriber_id,
1087 "drop non existing subscriber_id id"
1088 );
1089 }
1090 }
1091 }
1092 }
1093
1094 fn init_refresh_progress(
1096 state_table: &StateTableInner<S, SD>,
1097 progress_table: &mut RefreshProgressTable<S>,
1098 _epoch: u64,
1099 ) -> StreamExecutorResult<()> {
1100 debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1101
1102 for vnode in state_table.vnodes().iter_vnodes() {
1104 progress_table.set_progress(
1105 vnode, None, false, 0, )?;
1109 }
1110
1111 tracing::info!(
1112 vnodes_count = state_table.vnodes().count_ones(),
1113 "Initialized refresh progress tracking for all VNodes"
1114 );
1115
1116 Ok(())
1117 }
1118}
1119
1120impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
1121 #[cfg(any(test, feature = "test"))]
1123 pub async fn for_test(
1124 input: Executor,
1125 store: S,
1126 table_id: TableId,
1127 keys: Vec<ColumnOrder>,
1128 column_ids: Vec<risingwave_common::catalog::ColumnId>,
1129 watermark_epoch: AtomicU64Ref,
1130 conflict_behavior: ConflictBehavior,
1131 ) -> Self {
1132 Self::for_test_inner(
1133 input,
1134 store,
1135 table_id,
1136 keys,
1137 column_ids,
1138 watermark_epoch,
1139 conflict_behavior,
1140 None,
1141 )
1142 .await
1143 }
1144
1145 #[cfg(any(test, feature = "test"))]
1146 #[expect(clippy::too_many_arguments)]
1147 pub async fn for_test_with_stream_key(
1148 input: Executor,
1149 store: S,
1150 table_id: TableId,
1151 keys: Vec<ColumnOrder>,
1152 stream_key: Vec<usize>,
1153 column_ids: Vec<risingwave_common::catalog::ColumnId>,
1154 watermark_epoch: AtomicU64Ref,
1155 conflict_behavior: ConflictBehavior,
1156 ) -> Self {
1157 Self::for_test_inner(
1158 input,
1159 store,
1160 table_id,
1161 keys,
1162 column_ids,
1163 watermark_epoch,
1164 conflict_behavior,
1165 Some(stream_key),
1166 )
1167 .await
1168 }
1169
1170 #[cfg(any(test, feature = "test"))]
1171 #[expect(clippy::too_many_arguments)]
1172 async fn for_test_inner(
1173 input: Executor,
1174 store: S,
1175 table_id: TableId,
1176 keys: Vec<ColumnOrder>,
1177 column_ids: Vec<risingwave_common::catalog::ColumnId>,
1178 watermark_epoch: AtomicU64Ref,
1179 conflict_behavior: ConflictBehavior,
1180 stream_key: Option<Vec<usize>>,
1181 ) -> Self {
1182 use risingwave_common::util::iter_util::ZipEqFast;
1183
1184 let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
1185 let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
1186 let schema = input.schema().clone();
1187 let columns: Vec<ColumnDesc> = column_ids
1188 .into_iter()
1189 .zip_eq_fast(schema.fields.iter())
1190 .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
1191 .collect_vec();
1192
1193 let row_serde = BasicSerde::new(
1194 Arc::from((0..columns.len()).collect_vec()),
1195 Arc::from(columns.clone().into_boxed_slice()),
1196 );
1197 let stream_key_indices = stream_key.unwrap_or_else(|| arrange_columns.clone());
1198 let mut table_catalog = crate::common::table::test_utils::gen_pbtable(
1199 table_id,
1200 columns,
1201 arrange_order_types,
1202 arrange_columns.clone(),
1203 0,
1204 );
1205 table_catalog.stream_key = stream_key_indices.iter().map(|i| *i as i32).collect();
1206 let state_table = StateTableInner::from_table_catalog(&table_catalog, store, None).await;
1207
1208 let unused = StreamingMetrics::unused();
1209 let metrics = unused.new_materialize_metrics(table_id, 1.into(), 2.into());
1210 let cache_metrics = unused.new_materialize_cache_metrics(table_id, 1.into(), 2.into());
1211
1212 Self {
1213 input,
1214 schema,
1215 state_table,
1216 stream_key_indices,
1217 arrange_key_indices: arrange_columns.clone(),
1218 actor_context: ActorContext::for_test(0),
1219 materialize_cache: MaterializeCache::new(
1220 watermark_epoch,
1221 MetricsInfo::for_test(),
1222 row_serde,
1223 vec![],
1224 conflict_behavior,
1225 None,
1226 cache_metrics,
1227 ),
1228 toastable_column_indices: None,
1229 conflict_behavior,
1230 cleaned_by_ttl_watermark: false,
1231 version_column_indices: vec![],
1232 is_dummy_table: false,
1233 may_have_downstream: true,
1234 subscriber_ids: HashSet::new(),
1235 metrics,
1236 refresh_args: None, local_barrier_manager: LocalBarrierManager::for_test(),
1238 }
1239 }
1240}
1241
1242impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
1243 fn execute(self: Box<Self>) -> BoxedMessageStream {
1244 self.execute_inner().boxed()
1245 }
1246}
1247
1248impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
1249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1250 f.debug_struct("MaterializeExecutor")
1251 .field("arrange_key_indices", &self.arrange_key_indices)
1252 .field("stream_key_indices", &self.stream_key_indices)
1253 .finish()
1254 }
1255}
1256
#[cfg(test)]
mod tests {

    use std::iter;
    use std::sync::atomic::AtomicU64;

    use rand::rngs::SmallRng;
    use rand::{Rng, RngCore, SeedableRng};
    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
    use risingwave_common::catalog::Field;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_hummock_sdk::HummockReadEpoch;
    use risingwave_storage::memory::MemoryStateStore;
    use risingwave_storage::table::batch_table::BatchTable;

    use super::*;
    use crate::executor::test_utils::*;

    // Shared fixtures: most tests below materialize a table with two `INT` columns whose first
    // column is the primary key, then point-read the committed state after selected barriers.

    /// Wraps a test barrier for `test_epoch(epoch)` into a [`Message`].
    fn barrier(epoch: u64) -> Message {
        Message::Barrier(Barrier::new_test_barrier(test_epoch(epoch)))
    }

    /// Builds a row of `INT` datums; `None` stands for SQL `NULL`.
    fn int_row(datums: &[Option<i32>]) -> OwnedRow {
        OwnedRow::new(datums.iter().map(|datum| datum.map(Into::into)).collect())
    }

    /// Point-reads the row whose primary key (column 0) is `pk`, at `NoWait(u64::MAX)`.
    async fn get_row_by_pk(table: &BatchTable<MemoryStateStore>, pk: i32) -> Option<OwnedRow> {
        table
            .get_row(&int_row(&[Some(pk)]), HummockReadEpoch::NoWait(u64::MAX))
            .await
            .unwrap()
    }

    /// Pulls and discards the next `n` messages, panicking on a stream error.
    ///
    /// The skipped messages are deliberately not type-checked: depending on the conflict
    /// behavior, an input chunk whose rows are all ignored may not produce an output chunk.
    async fn skip_messages(stream: &mut BoxedMessageStream, n: usize) {
        for _ in 0..n {
            stream.next().await.transpose().unwrap();
        }
    }

    /// Pulls the next message and panics, printing the offending message, unless it is a barrier.
    async fn next_barrier(stream: &mut BoxedMessageStream) {
        match stream.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {}
            other => panic!("expect barrier, got {other:?}"),
        }
    }

    /// Feeds `messages` through a `MaterializeExecutor` that uses `conflict_behavior` and stores
    /// two `INT` columns (primary key on column 0) in a fresh `MemoryStateStore`.
    ///
    /// Returns the executor's output stream together with a `BatchTable` over the same store,
    /// so tests can inspect the materialized state after each barrier.
    async fn materialize_two_int_columns(
        messages: Vec<Message>,
        conflict_behavior: ConflictBehavior,
    ) -> (BoxedMessageStream, BatchTable<MemoryStateStore>) {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type; the first column is the primary key.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let source = MockSource::with_messages(messages).into_executor(schema, StreamKey::new());

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            vec![OrderType::ascending()],
            vec![0],
            vec![0, 1],
        );

        let stream = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();

        (stream, table)
    }

    /// Plain inserts and a delete with `NoCheck` are materialized verbatim.
    #[tokio::test]
    async fn test_materialize_executor() {
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );
        let (mut stream, table) = materialize_two_int_columns(
            vec![
                barrier(1),
                Message::Chunk(chunk1),
                barrier(2),
                Message::Chunk(chunk2),
                barrier(3),
            ],
            ConflictBehavior::NoCheck,
        )
        .await;

        skip_messages(&mut stream, 2).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 3).await, Some(int_row(&[Some(3), Some(6)])));

        skip_messages(&mut stream, 1).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 7).await, Some(int_row(&[Some(7), Some(8)])));
        // `- 3 6` in chunk2 must have removed key 3.
        assert_eq!(get_row_by_pk(&table, 3).await, None);
    }

    /// With `Overwrite`, overwriting a key and then deleting it leaves no row behind.
    #[tokio::test]
    async fn test_upsert_stream() {
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );
        let (mut stream, table) = materialize_two_int_columns(
            vec![
                barrier(1),
                Message::Chunk(chunk1),
                barrier(2),
                Message::Chunk(chunk2),
                barrier(3),
            ],
            ConflictBehavior::Overwrite,
        )
        .await;

        skip_messages(&mut stream, 4).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 1).await, None);
    }

    /// With `Overwrite`, the last insert for a primary key wins, across chunks of one epoch.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );
        let (mut stream, table) = materialize_two_int_columns(
            vec![
                barrier(1),
                Message::Chunk(chunk1),
                Message::Chunk(chunk2),
                barrier(2),
                Message::Chunk(chunk3),
                barrier(3),
            ],
            ConflictBehavior::Overwrite,
        )
        .await;

        skip_messages(&mut stream, 3).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 3).await, Some(int_row(&[Some(3), Some(6)])));
        assert_eq!(get_row_by_pk(&table, 1).await, Some(int_row(&[Some(1), Some(3)])));
        assert_eq!(get_row_by_pk(&table, 2).await, Some(int_row(&[Some(2), Some(6)])));
    }

    /// With `Overwrite`, deletes and updates are applied by primary key, regardless of the
    /// old value carried in the `-`/`U-` row; deletes of absent keys are no-ops.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );
        let (mut stream, table) = materialize_two_int_columns(
            vec![
                barrier(1),
                Message::Chunk(chunk1),
                barrier(2),
                Message::Chunk(chunk2),
                barrier(3),
                Message::Chunk(chunk3),
                barrier(4),
            ],
            ConflictBehavior::Overwrite,
        )
        .await;

        skip_messages(&mut stream, 2).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 8).await, Some(int_row(&[Some(8), Some(3)])));

        skip_messages(&mut stream, 1).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 7).await, Some(int_row(&[Some(7), Some(8)])));
        assert_eq!(get_row_by_pk(&table, 3).await, None);
        assert_eq!(get_row_by_pk(&table, 5).await, None);

        skip_messages(&mut stream, 1).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 1).await, Some(int_row(&[Some(1), Some(5)])));
        assert_eq!(get_row_by_pk(&table, 2).await, Some(int_row(&[Some(2), Some(8)])));
        assert_eq!(get_row_by_pk(&table, 9).await, Some(int_row(&[Some(9), Some(1)])));
    }

    /// When the stream key covers both columns, an overwrite of pk 1 is emitted as a
    /// `-`/`+` pair rather than an update pair.
    #[tokio::test]
    async fn test_change_buffer_into_chunk_with_stream_key() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5",
        );

        let source = MockSource::with_messages(vec![
            barrier(1),
            Message::Chunk(chunk1),
            barrier(2),
            Message::Chunk(chunk2),
            barrier(3),
        ])
        .into_executor(schema, StreamKey::new());

        let mut stream = MaterializeExecutor::for_test_with_stream_key(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            vec![0, 1],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();

        skip_messages(&mut stream, 3).await;

        match stream.next().await.transpose().unwrap() {
            Some(Message::Chunk(chunk)) => {
                assert_eq!(
                    chunk.compact_vis(),
                    StreamChunk::from_pretty(
                        " i i
                        - 1 4
                        + 1 5"
                    )
                );
            }
            other => panic!("expect chunk, got {other:?}"),
        }
    }

    /// With `IgnoreConflict`, the first insert for a primary key wins.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );
        let (mut stream, table) = materialize_two_int_columns(
            vec![
                barrier(1),
                Message::Chunk(chunk1),
                Message::Chunk(chunk2),
                barrier(2),
                Message::Chunk(chunk3),
                barrier(3),
            ],
            ConflictBehavior::IgnoreConflict,
        )
        .await;

        skip_messages(&mut stream, 3).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 3).await, Some(int_row(&[Some(3), Some(6)])));
        assert_eq!(get_row_by_pk(&table, 1).await, Some(int_row(&[Some(1), Some(3)])));
        assert_eq!(get_row_by_pk(&table, 2).await, Some(int_row(&[Some(2), Some(5)])));
    }

    /// With `IgnoreConflict`, an insert after a delete of the same key is not a conflict.
    #[tokio::test]
    async fn test_ignore_delete_then_insert() {
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );
        let (mut stream, table) = materialize_two_int_columns(
            vec![barrier(1), Message::Chunk(chunk1), barrier(2)],
            ConflictBehavior::IgnoreConflict,
        )
        .await;

        next_barrier(&mut stream).await;
        match stream.next().await.transpose().unwrap() {
            Some(Message::Chunk(_)) => {}
            other => panic!("expect chunk, got {other:?}"),
        }
        next_barrier(&mut stream).await;

        assert_eq!(get_row_by_pk(&table, 1).await, Some(int_row(&[Some(1), Some(6)])));
    }

    /// With `IgnoreConflict`, conflicting inserts are dropped while updates and deletes still
    /// apply by primary key.
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );
        let (mut stream, table) = materialize_two_int_columns(
            vec![
                barrier(1),
                Message::Chunk(chunk1),
                barrier(2),
                Message::Chunk(chunk2),
                barrier(3),
                Message::Chunk(chunk3),
                barrier(4),
            ],
            ConflictBehavior::IgnoreConflict,
        )
        .await;

        skip_messages(&mut stream, 2).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 8).await, Some(int_row(&[Some(8), Some(2)])));

        skip_messages(&mut stream, 1).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 7).await, Some(int_row(&[Some(7), Some(8)])));
        assert_eq!(get_row_by_pk(&table, 3).await, None);
        assert_eq!(get_row_by_pk(&table, 5).await, None);

        skip_messages(&mut stream, 1).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 1).await, Some(int_row(&[Some(1), Some(4)])));
        assert_eq!(get_row_by_pk(&table, 2).await, Some(int_row(&[Some(2), Some(8)])));
        assert_eq!(get_row_by_pk(&table, 9).await, Some(int_row(&[Some(9), Some(1)])));
    }

    /// With `DoUpdateIfNotNull`, a conflicting write only replaces non-NULL column values.
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );
        let (mut stream, table) = materialize_two_int_columns(
            vec![
                barrier(1),
                Message::Chunk(chunk1),
                barrier(2),
                Message::Chunk(chunk2),
                barrier(3),
                Message::Chunk(chunk3),
                barrier(4),
            ],
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await;

        skip_messages(&mut stream, 2).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 8).await, Some(int_row(&[Some(8), Some(2)])));
        assert_eq!(get_row_by_pk(&table, 2).await, Some(int_row(&[Some(2), None])));

        skip_messages(&mut stream, 1).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 7).await, Some(int_row(&[Some(7), Some(8)])));
        assert_eq!(get_row_by_pk(&table, 3).await, None);
        assert_eq!(get_row_by_pk(&table, 5).await, None);

        skip_messages(&mut stream, 1).await;
        next_barrier(&mut stream).await;
        assert_eq!(get_row_by_pk(&table, 7).await, Some(int_row(&[Some(7), Some(8)])));
        assert_eq!(get_row_by_pk(&table, 2).await, Some(int_row(&[Some(2), None])));
        assert_eq!(get_row_by_pk(&table, 9).await, Some(int_row(&[Some(9), Some(1)])));
    }

    /// Generates `row_number` random insert/delete rows over `KN` distinct keys, packed into
    /// chunks of `chunk_size` with random visibility. Seeded, so the output is deterministic.
    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
        const KN: u32 = 4;
        const SEED: u64 = 998244353;
        let mut ret = vec![];
        let mut builder =
            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
        let mut rng = SmallRng::seed_from_u64(SEED);

        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
            let len = c.data_chunk().capacity();
            let mut c = StreamChunkMut::from(c);
            for i in 0..len {
                c.set_vis(i, rng.random_bool(0.5));
            }
            c.into()
        };
        for _ in 0..row_number {
            let k = (rng.next_u32() % KN) as i32;
            let v = rng.next_u32() as i32;
            let op = if rng.random_bool(0.5) {
                Op::Insert
            } else {
                Op::Delete
            };
            if let Some(c) =
                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
            {
                ret.push(random_vis(c, &mut rng));
            }
        }
        if let Some(c) = builder.take() {
            ret.push(random_vis(c, &mut rng));
        }
        ret
    }

    /// Feeds random (possibly inconsistent) input through the executor and replays every output
    /// chunk into a separate `StateTable`, so inconsistent output is caught by that table's
    /// write path.
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(barrier(1))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(barrier(2)))
            .collect();
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        // Replay every output chunk until the closing barrier arrives.
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            table.write_chunk(c);
        }
    }

    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }

    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
}