1use std::collections::HashSet;
16use std::ops::Bound;
17
18use bytes::Bytes;
19use futures::future::Either;
20use futures::stream::{self, select_with_strategy};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{
26 ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
27};
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, RowExt};
30use risingwave_common::types::DataType;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::BasicSerde;
33use risingwave_hummock_sdk::HummockReadEpoch;
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_pb::id::{SourceId, SubscriberId};
37use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
38use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
39use risingwave_storage::table::KeyedRow;
40
41use crate::common::change_buffer::output_kind as cb_kind;
42use crate::common::metrics::MetricsInfo;
43use crate::common::table::state_table::{
44 StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
45};
46use crate::executor::error::ErrorKind;
47use crate::executor::monitor::MaterializeMetrics;
48use crate::executor::mview::RefreshProgressTable;
49use crate::executor::mview::cache::MaterializeCache;
50use crate::executor::prelude::*;
51use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
52use crate::task::LocalBarrierManager;
53
/// The state machine driving `MaterializeExecutor::execute_inner`.
///
/// `M` is the barrier mutation type carried by buffered barriers.
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Default state: consume upstream messages and write chunks to the state
    /// table.
    NormalIngestion,
    /// Refresh load finished: merge-sort the staging table against the main
    /// table to find stale rows to delete.
    MergingData,
    /// Merge complete: wait for a checkpoint barrier, then report the refresh
    /// as finished.
    CleanUp,
    /// Shared barrier-commit step: commit all tables, yield `barrier`, then
    /// transition to `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        // Boxed to keep the enum's size small despite the recursive variant.
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Refresh fully done: wait for `on_complete_epoch` to be committed on the
    /// staging store, then return to normal ingestion.
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}
67
/// Materializes the input stream into a table on storage, optionally resolving
/// primary-key conflicts according to `conflict_behavior`, and re-emits the
/// (possibly rewritten) chunks downstream. Also drives refreshable-table
/// handling when `refresh_args` is present.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the stream to materialize.
    input: Executor,

    /// Output schema; used to build output chunks from the change buffer.
    schema: Schema,

    /// The table being materialized.
    state_table: StateTableInner<S, SD>,

    /// Column indices of the arrangement key.
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// Cache used for conflict handling. NOTE(review): the conflict-handling
    /// path unwraps this (`as_mut().unwrap()`), so it is presumably always
    /// `Some` when `conflict_behavior` requires checking — confirm.
    materialize_cache: Option<MaterializeCache>,

    /// How to resolve rows with duplicate primary keys.
    conflict_behavior: ConflictBehavior,

    /// Indices of version columns used to decide which conflicting row wins;
    /// empty when versioning is not used.
    version_column_indices: Vec<u32>,

    /// Whether this MV may currently have downstream fragments; together with
    /// `subscriber_ids` this determines the state table's op consistency level.
    may_have_downstream: bool,

    /// Subscribers depending on this MV; non-empty forces log-store-enabled
    /// consistency.
    subscriber_ids: HashSet<SubscriberId>,

    metrics: MaterializeMetrics,

    /// True for append-only iceberg tables, which pass chunks through without
    /// writing to the state table.
    is_dummy_table: bool,

    /// Present iff this is a refreshable table; carries the staging and
    /// progress tables used during a refresh.
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    local_barrier_manager: LocalBarrierManager,
}
104
/// State needed by a refreshable materialized table.
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Catalog of the main table.
    pub table_catalog: Table,

    /// Catalog of the staging table written during a refresh.
    pub staging_table_catalog: Table,

    /// Whether a refresh is currently in progress.
    pub is_refreshing: bool,

    /// Staging table receiving the primary keys of refreshed rows; later
    /// merge-sorted against the main table to find stale rows.
    pub staging_table: StateTableInner<S, SD>,

    /// Per-vnode progress of the merge stage, persisted so the refresh can be
    /// resumed after failover.
    pub progress_table: RefreshProgressTable<S>,

    /// Id of the main table.
    pub table_id: TableId,
}
130
131impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
132 pub async fn new(
134 store: S,
135 table_catalog: &Table,
136 staging_table_catalog: &Table,
137 progress_state_table: &Table,
138 vnodes: Option<Arc<Bitmap>>,
139 ) -> Self {
140 let table_id = table_catalog.id;
141
142 let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
144 staging_table_catalog,
145 store.clone(),
146 vnodes.clone(),
147 )
148 .await;
149
150 let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
151 progress_state_table,
152 store,
153 vnodes,
154 )
155 .await;
156
157 let pk_len = table_catalog.pk.len();
159 let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
160
161 debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
162
163 Self {
164 table_catalog: table_catalog.clone(),
165 staging_table_catalog: staging_table_catalog.clone(),
166 is_refreshing: false,
167 staging_table,
168 progress_table,
169 table_id,
170 }
171 }
172}
173
174fn get_op_consistency_level(
175 conflict_behavior: ConflictBehavior,
176 may_have_downstream: bool,
177 subscriber_ids: &HashSet<SubscriberId>,
178) -> StateTableOpConsistencyLevel {
179 if !subscriber_ids.is_empty() {
180 StateTableOpConsistencyLevel::LogStoreEnabled
181 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
182 StateTableOpConsistencyLevel::Inconsistent
185 } else {
186 StateTableOpConsistencyLevel::ConsistentOldValue
187 }
188}
189
190impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
191 #[allow(clippy::too_many_arguments)]
195 pub async fn new(
196 input: Executor,
197 schema: Schema,
198 store: S,
199 arrange_key: Vec<ColumnOrder>,
200 actor_context: ActorContextRef,
201 vnodes: Option<Arc<Bitmap>>,
202 table_catalog: &Table,
203 watermark_epoch: AtomicU64Ref,
204 conflict_behavior: ConflictBehavior,
205 version_column_indices: Vec<u32>,
206 metrics: Arc<StreamingMetrics>,
207 refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
208 local_barrier_manager: LocalBarrierManager,
209 ) -> Self {
210 let table_columns: Vec<ColumnDesc> = table_catalog
211 .columns
212 .iter()
213 .map(|col| col.column_desc.as_ref().unwrap().into())
214 .collect();
215
216 let toastable_column_indices = if table_catalog.cdc_table_type()
219 == risingwave_pb::catalog::table::CdcTableType::Postgres
220 {
221 let toastable_indices: Vec<usize> = table_columns
222 .iter()
223 .enumerate()
224 .filter_map(|(index, column)| match &column.data_type {
225 DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
234 Some(index)
235 }
236 _ => None,
237 })
238 .collect();
239
240 if toastable_indices.is_empty() {
241 None
242 } else {
243 Some(toastable_indices)
244 }
245 } else {
246 None
247 };
248
249 let row_serde: BasicSerde = BasicSerde::new(
250 Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
251 Arc::from(table_columns.into_boxed_slice()),
252 );
253
254 let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
255 let may_have_downstream = actor_context.initial_dispatch_num != 0;
256 let subscriber_ids = actor_context.initial_subscriber_ids.clone();
257 let op_consistency_level =
258 get_op_consistency_level(conflict_behavior, may_have_downstream, &subscriber_ids);
259 let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
261 .with_op_consistency_level(op_consistency_level)
262 .enable_preload_all_rows_by_config(&actor_context.config)
263 .build()
264 .await;
265
266 let mv_metrics = metrics.new_materialize_metrics(
267 table_catalog.id,
268 actor_context.id,
269 actor_context.fragment_id,
270 );
271 let cache_metrics = metrics.new_materialize_cache_metrics(
272 table_catalog.id,
273 actor_context.id,
274 actor_context.fragment_id,
275 );
276
277 let metrics_info =
278 MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
279
280 let is_dummy_table =
281 table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;
282
283 Self {
284 input,
285 schema,
286 state_table,
287 arrange_key_indices,
288 actor_context,
289 materialize_cache: MaterializeCache::new(
290 watermark_epoch,
291 metrics_info,
292 row_serde,
293 version_column_indices.clone(),
294 conflict_behavior,
295 toastable_column_indices,
296 cache_metrics,
297 ),
298 conflict_behavior,
299 version_column_indices,
300 is_dummy_table,
301 may_have_downstream,
302 subscriber_ids,
303 metrics: mv_metrics,
304 refresh_args,
305 local_barrier_manager,
306 }
307 }
308
309 #[try_stream(ok = Message, error = StreamExecutorError)]
310 async fn execute_inner(mut self) {
311 let mv_table_id = self.state_table.table_id();
312 let data_types = self.schema.data_types();
313 let mut input = self.input.execute();
314
315 let barrier = expect_first_barrier(&mut input).await?;
316 let first_epoch = barrier.epoch;
317 let _barrier_epoch = barrier.epoch; yield Message::Barrier(barrier);
320 self.state_table.init_epoch(first_epoch).await?;
321
322 let mut inner_state =
324 Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
325 if let Some(ref mut refresh_args) = self.refresh_args {
327 refresh_args.staging_table.init_epoch(first_epoch).await?;
328
329 refresh_args.progress_table.recover(first_epoch).await?;
331
332 let progress_stats = refresh_args.progress_table.get_progress_stats();
334 if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
335 refresh_args.is_refreshing = true;
336 tracing::info!(
337 total_vnodes = progress_stats.total_vnodes,
338 completed_vnodes = progress_stats.completed_vnodes,
339 "Recovered refresh in progress, resuming refresh operation"
340 );
341
342 let incomplete_vnodes: Vec<_> = refresh_args
346 .progress_table
347 .get_all_progress()
348 .iter()
349 .filter(|(_, entry)| !entry.is_completed)
350 .map(|(&vnode, _)| vnode)
351 .collect();
352
353 if !incomplete_vnodes.is_empty() {
354 tracing::info!(
356 incomplete_vnodes = incomplete_vnodes.len(),
357 "Recovery detected incomplete VNodes, resuming refresh operation"
358 );
359 } else {
362 tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
364 }
365 }
366 }
367
368 if let Some(ref refresh_args) = self.refresh_args
370 && refresh_args.is_refreshing
371 {
372 let incomplete_vnodes: Vec<_> = refresh_args
374 .progress_table
375 .get_all_progress()
376 .iter()
377 .filter(|(_, entry)| !entry.is_completed)
378 .map(|(&vnode, _)| vnode)
379 .collect();
380 if !incomplete_vnodes.is_empty() {
381 inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
383 tracing::info!(
384 incomplete_vnodes = incomplete_vnodes.len(),
385 "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
386 );
387 }
388 }
389
390 'main_loop: loop {
392 match *inner_state {
393 MaterializeStreamState::NormalIngestion => {
394 #[for_await]
395 '_normal_ingest: for msg in input.by_ref() {
396 let msg = msg?;
397 if let Some(cache) = &mut self.materialize_cache {
398 cache.evict();
399 }
400
401 match msg {
402 Message::Watermark(w) => {
403 yield Message::Watermark(w);
404 }
405 Message::Chunk(chunk) if self.is_dummy_table => {
406 self.metrics
407 .materialize_input_row_count
408 .inc_by(chunk.cardinality() as u64);
409 yield Message::Chunk(chunk);
410 }
411 Message::Chunk(chunk) => {
412 self.metrics
413 .materialize_input_row_count
414 .inc_by(chunk.cardinality() as u64);
415
416 let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
421 self.conflict_behavior
422 && !self.state_table.is_consistent_op()
423 && self.version_column_indices.is_empty()
424 {
425 ConflictBehavior::NoCheck
426 } else {
427 self.conflict_behavior
428 };
429
430 match optimized_conflict_behavior {
431 checked_conflict_behaviors!() => {
432 if chunk.cardinality() == 0 {
433 continue;
435 }
436
437 if let Some(ref mut refresh_args) = self.refresh_args
440 && refresh_args.is_refreshing
441 {
442 let key_chunk = chunk
443 .clone()
444 .project(self.state_table.pk_indices());
445 tracing::trace!(
446 staging_chunk = %key_chunk.to_pretty(),
447 input_chunk = %chunk.to_pretty(),
448 "writing to staging table"
449 );
450 if cfg!(debug_assertions) {
451 assert!(
453 key_chunk
454 .ops()
455 .iter()
456 .all(|op| op == &Op::Insert)
457 );
458 }
459 refresh_args
460 .staging_table
461 .write_chunk(key_chunk.clone());
462 refresh_args.staging_table.try_flush().await?;
463 }
464
465 let cache = self.materialize_cache.as_mut().unwrap();
466 let change_buffer =
467 cache.handle_new(chunk, &self.state_table).await?;
468
469 match change_buffer
470 .into_chunk::<{ cb_kind::RETRACT }>(data_types.clone())
471 {
472 Some(output_chunk) => {
473 self.state_table.write_chunk(output_chunk.clone());
474 self.state_table.try_flush().await?;
475 yield Message::Chunk(output_chunk);
476 }
477 None => continue,
478 }
479 }
480 ConflictBehavior::NoCheck => {
481 self.state_table.write_chunk(chunk.clone());
482 self.state_table.try_flush().await?;
483
484 if let Some(ref mut refresh_args) = self.refresh_args
486 && refresh_args.is_refreshing
487 {
488 let key_chunk = chunk
489 .clone()
490 .project(self.state_table.pk_indices());
491 tracing::trace!(
492 staging_chunk = %key_chunk.to_pretty(),
493 input_chunk = %chunk.to_pretty(),
494 "writing to staging table"
495 );
496 if cfg!(debug_assertions) {
497 assert!(
499 key_chunk
500 .ops()
501 .iter()
502 .all(|op| op == &Op::Insert)
503 );
504 }
505 refresh_args
506 .staging_table
507 .write_chunk(key_chunk.clone());
508 refresh_args.staging_table.try_flush().await?;
509 }
510
511 yield Message::Chunk(chunk);
512 }
513 }
514 }
515 Message::Barrier(barrier) => {
516 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
517 barrier,
518 expect_next_state: Box::new(
519 MaterializeStreamState::NormalIngestion,
520 ),
521 };
522 continue 'main_loop;
523 }
524 }
525 }
526
527 return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
528 anyhow::anyhow!(
529 "Input stream terminated unexpectedly during normal ingestion"
530 ),
531 )));
532 }
533 MaterializeStreamState::MergingData => {
534 let Some(refresh_args) = self.refresh_args.as_mut() else {
535 panic!(
536 "MaterializeExecutor entered CleanUp state without refresh_args configured"
537 );
538 };
539 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
540
541 debug_assert_eq!(
542 self.state_table.vnodes(),
543 refresh_args.staging_table.vnodes()
544 );
545 debug_assert_eq!(
546 refresh_args.staging_table.vnodes(),
547 refresh_args.progress_table.vnodes()
548 );
549
550 let mut rows_to_delete = vec![];
551 let mut merge_complete = false;
552 let mut pending_barrier: Option<Barrier> = None;
553
554 {
556 let left_input = input.by_ref().map(Either::Left);
557 let right_merge_sort = pin!(
558 Self::make_mergesort_stream(
559 &self.state_table,
560 &refresh_args.staging_table,
561 &mut refresh_args.progress_table
562 )
563 .map(Either::Right)
564 );
565
566 let mut merge_stream =
569 select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
570 stream::PollNext::Left
571 });
572
573 #[for_await]
574 'merge_stream: for either in &mut merge_stream {
575 match either {
576 Either::Left(msg) => {
577 let msg = msg?;
578 match msg {
579 Message::Watermark(w) => yield Message::Watermark(w),
580 Message::Chunk(chunk) => {
581 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
582 }
583 Message::Barrier(b) => {
584 pending_barrier = Some(b);
585 break 'merge_stream;
586 }
587 }
588 }
589 Either::Right(result) => {
590 match result? {
591 Some((_vnode, row)) => {
592 rows_to_delete.push(row);
593 }
594 None => {
595 merge_complete = true;
597
598 }
600 }
601 }
602 }
603 }
604 }
605
606 for row in &rows_to_delete {
608 self.state_table.delete(row);
609 }
610 if !rows_to_delete.is_empty() {
611 let to_delete_chunk = StreamChunk::from_rows(
612 &rows_to_delete
613 .iter()
614 .map(|row| (Op::Delete, row))
615 .collect_vec(),
616 &self.schema.data_types(),
617 );
618
619 yield Message::Chunk(to_delete_chunk);
620 }
621
622 assert!(pending_barrier.is_some(), "pending barrier is not set");
624
625 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
626 barrier: pending_barrier.unwrap(),
627 expect_next_state: if merge_complete {
628 Box::new(MaterializeStreamState::CleanUp)
629 } else {
630 Box::new(MaterializeStreamState::MergingData)
631 },
632 };
633 continue 'main_loop;
634 }
635 MaterializeStreamState::CleanUp => {
636 let Some(refresh_args) = self.refresh_args.as_mut() else {
637 panic!(
638 "MaterializeExecutor entered MergingData state without refresh_args configured"
639 );
640 };
641 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
642
643 #[for_await]
644 for msg in input.by_ref() {
645 let msg = msg?;
646 match msg {
647 Message::Watermark(w) => yield Message::Watermark(w),
648 Message::Chunk(chunk) => {
649 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
650 }
651 Message::Barrier(barrier) if !barrier.is_checkpoint() => {
652 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
653 barrier,
654 expect_next_state: Box::new(MaterializeStreamState::CleanUp),
655 };
656 continue 'main_loop;
657 }
658 Message::Barrier(barrier) => {
659 let staging_table_id = refresh_args.staging_table.table_id();
660 let epoch = barrier.epoch;
661 self.local_barrier_manager.report_refresh_finished(
662 epoch,
663 self.actor_context.id,
664 refresh_args.table_id,
665 staging_table_id,
666 );
667 tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
668
669 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
670 barrier,
671 expect_next_state: Box::new(
672 MaterializeStreamState::RefreshEnd {
673 on_complete_epoch: epoch,
674 },
675 ),
676 };
677 continue 'main_loop;
678 }
679 }
680 }
681 }
682 MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
683 let Some(refresh_args) = self.refresh_args.as_mut() else {
684 panic!(
685 "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
686 );
687 };
688 let staging_table_id = refresh_args.staging_table.table_id();
689
690 let staging_store = refresh_args.staging_table.state_store().clone();
692 staging_store
693 .try_wait_epoch(
694 HummockReadEpoch::Committed(on_complete_epoch.prev),
695 TryWaitEpochOptions {
696 table_id: staging_table_id,
697 },
698 )
699 .await?;
700
701 tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
702
703 if let Some(ref mut refresh_args) = self.refresh_args {
704 refresh_args.is_refreshing = false;
705 }
706 *inner_state = MaterializeStreamState::NormalIngestion;
707 continue 'main_loop;
708 }
709 MaterializeStreamState::CommitAndYieldBarrier {
710 barrier,
711 mut expect_next_state,
712 } => {
713 if let Some(ref mut refresh_args) = self.refresh_args {
714 match barrier.mutation.as_deref() {
715 Some(Mutation::RefreshStart {
716 table_id: refresh_table_id,
717 associated_source_id: _,
718 }) if *refresh_table_id == refresh_args.table_id => {
719 debug_assert!(
720 !refresh_args.is_refreshing,
721 "cannot start refresh twice"
722 );
723 refresh_args.is_refreshing = true;
724 tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
725
726 Self::init_refresh_progress(
728 &self.state_table,
729 &mut refresh_args.progress_table,
730 barrier.epoch.curr,
731 )?;
732 }
733 Some(Mutation::LoadFinish {
734 associated_source_id: load_finish_source_id,
735 }) => {
736 let associated_source_id: SourceId = match refresh_args
738 .table_catalog
739 .optional_associated_source_id
740 {
741 Some(id) => id.into(),
742 None => unreachable!("associated_source_id is not set"),
743 };
744
745 if *load_finish_source_id == associated_source_id {
746 tracing::info!(
747 %load_finish_source_id,
748 "LoadFinish received, starting data replacement"
749 );
750 expect_next_state =
751 Box::new(MaterializeStreamState::<_>::MergingData);
752 }
753 }
754 _ => {}
755 }
756 }
757
758 if !self.may_have_downstream
762 && barrier.has_more_downstream_fragments(self.actor_context.id)
763 {
764 self.may_have_downstream = true;
765 }
766 Self::may_update_depended_subscriptions(
767 &mut self.subscriber_ids,
768 &barrier,
769 mv_table_id,
770 );
771 let op_consistency_level = get_op_consistency_level(
772 self.conflict_behavior,
773 self.may_have_downstream,
774 &self.subscriber_ids,
775 );
776 let post_commit = self
777 .state_table
778 .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
779 .await?;
780 if !post_commit.inner().is_consistent_op() {
781 assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
782 }
783
784 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
785
786 let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
788 {
789 Some((
792 refresh_args.staging_table.commit(barrier.epoch).await?,
793 refresh_args.progress_table.commit(barrier.epoch).await?,
794 ))
795 } else {
796 None
797 };
798
799 let b_epoch = barrier.epoch;
800 yield Message::Barrier(barrier);
801
802 if let Some((_, cache_may_stale)) = post_commit
804 .post_yield_barrier(update_vnode_bitmap.clone())
805 .await?
806 && cache_may_stale
807 && let Some(cache) = &mut self.materialize_cache
808 {
809 cache.clear();
810 }
811
812 if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
814 staging_post_commit
815 .post_yield_barrier(update_vnode_bitmap.clone())
816 .await?;
817 progress_post_commit
818 .post_yield_barrier(update_vnode_bitmap)
819 .await?;
820 }
821
822 self.metrics
823 .materialize_current_epoch
824 .set(b_epoch.curr as i64);
825
826 *inner_state = *expect_next_state;
829 }
830 }
831 }
832 }
833
834 #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
838 async fn make_mergesort_stream<'a>(
839 main_table: &'a StateTableInner<S, SD>,
840 staging_table: &'a StateTableInner<S, SD>,
841 progress_table: &'a mut RefreshProgressTable<S>,
842 ) {
843 for vnode in main_table.vnodes().clone().iter_vnodes() {
844 let mut processed_rows = 0;
845 let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
847 if let Some(current_entry) = progress_table.get_progress(vnode) {
848 if current_entry.is_completed {
850 tracing::debug!(
851 vnode = vnode.to_index(),
852 "Skipping already completed VNode during recovery"
853 );
854 continue;
855 }
856 processed_rows += current_entry.processed_rows;
857 tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
858
859 if let Some(current_state) = ¤t_entry.current_pos {
860 (Bound::Excluded(current_state.clone()), Bound::Unbounded)
861 } else {
862 (Bound::Unbounded, Bound::Unbounded)
863 }
864 } else {
865 (Bound::Unbounded, Bound::Unbounded)
866 };
867
868 let iter_main = main_table
869 .iter_keyed_row_with_vnode(
870 vnode,
871 &pk_range,
872 PrefetchOptions::prefetch_for_large_range_scan(),
873 )
874 .await?;
875 let iter_staging = staging_table
876 .iter_keyed_row_with_vnode(
877 vnode,
878 &pk_range,
879 PrefetchOptions::prefetch_for_large_range_scan(),
880 )
881 .await?;
882
883 pin_mut!(iter_main);
884 pin_mut!(iter_staging);
885
886 let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
888 let mut staging_item: Option<KeyedRow<Bytes>> =
889 iter_staging.next().await.transpose()?;
890
891 while let Some(main_kv) = main_item {
892 let main_key = main_kv.key();
893
894 let mut should_delete = false;
896 while let Some(staging_kv) = &staging_item {
897 let staging_key = staging_kv.key();
898 match main_key.cmp(staging_key) {
899 std::cmp::Ordering::Greater => {
900 staging_item = iter_staging.next().await.transpose()?;
902 }
903 std::cmp::Ordering::Equal => {
904 break;
906 }
907 std::cmp::Ordering::Less => {
908 should_delete = true;
910 break;
911 }
912 }
913 }
914
915 if staging_item.is_none() {
917 should_delete = true;
918 }
919
920 if should_delete {
921 yield Some((vnode, main_kv.row().clone()));
922 }
923
924 processed_rows += 1;
926 tracing::debug!(
927 "set progress table: vnode = {:?}, processed_rows = {:?}",
928 vnode,
929 processed_rows
930 );
931 progress_table.set_progress(
932 vnode,
933 Some(
934 main_kv
935 .row()
936 .project(main_table.pk_indices())
937 .to_owned_row(),
938 ),
939 false,
940 processed_rows,
941 )?;
942 main_item = iter_main.next().await.transpose()?;
943 }
944
945 if let Some(current_entry) = progress_table.get_progress(vnode) {
947 progress_table.set_progress(
948 vnode,
949 current_entry.current_pos.clone(),
950 true, current_entry.processed_rows,
952 )?;
953
954 tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
955 }
956 }
957
958 yield None;
960 }
961
962 fn may_update_depended_subscriptions(
964 depended_subscriptions: &mut HashSet<SubscriberId>,
965 barrier: &Barrier,
966 mv_table_id: TableId,
967 ) {
968 for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
969 if !depended_subscriptions.insert(subscriber_id) {
970 warn!(
971 ?depended_subscriptions,
972 %mv_table_id,
973 %subscriber_id,
974 "subscription id already exists"
975 );
976 }
977 }
978
979 if let Some(Mutation::DropSubscriptions {
980 subscriptions_to_drop,
981 }) = barrier.mutation.as_deref()
982 {
983 for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
984 if *upstream_mv_table_id == mv_table_id
985 && !depended_subscriptions.remove(subscriber_id)
986 {
987 warn!(
988 ?depended_subscriptions,
989 %mv_table_id,
990 %subscriber_id,
991 "drop non existing subscriber_id id"
992 );
993 }
994 }
995 }
996 }
997
998 fn init_refresh_progress(
1000 state_table: &StateTableInner<S, SD>,
1001 progress_table: &mut RefreshProgressTable<S>,
1002 _epoch: u64,
1003 ) -> StreamExecutorResult<()> {
1004 debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1005
1006 for vnode in state_table.vnodes().iter_vnodes() {
1008 progress_table.set_progress(
1009 vnode, None, false, 0, )?;
1013 }
1014
1015 tracing::info!(
1016 vnodes_count = state_table.vnodes().count_ones(),
1017 "Initialized refresh progress tracking for all VNodes"
1018 );
1019
1020 Ok(())
1021 }
1022}
1023
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Create a `MaterializeExecutor` for tests, backed by a generated table
    /// catalog with the given arrange keys and columns, dummy metrics, and no
    /// refresh support.
    #[cfg(any(test, feature = "test"))]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        use risingwave_common::util::iter_util::ZipEqFast;

        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // All columns are serialized as values.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let unused = StreamingMetrics::unused();
        let metrics = unused.new_materialize_metrics(table_id, 1.into(), 2.into());
        let cache_metrics = unused.new_materialize_cache_metrics(table_id, 1.into(), 2.into());

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                vec![],
                conflict_behavior,
                None,
                cache_metrics,
            ),
            conflict_behavior,
            version_column_indices: vec![],
            is_dummy_table: false,
            // Force downstream handling so the consistent-op path is used.
            may_have_downstream: true,
            subscriber_ids: HashSet::new(),
            metrics,
            refresh_args: None,
            local_barrier_manager: LocalBarrierManager::for_test(),
        }
    }
}
1094
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Delegate to the `#[try_stream]`-generated implementation.
        self.execute_inner().boxed()
    }
}
1100
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Manual impl: only the arrange key indices are shown; most other fields
    // (state table, cache, metrics) do not implement `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
1108
1109#[cfg(test)]
1110mod tests {
1111
1112 use std::iter;
1113 use std::sync::atomic::AtomicU64;
1114
1115 use rand::rngs::SmallRng;
1116 use rand::{Rng, RngCore, SeedableRng};
1117 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1118 use risingwave_common::catalog::Field;
1119 use risingwave_common::util::epoch::test_epoch;
1120 use risingwave_common::util::sort_util::OrderType;
1121 use risingwave_hummock_sdk::HummockReadEpoch;
1122 use risingwave_storage::memory::MemoryStateStore;
1123 use risingwave_storage::table::batch_table::BatchTable;
1124
1125 use super::*;
1126 use crate::executor::test_utils::*;
1127
    /// Basic `NoCheck` path: feed two chunks separated by barriers and verify
    /// the materialized rows are readable from storage after each barrier.
    #[tokio::test]
    async fn test_materialize_executor() {
        // Prepare storage.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; the first column is the PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare the mocked upstream: barrier, chunk1, barrier, chunk2, barrier.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table used to read back what the executor materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // First barrier.
        materialize_executor.next().await.transpose().unwrap();
        // Chunk 1.
        materialize_executor.next().await.transpose().unwrap();

        // Second barrier: chunk1's rows must now be visible in storage.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Chunk 2.
        materialize_executor.next().await.transpose().unwrap();
        // Third barrier: chunk2's insert must be visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1230
    /// `Overwrite` path: an insert followed by an insert+delete of the same
    /// key must leave no row for that key in storage.
    #[tokio::test]
    async fn test_upsert_stream() {
        // Prepare storage.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; the first column is the PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Overwrites key 1, then deletes it in the same chunk.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare the mocked upstream: barrier, chunk1, barrier, chunk2, barrier.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table used to read back what the executor materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // First barrier.
        materialize_executor.next().await.transpose().unwrap();
        // Chunk 1, second barrier, chunk 2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // Third barrier: key 1 was inserted then deleted, so no row remains.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1313
    /// `Overwrite` conflict behavior: when multiple inserts share a primary
    /// key, the *latest* write wins — both within one chunk and across chunks.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 1 appears twice inside a single chunk.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicts with chunk1 across chunks, before the next barrier.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // Conflicts across epochs (not consumed by the assertions below).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used for verification.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier, then chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // At the second barrier the latest write per key must be visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // Key 3 was written only once: (3, 6).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // Key 1: last write before the barrier is `+ 1 3` from chunk2.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // Key 2: `+ 2 6` from chunk2 overwrites `+ 2 5` from chunk1.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1432
    /// `Overwrite` conflict behavior with deletes and `U-`/`U+` update pairs:
    /// the latest write for each key wins, deletes remove the row even when
    /// the delete's old value differs from what is stored, and deleting a
    /// never-inserted key leaves nothing behind.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 8 gets an update pair followed by an insert; the final
        // `+ 8 3` is the last write and should win under `Overwrite`.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // `- 3 4` deletes key 3 although the stored value is 6;
        // `- 5 0` deletes a key that was never inserted.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Conflicting insert on key 1 plus update pairs; the pair on key 9
        // targets a key that does not exist yet.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used for verification.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier, then chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // At the second barrier: key 8 holds its last write, `+ 8 3`.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // At the third barrier: 7 is inserted; 3 and 5 are gone.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // Deleted despite the mismatched old value in `- 3 4`.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // Never existed; the delete must not create anything.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // At the fourth barrier: overwrite on 1, update on 2, update on
        // absent key 9 materializes the new value.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1619
    /// `IgnoreConflict` behavior: when multiple inserts share a primary key,
    /// the *first* write wins and later conflicting inserts are dropped.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 1 appears twice inside a single chunk; `+ 1 4` must be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicts with chunk1 across chunks; both rows must be ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // Conflicts across epochs (not consumed by the assertions below).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used for verification.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier, then chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // At the second barrier the first write per key must be visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // Key 3 was written only once: (3, 6).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // Key 1: keeps its first value 3; later inserts are ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // Key 2: keeps its first value 5; `+ 2 6` is ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1738
1739 #[tokio::test]
1740 async fn test_ignore_delete_then_insert() {
1741 let memory_state_store = MemoryStateStore::new();
1743 let table_id = TableId::new(1);
1744 let schema = Schema::new(vec![
1746 Field::unnamed(DataType::Int32),
1747 Field::unnamed(DataType::Int32),
1748 ]);
1749 let column_ids = vec![0.into(), 1.into()];
1750
1751 let chunk1 = StreamChunk::from_pretty(
1753 " i i
1754 + 1 3
1755 - 1 3
1756 + 1 6",
1757 );
1758
1759 let source = MockSource::with_messages(vec![
1761 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1762 Message::Chunk(chunk1),
1763 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1764 ])
1765 .into_executor(schema.clone(), StreamKey::new());
1766
1767 let order_types = vec![OrderType::ascending()];
1768 let column_descs = vec![
1769 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1770 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1771 ];
1772
1773 let table = BatchTable::for_test(
1774 memory_state_store.clone(),
1775 table_id,
1776 column_descs,
1777 order_types,
1778 vec![0],
1779 vec![0, 1],
1780 );
1781
1782 let mut materialize_executor = MaterializeExecutor::for_test(
1783 source,
1784 memory_state_store,
1785 table_id,
1786 vec![ColumnOrder::new(0, OrderType::ascending())],
1787 column_ids,
1788 Arc::new(AtomicU64::new(0)),
1789 ConflictBehavior::IgnoreConflict,
1790 )
1791 .await
1792 .boxed()
1793 .execute();
1794 let _msg1 = materialize_executor
1795 .next()
1796 .await
1797 .transpose()
1798 .unwrap()
1799 .unwrap()
1800 .as_barrier()
1801 .unwrap();
1802 let _msg2 = materialize_executor
1803 .next()
1804 .await
1805 .transpose()
1806 .unwrap()
1807 .unwrap()
1808 .as_chunk()
1809 .unwrap();
1810 let _msg3 = materialize_executor
1811 .next()
1812 .await
1813 .transpose()
1814 .unwrap()
1815 .unwrap()
1816 .as_barrier()
1817 .unwrap();
1818
1819 let row = table
1820 .get_row(
1821 &OwnedRow::new(vec![Some(1_i32.into())]),
1822 HummockReadEpoch::NoWait(u64::MAX),
1823 )
1824 .await
1825 .unwrap();
1826 assert_eq!(
1827 row,
1828 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
1829 );
1830 }
1831
    /// `IgnoreConflict` behavior with deletes and `U-`/`U+` update pairs:
    /// conflicting plain inserts are ignored, but deletes and update pairs
    /// still take effect (see the per-barrier assertions below).
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 8: update pair writes (8, 2); the trailing `+ 8 3` conflicts
        // and is expected to be ignored (asserted at the second barrier).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // `- 3 4` deletes key 3 (stored value is 6); `- 5 0` deletes a key
        // that was never inserted.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // `+ 1 5` conflicts with the stored (1, 4) and is ignored; the
        // update pairs on keys 2 and 9 still apply.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used for verification.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier, then chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // At the second barrier: key 8 keeps the update-pair value (8, 2);
        // the conflicting `+ 8 3` was ignored.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // At the third barrier: 7 is inserted; 3 and 5 are gone.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // Deleted despite the mismatched old value in `- 3 4`.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // Never existed; the delete must not create anything.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // At the fourth barrier: `+ 1 5` was ignored (1 keeps value 4);
        // the update pairs on 2 and 9 took effect.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2019
    /// `DoUpdateIfNotNull` behavior: on a primary-key conflict, only the
    /// non-null columns of the incoming row overwrite the stored row — null
    /// columns leave the existing values untouched (asserted below for keys
    /// 8, 7 and 2).
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 8: the update pair writes (8, 2); the trailing `+ 8 .` has a
        // null value column, so (8, 2) must survive. Key 2 is inserted with
        // a null value column.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // `- 3 4` deletes key 3 (stored value is 6); `- 5 0` deletes a key
        // that was never inserted.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // `+ 7 .` conflicts with (7, 8) but carries a null column, so the
        // stored value stays; `U+ 2 .` likewise keeps key 2's (null) value.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used for verification.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier, then chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // At the second barrier: key 8 keeps (8, 2) — the null column of
        // `+ 8 .` did not overwrite it; key 2 holds (2, NULL).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // At the third barrier: 7 is inserted; 3 and 5 are gone.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // Deleted despite the mismatched old value in `- 3 4`.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // Never existed; the delete must not create anything.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // At the fourth barrier: null columns in `+ 7 .` and `U+ 2 .` did
        // not clobber stored values; key 9's update pair materialized (9, 1).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2213
2214 fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2215 const KN: u32 = 4;
2216 const SEED: u64 = 998244353;
2217 let mut ret = vec![];
2218 let mut builder =
2219 StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2220 let mut rng = SmallRng::seed_from_u64(SEED);
2221
2222 let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2223 let len = c.data_chunk().capacity();
2224 let mut c = StreamChunkMut::from(c);
2225 for i in 0..len {
2226 c.set_vis(i, rng.random_bool(0.5));
2227 }
2228 c.into()
2229 };
2230 for _ in 0..row_number {
2231 let k = (rng.next_u32() % KN) as i32;
2232 let v = rng.next_u32() as i32;
2233 let op = if rng.random_bool(0.5) {
2234 Op::Insert
2235 } else {
2236 Op::Delete
2237 };
2238 if let Some(c) =
2239 builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2240 {
2241 ret.push(random_vis(c, &mut rng));
2242 }
2243 }
2244 if let Some(c) = builder.take() {
2245 ret.push(random_vis(c, &mut rng));
2246 }
2247 ret
2248 }
2249
    /// Shared driver for the fuzz tests below: pipes `N` random rows through
    /// a `MaterializeExecutor` with the given conflict behavior and replays
    /// every output chunk into a separate `StateTable`.
    ///
    /// NOTE(review): there are no explicit assertions — the test presumably
    /// relies on `StateTable::write_chunk` panicking on op-inconsistent
    /// output; confirm against the `StateTable` consistency checks.
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Message stream: barrier(1), all fuzz chunks, barrier(2).
        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        // Sink table with a distinct table id (1002) on the same store; the
        // executor's conflict-resolved output is written into it below.
        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        // Forward every output chunk; the loop ends at the closing barrier.
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            table.write_chunk(c);
        }
    }
2313
    /// Fuzz the materialize executor's output consistency under `Overwrite`.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2318
    /// Fuzz the materialize executor's output consistency under `IgnoreConflict`.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2323}