1use std::assert_matches::assert_matches;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::ops::{Bound, Deref, Index};
19
20use bytes::Bytes;
21use futures::future::Either;
22use futures::stream::{self, select_with_strategy};
23use futures_async_stream::try_stream;
24use itertools::Itertools;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{
28 ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
29};
30use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
31use risingwave_common::row::{CompactedRow, OwnedRow, RowExt};
32use risingwave_common::types::{DEBEZIUM_UNAVAILABLE_VALUE, DataType, ScalarImpl};
33use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
34use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
35use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_pb::catalog::Table;
38use risingwave_pb::catalog::table::Engine;
39use risingwave_pb::id::SourceId;
40use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
41use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
42use risingwave_storage::table::KeyedRow;
43
44use crate::cache::ManagedLruCache;
45use crate::common::change_buffer::output_kind as cb_kind;
46use crate::common::metrics::MetricsInfo;
47use crate::common::table::state_table::{
48 StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
49};
50use crate::executor::error::ErrorKind;
51use crate::executor::monitor::MaterializeMetrics;
52use crate::executor::mview::RefreshProgressTable;
53use crate::executor::prelude::*;
54use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
55use crate::task::LocalBarrierManager;
56
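/// States of the materialize executor's internal state machine.
///
/// A non-refreshable table stays in `NormalIngestion`. A refreshable table walks through
/// `NormalIngestion` -> `MergingData` -> `CleanUp` -> `RefreshEnd` once a refresh is triggered,
/// committing state and yielding the pending barrier via `CommitAndYieldBarrier` between stages.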
57#[derive(Debug, Clone)]
58pub enum MaterializeStreamState<M> {
59 NormalIngestion,
60 MergingData,
61 CleanUp,
62 CommitAndYieldBarrier {
63 barrier: BarrierInner<M>,
64 expect_next_state: Box<MaterializeStreamState<M>>,
65 },
66 RefreshEnd {
67 on_complete_epoch: EpochPair,
68 },
69}
70
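/// `MaterializeExecutor` materializes the input change stream into a state table (the storage of
/// a materialized view or table), optionally resolving primary-key conflicts according to
/// `ConflictBehavior`, and drives table refresh when `refresh_args` is configured.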
71pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
73 input: Executor,
74
75 schema: Schema,
76
77 state_table: StateTableInner<S, SD>,
78
79 arrange_key_indices: Vec<usize>,
81
82 actor_context: ActorContextRef,
83
84 materialize_cache: MaterializeCache<SD>,
85
86 conflict_behavior: ConflictBehavior,
87
88 version_column_indices: Vec<u32>,
89
90 may_have_downstream: bool,
91
92 subscriber_ids: HashSet<u32>,
93
94 metrics: MaterializeMetrics,
95
96 is_dummy_table: bool,
99
100 toastable_column_indices: Option<Vec<usize>>,
102
103 refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
105
106 local_barrier_manager: LocalBarrierManager,
108}
109
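/// Extra state carried by a refreshable table: the staging table that receives rows ingested
/// while a refresh is in progress, and the progress table that records per-vnode merge progress
/// so an in-flight refresh can be resumed after recovery.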
110pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
112 pub table_catalog: Table,
114
115 pub staging_table_catalog: Table,
117
118 pub is_refreshing: bool,
120
121 pub staging_table: StateTableInner<S, SD>,
128
129 pub progress_table: RefreshProgressTable<S>,
131
132 pub table_id: TableId,
134}
135
136impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
137 pub async fn new(
139 store: S,
140 table_catalog: &Table,
141 staging_table_catalog: &Table,
142 progress_state_table: &Table,
143 vnodes: Option<Arc<Bitmap>>,
144 ) -> Self {
145 let table_id = table_catalog.id;
146
147 let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
149 staging_table_catalog,
150 store.clone(),
151 vnodes.clone(),
152 )
153 .await;
154
155 let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
156 progress_state_table,
157 store,
158 vnodes,
159 )
160 .await;
161
162 let pk_len = table_catalog.pk.len();
164 let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
165
166 debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
167
168 Self {
169 table_catalog: table_catalog.clone(),
170 staging_table_catalog: staging_table_catalog.clone(),
171 is_refreshing: false,
172 staging_table,
173 progress_table,
174 table_id,
175 }
176 }
177}
178
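/// Decides the state table's op consistency level: the log store (old value) is required when
/// there are subscribers, inconsistent ops are allowed for `Overwrite` tables with no downstream
/// fragments, and consistent old-value checks are used otherwise.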
179fn get_op_consistency_level(
180 conflict_behavior: ConflictBehavior,
181 may_have_downstream: bool,
182 subscriber_ids: &HashSet<u32>,
183) -> StateTableOpConsistencyLevel {
184 if !subscriber_ids.is_empty() {
185 StateTableOpConsistencyLevel::LogStoreEnabled
186 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
187 StateTableOpConsistencyLevel::Inconsistent
190 } else {
191 StateTableOpConsistencyLevel::ConsistentOldValue
192 }
193}
194
195impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
196 #[allow(clippy::too_many_arguments)]
200 pub async fn new(
201 input: Executor,
202 schema: Schema,
203 store: S,
204 arrange_key: Vec<ColumnOrder>,
205 actor_context: ActorContextRef,
206 vnodes: Option<Arc<Bitmap>>,
207 table_catalog: &Table,
208 watermark_epoch: AtomicU64Ref,
209 conflict_behavior: ConflictBehavior,
210 version_column_indices: Vec<u32>,
211 metrics: Arc<StreamingMetrics>,
212 refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
213 local_barrier_manager: LocalBarrierManager,
214 ) -> Self {
215 let table_columns: Vec<ColumnDesc> = table_catalog
216 .columns
217 .iter()
218 .map(|col| col.column_desc.as_ref().unwrap().into())
219 .collect();
220
221 let toastable_column_indices = if table_catalog.cdc_table_type()
224 == risingwave_pb::catalog::table::CdcTableType::Postgres
225 {
226 let toastable_indices: Vec<usize> = table_columns
227 .iter()
228 .enumerate()
229 .filter_map(|(index, column)| match &column.data_type {
230 DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
239 Some(index)
240 }
241 _ => None,
242 })
243 .collect();
244
245 if toastable_indices.is_empty() {
246 None
247 } else {
248 Some(toastable_indices)
249 }
250 } else {
251 None
252 };
253
254 let row_serde: BasicSerde = BasicSerde::new(
255 Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
256 Arc::from(table_columns.into_boxed_slice()),
257 );
258
259 let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
260 let may_have_downstream = actor_context.initial_dispatch_num != 0;
261 let subscriber_ids = actor_context.initial_subscriber_ids.clone();
262 let op_consistency_level =
263 get_op_consistency_level(conflict_behavior, may_have_downstream, &subscriber_ids);
264 let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
266 .with_op_consistency_level(op_consistency_level)
267 .enable_preload_all_rows_by_config(&actor_context.config)
268 .build()
269 .await;
270
271 let mv_metrics = metrics.new_materialize_metrics(
272 table_catalog.id,
273 actor_context.id,
274 actor_context.fragment_id,
275 );
276
277 let metrics_info =
278 MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
279
280 let is_dummy_table =
281 table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;
282
283 Self {
284 input,
285 schema,
286 state_table,
287 arrange_key_indices,
288 actor_context,
289 materialize_cache: MaterializeCache::new(
290 watermark_epoch,
291 metrics_info,
292 row_serde,
293 version_column_indices.clone(),
294 ),
295 conflict_behavior,
296 version_column_indices,
297 is_dummy_table,
298 may_have_downstream,
299 subscriber_ids,
300 metrics: mv_metrics,
301 toastable_column_indices,
302 refresh_args,
303 local_barrier_manager,
304 }
305 }
306
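    /// Main execution loop. Drives the state machine in `MaterializeStreamState`, writing input
    /// chunks to the state table (and to the staging table while a refresh is in progress) and
    /// committing on every barrier.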
307 #[try_stream(ok = Message, error = StreamExecutorError)]
308 async fn execute_inner(mut self) {
309 let mv_table_id = self.state_table.table_id();
310 let data_types = self.schema.data_types();
311 let mut input = self.input.execute();
312
313 let barrier = expect_first_barrier(&mut input).await?;
314 let first_epoch = barrier.epoch;
        let _barrier_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
318 self.state_table.init_epoch(first_epoch).await?;
319
320 let mut inner_state =
322 Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
323 if let Some(ref mut refresh_args) = self.refresh_args {
325 refresh_args.staging_table.init_epoch(first_epoch).await?;
326
327 refresh_args.progress_table.recover(first_epoch).await?;
329
330 let progress_stats = refresh_args.progress_table.get_progress_stats();
332 if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
333 refresh_args.is_refreshing = true;
334 tracing::info!(
335 total_vnodes = progress_stats.total_vnodes,
336 completed_vnodes = progress_stats.completed_vnodes,
337 "Recovered refresh in progress, resuming refresh operation"
338 );
339
340 let incomplete_vnodes: Vec<_> = refresh_args
344 .progress_table
345 .get_all_progress()
346 .iter()
347 .filter(|(_, entry)| !entry.is_completed)
348 .map(|(&vnode, _)| vnode)
349 .collect();
350
351 if !incomplete_vnodes.is_empty() {
352 tracing::info!(
354 incomplete_vnodes = incomplete_vnodes.len(),
355 "Recovery detected incomplete VNodes, resuming refresh operation"
356 );
357 } else {
360 tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
362 }
363 }
364 }
365
366 if let Some(ref refresh_args) = self.refresh_args
368 && refresh_args.is_refreshing
369 {
370 let incomplete_vnodes: Vec<_> = refresh_args
372 .progress_table
373 .get_all_progress()
374 .iter()
375 .filter(|(_, entry)| !entry.is_completed)
376 .map(|(&vnode, _)| vnode)
377 .collect();
378 if !incomplete_vnodes.is_empty() {
379 inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
381 tracing::info!(
382 incomplete_vnodes = incomplete_vnodes.len(),
383 "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
384 );
385 }
386 }
387
388 'main_loop: loop {
390 match *inner_state {
391 MaterializeStreamState::NormalIngestion => {
392 #[for_await]
393 '_normal_ingest: for msg in input.by_ref() {
394 let msg = msg?;
395 self.materialize_cache.evict();
396
397 match msg {
398 Message::Watermark(w) => {
399 yield Message::Watermark(w);
400 }
401 Message::Chunk(chunk) if self.is_dummy_table => {
402 self.metrics
403 .materialize_input_row_count
404 .inc_by(chunk.cardinality() as u64);
405 yield Message::Chunk(chunk);
406 }
407 Message::Chunk(chunk) => {
408 self.metrics
409 .materialize_input_row_count
410 .inc_by(chunk.cardinality() as u64);
411
412 let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
417 self.conflict_behavior
418 && !self.state_table.is_consistent_op()
419 && self.version_column_indices.is_empty()
420 {
421 ConflictBehavior::NoCheck
422 } else {
423 self.conflict_behavior
424 };
425
426 match optimized_conflict_behavior {
427 checked_conflict_behaviors!() => {
428 if chunk.cardinality() == 0 {
429 continue;
431 }
432 let (data_chunk, ops) = chunk.clone().into_parts();
433
434 if self.state_table.value_indices().is_some() {
435 panic!(
                                        "materialize executor with conflict check cannot materialize only a partial set of columns"
439 )
440 };
441 let values = data_chunk.serialize();
442
443 let key_chunk =
444 data_chunk.project(self.state_table.pk_indices());
445
446 if let Some(ref mut refresh_args) = self.refresh_args
449 && refresh_args.is_refreshing
450 {
451 let key_chunk = chunk
452 .clone()
453 .project(self.state_table.pk_indices());
454 tracing::trace!(
455 staging_chunk = %key_chunk.to_pretty(),
456 input_chunk = %chunk.to_pretty(),
457 "writing to staging table"
458 );
459 if cfg!(debug_assertions) {
460 assert!(
462 key_chunk
463 .ops()
464 .iter()
465 .all(|op| op == &Op::Insert)
466 );
467 }
468 refresh_args
469 .staging_table
470 .write_chunk(key_chunk.clone());
471 refresh_args.staging_table.try_flush().await?;
472 }
473
474 let pks = {
475 let mut pks = vec![vec![]; data_chunk.capacity()];
476 key_chunk
477 .rows_with_holes()
478 .zip_eq_fast(pks.iter_mut())
479 .for_each(|(r, vnode_and_pk)| {
480 if let Some(r) = r {
481 self.state_table
482 .pk_serde()
483 .serialize(r, vnode_and_pk);
484 }
485 });
486 pks
487 };
488 let (_, vis) = key_chunk.into_parts();
489 let row_ops = ops
490 .iter()
491 .zip_eq_debug(pks.into_iter())
492 .zip_eq_debug(values.into_iter())
493 .zip_eq_debug(vis.iter())
494 .filter_map(|(((op, k), v), vis)| {
495 vis.then_some((*op, k, v))
496 })
497 .collect_vec();
498
499 let change_buffer = self
500 .materialize_cache
501 .handle(
502 row_ops,
503 &self.state_table,
504 self.conflict_behavior,
505 &self.metrics,
506 self.toastable_column_indices.as_deref(),
507 )
508 .await?;
509
510 match change_buffer
511 .into_chunk::<{ cb_kind::RETRACT }>(data_types.clone())
512 {
513 Some(output_chunk) => {
514 self.state_table.write_chunk(output_chunk.clone());
515 self.state_table.try_flush().await?;
516 yield Message::Chunk(output_chunk);
517 }
518 None => continue,
519 }
520 }
521 ConflictBehavior::NoCheck => {
522 self.state_table.write_chunk(chunk.clone());
523 self.state_table.try_flush().await?;
524
525 if let Some(ref mut refresh_args) = self.refresh_args
527 && refresh_args.is_refreshing
528 {
529 let key_chunk = chunk
530 .clone()
531 .project(self.state_table.pk_indices());
532 tracing::trace!(
533 staging_chunk = %key_chunk.to_pretty(),
534 input_chunk = %chunk.to_pretty(),
535 "writing to staging table"
536 );
537 if cfg!(debug_assertions) {
538 assert!(
540 key_chunk
541 .ops()
542 .iter()
543 .all(|op| op == &Op::Insert)
544 );
545 }
546 refresh_args
547 .staging_table
548 .write_chunk(key_chunk.clone());
549 refresh_args.staging_table.try_flush().await?;
550 }
551
552 yield Message::Chunk(chunk);
553 }
554 }
555 }
556 Message::Barrier(barrier) => {
557 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
558 barrier,
559 expect_next_state: Box::new(
560 MaterializeStreamState::NormalIngestion,
561 ),
562 };
563 continue 'main_loop;
564 }
565 }
566 }
567
568 return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
569 anyhow::anyhow!(
570 "Input stream terminated unexpectedly during normal ingestion"
571 ),
572 )));
573 }
574 MaterializeStreamState::MergingData => {
                    let Some(refresh_args) = self.refresh_args.as_mut() else {
                        panic!(
                            "MaterializeExecutor entered MergingData state without refresh_args configured"
                        );
                    };
580 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
581
582 debug_assert_eq!(
583 self.state_table.vnodes(),
584 refresh_args.staging_table.vnodes()
585 );
586 debug_assert_eq!(
587 refresh_args.staging_table.vnodes(),
588 refresh_args.progress_table.vnodes()
589 );
590
591 let mut rows_to_delete = vec![];
592 let mut merge_complete = false;
593 let mut pending_barrier: Option<Barrier> = None;
594
595 {
597 let left_input = input.by_ref().map(Either::Left);
598 let right_merge_sort = pin!(
599 Self::make_mergesort_stream(
600 &self.state_table,
601 &refresh_args.staging_table,
602 &mut refresh_args.progress_table
603 )
604 .map(Either::Right)
605 );
606
607 let mut merge_stream =
610 select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
611 stream::PollNext::Left
612 });
613
614 #[for_await]
615 'merge_stream: for either in &mut merge_stream {
616 match either {
617 Either::Left(msg) => {
618 let msg = msg?;
619 match msg {
620 Message::Watermark(w) => yield Message::Watermark(w),
621 Message::Chunk(chunk) => {
622 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
623 }
624 Message::Barrier(b) => {
625 pending_barrier = Some(b);
626 break 'merge_stream;
627 }
628 }
629 }
630 Either::Right(result) => {
631 match result? {
632 Some((_vnode, row)) => {
633 rows_to_delete.push(row);
634 }
635 None => {
636 merge_complete = true;
638
639 }
641 }
642 }
643 }
644 }
645 }
646
647 for row in &rows_to_delete {
649 self.state_table.delete(row);
650 }
651 if !rows_to_delete.is_empty() {
652 let to_delete_chunk = StreamChunk::from_rows(
653 &rows_to_delete
654 .iter()
655 .map(|row| (Op::Delete, row))
656 .collect_vec(),
657 &self.schema.data_types(),
658 );
659
660 yield Message::Chunk(to_delete_chunk);
661 }
662
663 assert!(pending_barrier.is_some(), "pending barrier is not set");
665
666 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
667 barrier: pending_barrier.unwrap(),
668 expect_next_state: if merge_complete {
669 Box::new(MaterializeStreamState::CleanUp)
670 } else {
671 Box::new(MaterializeStreamState::MergingData)
672 },
673 };
674 continue 'main_loop;
675 }
676 MaterializeStreamState::CleanUp => {
                    let Some(refresh_args) = self.refresh_args.as_mut() else {
                        panic!(
                            "MaterializeExecutor entered CleanUp state without refresh_args configured"
                        );
                    };
                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp stage");
683
684 #[for_await]
685 for msg in input.by_ref() {
686 let msg = msg?;
687 match msg {
688 Message::Watermark(w) => yield Message::Watermark(w),
689 Message::Chunk(chunk) => {
                                tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during cleanup phase");
691 }
692 Message::Barrier(barrier) if !barrier.is_checkpoint() => {
693 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
694 barrier,
695 expect_next_state: Box::new(MaterializeStreamState::CleanUp),
696 };
697 continue 'main_loop;
698 }
699 Message::Barrier(barrier) => {
700 let staging_table_id = refresh_args.staging_table.table_id();
701 let epoch = barrier.epoch;
702 self.local_barrier_manager.report_refresh_finished(
703 epoch,
704 self.actor_context.id,
705 refresh_args.table_id,
706 staging_table_id,
707 );
708 tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
709
710 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
711 barrier,
712 expect_next_state: Box::new(
713 MaterializeStreamState::RefreshEnd {
714 on_complete_epoch: epoch,
715 },
716 ),
717 };
718 continue 'main_loop;
719 }
720 }
721 }
722 }
723 MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
724 let Some(refresh_args) = self.refresh_args.as_mut() else {
725 panic!(
726 "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
727 );
728 };
729 let staging_table_id = refresh_args.staging_table.table_id();
730
731 let staging_store = refresh_args.staging_table.state_store().clone();
733 staging_store
734 .try_wait_epoch(
735 HummockReadEpoch::Committed(on_complete_epoch.prev),
736 TryWaitEpochOptions {
737 table_id: staging_table_id,
738 },
739 )
740 .await?;
741
742 tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
743
744 if let Some(ref mut refresh_args) = self.refresh_args {
745 refresh_args.is_refreshing = false;
746 }
747 *inner_state = MaterializeStreamState::NormalIngestion;
748 continue 'main_loop;
749 }
750 MaterializeStreamState::CommitAndYieldBarrier {
751 barrier,
752 mut expect_next_state,
753 } => {
754 if let Some(ref mut refresh_args) = self.refresh_args {
755 match barrier.mutation.as_deref() {
756 Some(Mutation::RefreshStart {
757 table_id: refresh_table_id,
758 associated_source_id: _,
759 }) if *refresh_table_id == refresh_args.table_id => {
760 debug_assert!(
761 !refresh_args.is_refreshing,
762 "cannot start refresh twice"
763 );
764 refresh_args.is_refreshing = true;
765 tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
766
767 Self::init_refresh_progress(
769 &self.state_table,
770 &mut refresh_args.progress_table,
771 barrier.epoch.curr,
772 )?;
773 }
774 Some(Mutation::LoadFinish {
775 associated_source_id: load_finish_source_id,
776 }) => {
777 let associated_source_id: SourceId = match refresh_args
779 .table_catalog
780 .optional_associated_source_id
781 {
782 Some(id) => id.into(),
783 None => unreachable!("associated_source_id is not set"),
784 };
785
786 if *load_finish_source_id == associated_source_id {
787 tracing::info!(
788 %load_finish_source_id,
789 "LoadFinish received, starting data replacement"
790 );
791 expect_next_state =
792 Box::new(MaterializeStreamState::<_>::MergingData);
793 }
794 }
795 _ => {}
796 }
797 }
798
799 if !self.may_have_downstream
803 && barrier.has_more_downstream_fragments(self.actor_context.id)
804 {
805 self.may_have_downstream = true;
806 }
807 Self::may_update_depended_subscriptions(
808 &mut self.subscriber_ids,
809 &barrier,
810 mv_table_id,
811 );
812 let op_consistency_level = get_op_consistency_level(
813 self.conflict_behavior,
814 self.may_have_downstream,
815 &self.subscriber_ids,
816 );
817 let post_commit = self
818 .state_table
819 .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
820 .await?;
821 if !post_commit.inner().is_consistent_op() {
822 assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
823 }
824
825 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
826
827 let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
829 {
830 Some((
833 refresh_args.staging_table.commit(barrier.epoch).await?,
834 refresh_args.progress_table.commit(barrier.epoch).await?,
835 ))
836 } else {
837 None
838 };
839
840 let b_epoch = barrier.epoch;
841 yield Message::Barrier(barrier);
842
843 if let Some((_, cache_may_stale)) = post_commit
845 .post_yield_barrier(update_vnode_bitmap.clone())
846 .await?
847 && cache_may_stale
848 {
849 self.materialize_cache.lru_cache.clear();
850 }
851
852 if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
854 staging_post_commit
855 .post_yield_barrier(update_vnode_bitmap.clone())
856 .await?;
857 progress_post_commit
858 .post_yield_barrier(update_vnode_bitmap)
859 .await?;
860 }
861
862 self.metrics
863 .materialize_current_epoch
864 .set(b_epoch.curr as i64);
865
866 *inner_state = *expect_next_state;
869 }
870 }
871 }
872 }
873
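    /// Streams, vnode by vnode, the rows that exist in the main table but not in the staging
    /// table by merge-joining the two iterators on their primary keys; such rows must be deleted
    /// to complete a refresh. Progress is persisted per vnode so the merge can resume after
    /// recovery. Yields `Some((vnode, row))` for every row to delete and a final `None` once all
    /// vnodes are processed.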
874 #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
878 async fn make_mergesort_stream<'a>(
879 main_table: &'a StateTableInner<S, SD>,
880 staging_table: &'a StateTableInner<S, SD>,
881 progress_table: &'a mut RefreshProgressTable<S>,
882 ) {
883 for vnode in main_table.vnodes().clone().iter_vnodes() {
884 let mut processed_rows = 0;
885 let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
887 if let Some(current_entry) = progress_table.get_progress(vnode) {
888 if current_entry.is_completed {
890 tracing::debug!(
891 vnode = vnode.to_index(),
892 "Skipping already completed VNode during recovery"
893 );
894 continue;
895 }
896 processed_rows += current_entry.processed_rows;
897 tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
898
                    if let Some(current_state) = &current_entry.current_pos {
900 (Bound::Excluded(current_state.clone()), Bound::Unbounded)
901 } else {
902 (Bound::Unbounded, Bound::Unbounded)
903 }
904 } else {
905 (Bound::Unbounded, Bound::Unbounded)
906 };
907
908 let iter_main = main_table
909 .iter_keyed_row_with_vnode(
910 vnode,
911 &pk_range,
912 PrefetchOptions::prefetch_for_large_range_scan(),
913 )
914 .await?;
915 let iter_staging = staging_table
916 .iter_keyed_row_with_vnode(
917 vnode,
918 &pk_range,
919 PrefetchOptions::prefetch_for_large_range_scan(),
920 )
921 .await?;
922
923 pin_mut!(iter_main);
924 pin_mut!(iter_staging);
925
926 let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
928 let mut staging_item: Option<KeyedRow<Bytes>> =
929 iter_staging.next().await.transpose()?;
930
931 while let Some(main_kv) = main_item {
932 let main_key = main_kv.key();
933
934 let mut should_delete = false;
936 while let Some(staging_kv) = &staging_item {
937 let staging_key = staging_kv.key();
938 match main_key.cmp(staging_key) {
939 std::cmp::Ordering::Greater => {
940 staging_item = iter_staging.next().await.transpose()?;
942 }
943 std::cmp::Ordering::Equal => {
944 break;
946 }
947 std::cmp::Ordering::Less => {
948 should_delete = true;
950 break;
951 }
952 }
953 }
954
955 if staging_item.is_none() {
957 should_delete = true;
958 }
959
960 if should_delete {
961 yield Some((vnode, main_kv.row().clone()));
962 }
963
964 processed_rows += 1;
966 tracing::info!(
967 "set progress table: vnode = {:?}, processed_rows = {:?}",
968 vnode,
969 processed_rows
970 );
971 progress_table.set_progress(
972 vnode,
973 Some(
974 main_kv
975 .row()
976 .project(main_table.pk_indices())
977 .to_owned_row(),
978 ),
979 false,
980 processed_rows,
981 )?;
982 main_item = iter_main.next().await.transpose()?;
983 }
984
985 if let Some(current_entry) = progress_table.get_progress(vnode) {
                progress_table.set_progress(
                    vnode,
                    current_entry.current_pos.clone(),
                    true, // mark this vnode's merge as completed
                    current_entry.processed_rows,
                )?;
993
994 tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
995 }
996 }
997
998 yield None;
1000 }
1001
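    /// Applies subscription changes carried by the barrier: records subscribers newly added on
    /// this MV table and removes those dropped, warning on duplicate or missing entries.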
1002 fn may_update_depended_subscriptions(
1004 depended_subscriptions: &mut HashSet<u32>,
1005 barrier: &Barrier,
1006 mv_table_id: TableId,
1007 ) {
1008 for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
1009 if !depended_subscriptions.insert(subscriber_id) {
1010 warn!(
1011 ?depended_subscriptions,
1012 ?mv_table_id,
1013 subscriber_id,
1014 "subscription id already exists"
1015 );
1016 }
1017 }
1018
1019 if let Some(Mutation::DropSubscriptions {
1020 subscriptions_to_drop,
1021 }) = barrier.mutation.as_deref()
1022 {
1023 for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
1024 if *upstream_mv_table_id == mv_table_id
1025 && !depended_subscriptions.remove(subscriber_id)
1026 {
1027 warn!(
1028 ?depended_subscriptions,
1029 ?mv_table_id,
1030 subscriber_id,
                    "dropping a non-existent subscriber_id"
1032 );
1033 }
1034 }
1035 }
1036 }
1037
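    /// Initializes a fresh progress entry (no position, not completed, zero processed rows) for
    /// every vnode owned by this executor when a refresh starts.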
1038 fn init_refresh_progress(
1040 state_table: &StateTableInner<S, SD>,
1041 progress_table: &mut RefreshProgressTable<S>,
1042 _epoch: u64,
1043 ) -> StreamExecutorResult<()> {
1044 debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1045
1046 for vnode in state_table.vnodes().iter_vnodes() {
            progress_table.set_progress(
                vnode,
                None,  // no position scanned yet
                false, // not completed
                0,     // zero rows processed
            )?;
1053 }
1054
1055 tracing::info!(
1056 vnodes_count = state_table.vnodes().count_ones(),
1057 "Initialized refresh progress tracking for all VNodes"
1058 );
1059
1060 Ok(())
1061 }
1062}
1063
1064impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
1065 #[cfg(any(test, feature = "test"))]
1067 pub async fn for_test(
1068 input: Executor,
1069 store: S,
1070 table_id: TableId,
1071 keys: Vec<ColumnOrder>,
1072 column_ids: Vec<risingwave_common::catalog::ColumnId>,
1073 watermark_epoch: AtomicU64Ref,
1074 conflict_behavior: ConflictBehavior,
1075 ) -> Self {
1076 let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
1077 let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
1078 let schema = input.schema().clone();
1079 let columns: Vec<ColumnDesc> = column_ids
1080 .into_iter()
1081 .zip_eq_fast(schema.fields.iter())
1082 .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
1083 .collect_vec();
1084
1085 let row_serde = BasicSerde::new(
1086 Arc::from((0..columns.len()).collect_vec()),
1087 Arc::from(columns.clone().into_boxed_slice()),
1088 );
1089 let state_table = StateTableInner::from_table_catalog(
1090 &crate::common::table::test_utils::gen_pbtable(
1091 table_id,
1092 columns,
1093 arrange_order_types,
1094 arrange_columns.clone(),
1095 0,
1096 ),
1097 store,
1098 None,
1099 )
1100 .await;
1101
1102 let metrics =
1103 StreamingMetrics::unused().new_materialize_metrics(table_id, 1.into(), 2.into());
1104
1105 Self {
1106 input,
1107 schema,
1108 state_table,
1109 arrange_key_indices: arrange_columns.clone(),
1110 actor_context: ActorContext::for_test(0),
1111 materialize_cache: MaterializeCache::new(
1112 watermark_epoch,
1113 MetricsInfo::for_test(),
1114 row_serde,
1115 vec![],
1116 ),
1117 conflict_behavior,
1118 version_column_indices: vec![],
1119 is_dummy_table: false,
1120 toastable_column_indices: None,
1121 may_have_downstream: true,
1122 subscriber_ids: HashSet::new(),
1123 metrics,
            refresh_args: None,
            local_barrier_manager: LocalBarrierManager::for_test(),
1126 }
1127 }
1128}
1129
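/// Checks whether a string equals Debezium's "unavailable value" placeholder, which Debezium
/// emits for unchanged TOASTed Postgres columns; the length comparison is a cheap fast path
/// before the full string comparison.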
1130fn is_unavailable_value_str(s: &str) -> bool {
1133 s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
1134}
1135
1136fn is_debezium_unavailable_value(
1139 datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
1140) -> bool {
1141 match datum {
1142 Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => is_unavailable_value_str(val),
1143 Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
1144 jsonb_ref
1146 .as_str()
1147 .map(is_unavailable_value_str)
1148 .unwrap_or(false)
1149 }
1150 Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
1151 if let Ok(bytea_str) = std::str::from_utf8(bytea) {
1155 is_unavailable_value_str(bytea_str)
1156 } else {
1157 false
1158 }
1159 }
1160 Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
1161 if list_ref.len() == 1 {
1165 if let Some(Some(element)) = list_ref.get(0) {
1166 is_debezium_unavailable_value(&Some(element))
1168 } else {
1169 false
1170 }
1171 } else {
1172 false
1173 }
1174 }
1175 _ => false,
1176 }
1177}
1178
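/// For Postgres CDC tables, replaces every TOASTable column in `new_row` that carries the
/// Debezium unavailable-value placeholder with the corresponding value from `old_row`, so that
/// unchanged TOASTed columns keep their previously materialized contents.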
1179fn handle_toast_columns_for_postgres_cdc(
1181 old_row: &OwnedRow,
1182 new_row: &OwnedRow,
1183 toastable_indices: &[usize],
1184) -> OwnedRow {
1185 let mut fixed_row_data = new_row.as_inner().to_vec();
1186
1187 for &toast_idx in toastable_indices {
1188 let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
1190 if is_unavailable {
1191 if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
1193 fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
1194 }
1195 }
1196 }
1197
1198 OwnedRow::new(fixed_row_data)
1199}
1200
1201type ChangeBuffer = crate::common::change_buffer::ChangeBuffer<Vec<u8>, OwnedRow>;
1202
1203impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
1204 fn execute(self: Box<Self>) -> BoxedMessageStream {
1205 self.execute_inner().boxed()
1206 }
1207}
1208
1209impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
1210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1211 f.debug_struct("MaterializeExecutor")
1212 .field("arrange_key_indices", &self.arrange_key_indices)
1213 .finish()
1214 }
1215}
1216
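/// An LRU cache from serialized primary key to the latest (compacted) row value, used to resolve
/// primary-key conflicts without hitting storage for every input row. `None` means the key is
/// known to be absent from the table.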
1217struct MaterializeCache<SD> {
1219 lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
1220 row_serde: BasicSerde,
1221 version_column_indices: Vec<u32>,
1222 _serde: PhantomData<SD>,
1223}
1224
1225type CacheValue = Option<CompactedRow>;
1226
1227impl<SD: ValueRowSerde> MaterializeCache<SD> {
1228 fn new(
1229 watermark_sequence: AtomicU64Ref,
1230 metrics_info: MetricsInfo,
1231 row_serde: BasicSerde,
1232 version_column_indices: Vec<u32>,
1233 ) -> Self {
1234 let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
1235 ManagedLruCache::unbounded(watermark_sequence, metrics_info);
1236 Self {
1237 lru_cache,
1238 row_serde,
1239 version_column_indices,
1240 _serde: PhantomData,
1241 }
1242 }
1243
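    /// Applies a batch of row ops with conflict handling: fetches the current values of all
    /// touched keys into the cache, then folds the ops into a `ChangeBuffer` describing the net
    /// effect on the table, updating the cache along the way.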
1244 async fn handle<S: StateStore>(
1247 &mut self,
1248 row_ops: Vec<(Op, Vec<u8>, Bytes)>,
1249 table: &StateTableInner<S, SD>,
1250 conflict_behavior: ConflictBehavior,
1251 metrics: &MaterializeMetrics,
1252 toastable_column_indices: Option<&[usize]>,
1253 ) -> StreamExecutorResult<ChangeBuffer> {
1254 assert_matches!(conflict_behavior, checked_conflict_behaviors!());
1255
1256 let key_set: HashSet<Box<[u8]>> = row_ops
1257 .iter()
1258 .map(|(_, k, _)| k.as_slice().into())
1259 .collect();
1260
1261 self.fetch_keys(
1264 key_set.iter().map(|v| v.deref()),
1265 table,
1266 conflict_behavior,
1267 metrics,
1268 )
1269 .await?;
1270
1271 let mut change_buffer = ChangeBuffer::new();
1272 let row_serde = self.row_serde.clone();
1273 let version_column_indices = self.version_column_indices.clone();
1274 for (op, key, row) in row_ops {
1275 match op {
1276 Op::Insert | Op::UpdateInsert => {
1277 let Some(old_row) = self.get_expected(&key) else {
1278 let new_row_deserialized =
1280 row_serde.deserializer.deserialize(row.clone())?;
1281 change_buffer.insert(key.clone(), new_row_deserialized);
1282 self.lru_cache.put(key, Some(CompactedRow { row }));
1283 continue;
1284 };
1285
1286 match conflict_behavior {
1288 ConflictBehavior::Overwrite => {
1289 let old_row_deserialized =
1290 row_serde.deserializer.deserialize(old_row.row.clone())?;
1291 let new_row_deserialized =
1292 row_serde.deserializer.deserialize(row.clone())?;
1293
1294 let need_overwrite = if !version_column_indices.is_empty() {
1295 versions_are_newer_or_equal(
1296 &old_row_deserialized,
1297 &new_row_deserialized,
1298 &version_column_indices,
1299 )
1300 } else {
1301 true
1303 };
1304
1305 if need_overwrite {
1306 if let Some(toastable_indices) = toastable_column_indices {
1307 let final_row = handle_toast_columns_for_postgres_cdc(
1309 &old_row_deserialized,
1310 &new_row_deserialized,
1311 toastable_indices,
1312 );
1313
1314 change_buffer.update(
1315 key.clone(),
1316 old_row_deserialized,
1317 final_row.clone(),
1318 );
1319 let final_row_bytes =
1320 Bytes::from(row_serde.serializer.serialize(final_row));
1321 self.lru_cache.put(
1322 key.clone(),
1323 Some(CompactedRow {
1324 row: final_row_bytes,
1325 }),
1326 );
1327 } else {
1328 change_buffer.update(
1330 key.clone(),
1331 old_row_deserialized,
1332 new_row_deserialized,
1333 );
1334 self.lru_cache
1335 .put(key.clone(), Some(CompactedRow { row: row.clone() }));
1336 }
1337 };
1338 }
                        ConflictBehavior::IgnoreConflict => {
                            // On conflict, keep the existing row and ignore the incoming one.
                        }
1342 ConflictBehavior::DoUpdateIfNotNull => {
1343 let old_row_deserialized =
1346 row_serde.deserializer.deserialize(old_row.row.clone())?;
1347 let new_row_deserialized =
1348 row_serde.deserializer.deserialize(row.clone())?;
1349 let need_overwrite = if !version_column_indices.is_empty() {
1350 versions_are_newer_or_equal(
1351 &old_row_deserialized,
1352 &new_row_deserialized,
1353 &version_column_indices,
1354 )
1355 } else {
1356 true
1357 };
1358
1359 if need_overwrite {
1360 let mut row_deserialized_vec =
1361 old_row_deserialized.clone().into_inner().into_vec();
1362 replace_if_not_null(
1363 &mut row_deserialized_vec,
1364 new_row_deserialized.clone(),
1365 );
1366 let mut updated_row = OwnedRow::new(row_deserialized_vec);
1367
1368 if let Some(toastable_indices) = toastable_column_indices {
1370 let old_row_deserialized_again =
1373 row_serde.deserializer.deserialize(old_row.row.clone())?;
1374 updated_row = handle_toast_columns_for_postgres_cdc(
1375 &old_row_deserialized_again,
1376 &updated_row,
1377 toastable_indices,
1378 );
1379 }
1380
1381 change_buffer.update(
1382 key.clone(),
1383 old_row_deserialized,
1384 updated_row.clone(),
1385 );
1386 let updated_row_bytes =
1387 Bytes::from(row_serde.serializer.serialize(updated_row));
1388 self.lru_cache.put(
1389 key.clone(),
1390 Some(CompactedRow {
1391 row: updated_row_bytes,
1392 }),
1393 );
1394 }
1395 }
1396 _ => unreachable!(),
1397 };
1398 }
1399
1400 Op::UpdateDelete
1401 if matches!(
1402 conflict_behavior,
1403 ConflictBehavior::Overwrite | ConflictBehavior::DoUpdateIfNotNull
1404 ) =>
                {
                    // With Overwrite / DoUpdateIfNotNull, the retraction half of an update is
                    // skipped here; the paired `UpdateInsert` supplies the new value for the key.
                }
1416
1417 Op::Delete | Op::UpdateDelete => {
1418 if let Some(old_row) = self.get_expected(&key) {
1419 let old_row_deserialized =
1420 row_serde.deserializer.deserialize(old_row.row.clone())?;
1421 change_buffer.delete(key.clone(), old_row_deserialized);
1422 self.lru_cache.put(key, None);
                    } else {
                        // The key is not present in the table; nothing to retract.
                    }
1428 }
1429 }
1430 }
1431 Ok(change_buffer)
1432 }
1433
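    /// Ensures every key in `keys` is present in the cache, fetching missing keys from the state
    /// table with bounded concurrency.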
1434 async fn fetch_keys<'a, S: StateStore>(
1435 &mut self,
1436 keys: impl Iterator<Item = &'a [u8]>,
1437 table: &StateTableInner<S, SD>,
1438 conflict_behavior: ConflictBehavior,
1439 metrics: &MaterializeMetrics,
1440 ) -> StreamExecutorResult<()> {
1441 let mut futures = vec![];
1442 for key in keys {
1443 metrics.materialize_cache_total_count.inc();
1444
1445 if self.lru_cache.contains(key) {
1446 if self.lru_cache.get(key).unwrap().is_some() {
1447 metrics.materialize_data_exist_count.inc();
1448 }
1449 metrics.materialize_cache_hit_count.inc();
1450 continue;
1451 }
1452 futures.push(async {
1453 let key_row = table.pk_serde().deserialize(key).unwrap();
1454 let row = table.get_row(key_row).await?.map(CompactedRow::from);
1455 StreamExecutorResult::Ok((key.to_vec(), row))
1456 });
1457 }
1458
1459 let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
1460 while let Some(result) = buffered.next().await {
1461 let (key, row) = result?;
1462 if row.is_some() {
1463 metrics.materialize_data_exist_count.inc();
1464 }
1465 match conflict_behavior {
1467 checked_conflict_behaviors!() => self.lru_cache.put(key, row),
1468 _ => unreachable!(),
1469 };
1470 }
1471
1472 Ok(())
1473 }
1474
1475 fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
1476 self.lru_cache.get(key).unwrap_or_else(|| {
1477 panic!(
                "the key {:?} has not been fetched in the materialize executor's cache",
1479 key
1480 )
1481 })
1482 }
1483
1484 fn evict(&mut self) {
1485 self.lru_cache.evict()
1486 }
1487}
1488
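/// Implements `DO UPDATE IF NOT NULL`: overwrites each column of `row` with the corresponding
/// column of `replacement` only when the replacement value is non-NULL.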
1489fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
1502 for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
1503 if let Some(new_value) = new_col {
1504 *old_col = Some(new_value);
1505 }
1506 }
1507}
1508
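/// Compares the version columns of `old_row` and `new_row` lexicographically (NULLs first).
/// Returns `true` when the new row's version is greater than or equal to the old row's, i.e.
/// when the new row should overwrite the existing one.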
1509fn versions_are_newer_or_equal(
1512 old_row: &OwnedRow,
1513 new_row: &OwnedRow,
1514 version_column_indices: &[u32],
1515) -> bool {
1516 if version_column_indices.is_empty() {
1517 return true;
1519 }
1520
1521 for &idx in version_column_indices {
1522 let old_value = old_row.index(idx as usize);
1523 let new_value = new_row.index(idx as usize);
1524
1525 match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
            // Old < new: the incoming row is strictly newer on this column.
            std::cmp::Ordering::Less => return true,
            // Old > new: the incoming row is stale; keep the existing row.
            std::cmp::Ordering::Greater => return false,
            // Equal on this column: compare the next version column.
            std::cmp::Ordering::Equal => continue,
        }
1530 }
1531
1532 true
1534}
1535
1536#[cfg(test)]
1537mod tests {
1538
1539 use std::iter;
1540 use std::sync::atomic::AtomicU64;
1541
1542 use rand::rngs::SmallRng;
1543 use rand::{Rng, RngCore, SeedableRng};
1544 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1545 use risingwave_common::catalog::Field;
1546 use risingwave_common::util::epoch::test_epoch;
1547 use risingwave_common::util::sort_util::OrderType;
1548 use risingwave_hummock_sdk::HummockReadEpoch;
1549 use risingwave_storage::memory::MemoryStateStore;
1550 use risingwave_storage::table::batch_table::BatchTable;
1551
1552 use super::*;
1553 use crate::executor::test_utils::*;
1554
1555 #[tokio::test]
1556 async fn test_materialize_executor() {
1557 let memory_state_store = MemoryStateStore::new();
1559 let table_id = TableId::new(1);
1560 let schema = Schema::new(vec![
1562 Field::unnamed(DataType::Int32),
1563 Field::unnamed(DataType::Int32),
1564 ]);
1565 let column_ids = vec![0.into(), 1.into()];
1566
1567 let chunk1 = StreamChunk::from_pretty(
1569 " i i
1570 + 1 4
1571 + 2 5
1572 + 3 6",
1573 );
1574 let chunk2 = StreamChunk::from_pretty(
1575 " i i
1576 + 7 8
1577 - 3 6",
1578 );
1579
1580 let source = MockSource::with_messages(vec![
1582 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1583 Message::Chunk(chunk1),
1584 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1585 Message::Chunk(chunk2),
1586 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1587 ])
1588 .into_executor(schema.clone(), StreamKey::new());
1589
1590 let order_types = vec![OrderType::ascending()];
1591 let column_descs = vec![
1592 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1593 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1594 ];
1595
1596 let table = BatchTable::for_test(
1597 memory_state_store.clone(),
1598 table_id,
1599 column_descs,
1600 order_types,
1601 vec![0],
1602 vec![0, 1],
1603 );
1604
1605 let mut materialize_executor = MaterializeExecutor::for_test(
1606 source,
1607 memory_state_store,
1608 table_id,
1609 vec![ColumnOrder::new(0, OrderType::ascending())],
1610 column_ids,
1611 Arc::new(AtomicU64::new(0)),
1612 ConflictBehavior::NoCheck,
1613 )
1614 .await
1615 .boxed()
1616 .execute();
1617 materialize_executor.next().await.transpose().unwrap();
1618
1619 materialize_executor.next().await.transpose().unwrap();
1620
1621 match materialize_executor.next().await.transpose().unwrap() {
1623 Some(Message::Barrier(_)) => {
1624 let row = table
1625 .get_row(
1626 &OwnedRow::new(vec![Some(3_i32.into())]),
1627 HummockReadEpoch::NoWait(u64::MAX),
1628 )
1629 .await
1630 .unwrap();
1631 assert_eq!(
1632 row,
1633 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1634 );
1635 }
1636 _ => unreachable!(),
1637 }
1638 materialize_executor.next().await.transpose().unwrap();
1639 match materialize_executor.next().await.transpose().unwrap() {
1641 Some(Message::Barrier(_)) => {
1642 let row = table
1643 .get_row(
1644 &OwnedRow::new(vec![Some(7_i32.into())]),
1645 HummockReadEpoch::NoWait(u64::MAX),
1646 )
1647 .await
1648 .unwrap();
1649 assert_eq!(
1650 row,
1651 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1652 );
1653 }
1654 _ => unreachable!(),
1655 }
1656 }
1657
1658 #[tokio::test]
1660 async fn test_upsert_stream() {
1661 let memory_state_store = MemoryStateStore::new();
1663 let table_id = TableId::new(1);
1664 let schema = Schema::new(vec![
1666 Field::unnamed(DataType::Int32),
1667 Field::unnamed(DataType::Int32),
1668 ]);
1669 let column_ids = vec![0.into(), 1.into()];
1670
1671 let chunk1 = StreamChunk::from_pretty(
1673 " i i
1674 + 1 1",
1675 );
1676
1677 let chunk2 = StreamChunk::from_pretty(
1678 " i i
1679 + 1 2
1680 - 1 2",
1681 );
1682
1683 let source = MockSource::with_messages(vec![
1685 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1686 Message::Chunk(chunk1),
1687 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1688 Message::Chunk(chunk2),
1689 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1690 ])
1691 .into_executor(schema.clone(), StreamKey::new());
1692
1693 let order_types = vec![OrderType::ascending()];
1694 let column_descs = vec![
1695 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1696 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1697 ];
1698
1699 let table = BatchTable::for_test(
1700 memory_state_store.clone(),
1701 table_id,
1702 column_descs,
1703 order_types,
1704 vec![0],
1705 vec![0, 1],
1706 );
1707
1708 let mut materialize_executor = MaterializeExecutor::for_test(
1709 source,
1710 memory_state_store,
1711 table_id,
1712 vec![ColumnOrder::new(0, OrderType::ascending())],
1713 column_ids,
1714 Arc::new(AtomicU64::new(0)),
1715 ConflictBehavior::Overwrite,
1716 )
1717 .await
1718 .boxed()
1719 .execute();
1720 materialize_executor.next().await.transpose().unwrap();
1721
1722 materialize_executor.next().await.transpose().unwrap();
1723 materialize_executor.next().await.transpose().unwrap();
1724 materialize_executor.next().await.transpose().unwrap();
1725
1726 match materialize_executor.next().await.transpose().unwrap() {
1727 Some(Message::Barrier(_)) => {
1728 let row = table
1729 .get_row(
1730 &OwnedRow::new(vec![Some(1_i32.into())]),
1731 HummockReadEpoch::NoWait(u64::MAX),
1732 )
1733 .await
1734 .unwrap();
1735 assert!(row.is_none());
1736 }
1737 _ => unreachable!(),
1738 }
1739 }
1740
1741 #[tokio::test]
1742 async fn test_check_insert_conflict() {
1743 let memory_state_store = MemoryStateStore::new();
1745 let table_id = TableId::new(1);
1746 let schema = Schema::new(vec![
1748 Field::unnamed(DataType::Int32),
1749 Field::unnamed(DataType::Int32),
1750 ]);
1751 let column_ids = vec![0.into(), 1.into()];
1752
1753 let chunk1 = StreamChunk::from_pretty(
1755 " i i
1756 + 1 3
1757 + 1 4
1758 + 2 5
1759 + 3 6",
1760 );
1761
1762 let chunk2 = StreamChunk::from_pretty(
1763 " i i
1764 + 1 3
1765 + 2 6",
1766 );
1767
1768 let chunk3 = StreamChunk::from_pretty(
1770 " i i
1771 + 1 4",
1772 );
1773
1774 let source = MockSource::with_messages(vec![
1776 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1777 Message::Chunk(chunk1),
1778 Message::Chunk(chunk2),
1779 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1780 Message::Chunk(chunk3),
1781 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1782 ])
1783 .into_executor(schema.clone(), StreamKey::new());
1784
1785 let order_types = vec![OrderType::ascending()];
1786 let column_descs = vec![
1787 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1788 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1789 ];
1790
1791 let table = BatchTable::for_test(
1792 memory_state_store.clone(),
1793 table_id,
1794 column_descs,
1795 order_types,
1796 vec![0],
1797 vec![0, 1],
1798 );
1799
1800 let mut materialize_executor = MaterializeExecutor::for_test(
1801 source,
1802 memory_state_store,
1803 table_id,
1804 vec![ColumnOrder::new(0, OrderType::ascending())],
1805 column_ids,
1806 Arc::new(AtomicU64::new(0)),
1807 ConflictBehavior::Overwrite,
1808 )
1809 .await
1810 .boxed()
1811 .execute();
1812 materialize_executor.next().await.transpose().unwrap();
1813
1814 materialize_executor.next().await.transpose().unwrap();
1815 materialize_executor.next().await.transpose().unwrap();
1816
1817 match materialize_executor.next().await.transpose().unwrap() {
1819 Some(Message::Barrier(_)) => {
1820 let row = table
1821 .get_row(
1822 &OwnedRow::new(vec![Some(3_i32.into())]),
1823 HummockReadEpoch::NoWait(u64::MAX),
1824 )
1825 .await
1826 .unwrap();
1827 assert_eq!(
1828 row,
1829 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1830 );
1831
1832 let row = table
1833 .get_row(
1834 &OwnedRow::new(vec![Some(1_i32.into())]),
1835 HummockReadEpoch::NoWait(u64::MAX),
1836 )
1837 .await
1838 .unwrap();
1839 assert_eq!(
1840 row,
1841 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
1842 );
1843
1844 let row = table
1845 .get_row(
1846 &OwnedRow::new(vec![Some(2_i32.into())]),
1847 HummockReadEpoch::NoWait(u64::MAX),
1848 )
1849 .await
1850 .unwrap();
1851 assert_eq!(
1852 row,
1853 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
1854 );
1855 }
1856 _ => unreachable!(),
1857 }
1858 }
1859
1860 #[tokio::test]
1861 async fn test_delete_and_update_conflict() {
1862 let memory_state_store = MemoryStateStore::new();
1864 let table_id = TableId::new(1);
1865 let schema = Schema::new(vec![
1867 Field::unnamed(DataType::Int32),
1868 Field::unnamed(DataType::Int32),
1869 ]);
1870 let column_ids = vec![0.into(), 1.into()];
1871
1872 let chunk1 = StreamChunk::from_pretty(
1874 " i i
1875 + 1 4
1876 + 2 5
1877 + 3 6
1878 U- 8 1
1879 U+ 8 2
1880 + 8 3",
1881 );
1882
1883 let chunk2 = StreamChunk::from_pretty(
1885 " i i
1886 + 7 8
1887 - 3 4
1888 - 5 0",
1889 );
1890
1891 let chunk3 = StreamChunk::from_pretty(
1893 " i i
1894 + 1 5
1895 U- 2 4
1896 U+ 2 8
1897 U- 9 0
1898 U+ 9 1",
1899 );
1900
1901 let source = MockSource::with_messages(vec![
1903 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1904 Message::Chunk(chunk1),
1905 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1906 Message::Chunk(chunk2),
1907 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1908 Message::Chunk(chunk3),
1909 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1910 ])
1911 .into_executor(schema.clone(), StreamKey::new());
1912
1913 let order_types = vec![OrderType::ascending()];
1914 let column_descs = vec![
1915 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1916 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1917 ];
1918
1919 let table = BatchTable::for_test(
1920 memory_state_store.clone(),
1921 table_id,
1922 column_descs,
1923 order_types,
1924 vec![0],
1925 vec![0, 1],
1926 );
1927
1928 let mut materialize_executor = MaterializeExecutor::for_test(
1929 source,
1930 memory_state_store,
1931 table_id,
1932 vec![ColumnOrder::new(0, OrderType::ascending())],
1933 column_ids,
1934 Arc::new(AtomicU64::new(0)),
1935 ConflictBehavior::Overwrite,
1936 )
1937 .await
1938 .boxed()
1939 .execute();
1940 materialize_executor.next().await.transpose().unwrap();
1941
1942 materialize_executor.next().await.transpose().unwrap();
1943
1944 match materialize_executor.next().await.transpose().unwrap() {
1946 Some(Message::Barrier(_)) => {
1947 let row = table
1949 .get_row(
1950 &OwnedRow::new(vec![Some(8_i32.into())]),
1951 HummockReadEpoch::NoWait(u64::MAX),
1952 )
1953 .await
1954 .unwrap();
1955 assert_eq!(
1956 row,
1957 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
1958 );
1959 }
1960 _ => unreachable!(),
1961 }
1962 materialize_executor.next().await.transpose().unwrap();
1963
1964 match materialize_executor.next().await.transpose().unwrap() {
1965 Some(Message::Barrier(_)) => {
1966 let row = table
1967 .get_row(
1968 &OwnedRow::new(vec![Some(7_i32.into())]),
1969 HummockReadEpoch::NoWait(u64::MAX),
1970 )
1971 .await
1972 .unwrap();
1973 assert_eq!(
1974 row,
1975 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1976 );
1977
1978 let row = table
1980 .get_row(
1981 &OwnedRow::new(vec![Some(3_i32.into())]),
1982 HummockReadEpoch::NoWait(u64::MAX),
1983 )
1984 .await
1985 .unwrap();
1986 assert_eq!(row, None);
1987
1988 let row = table
1990 .get_row(
1991 &OwnedRow::new(vec![Some(5_i32.into())]),
1992 HummockReadEpoch::NoWait(u64::MAX),
1993 )
1994 .await
1995 .unwrap();
1996 assert_eq!(row, None);
1997 }
1998 _ => unreachable!(),
1999 }
2000
2001 materialize_executor.next().await.transpose().unwrap();
2002 match materialize_executor.next().await.transpose().unwrap() {
2004 Some(Message::Barrier(_)) => {
2005 let row = table
2006 .get_row(
2007 &OwnedRow::new(vec![Some(1_i32.into())]),
2008 HummockReadEpoch::NoWait(u64::MAX),
2009 )
2010 .await
2011 .unwrap();
2012 assert_eq!(
2013 row,
2014 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
2015 );
2016
2017 let row = table
2019 .get_row(
2020 &OwnedRow::new(vec![Some(2_i32.into())]),
2021 HummockReadEpoch::NoWait(u64::MAX),
2022 )
2023 .await
2024 .unwrap();
2025 assert_eq!(
2026 row,
2027 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
2028 );
2029
2030 let row = table
2032 .get_row(
2033 &OwnedRow::new(vec![Some(9_i32.into())]),
2034 HummockReadEpoch::NoWait(u64::MAX),
2035 )
2036 .await
2037 .unwrap();
2038 assert_eq!(
2039 row,
2040 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
2041 );
2042 }
2043 _ => unreachable!(),
2044 }
2045 }
2046
2047 #[tokio::test]
2048 async fn test_ignore_insert_conflict() {
2049 let memory_state_store = MemoryStateStore::new();
2051 let table_id = TableId::new(1);
2052 let schema = Schema::new(vec![
2054 Field::unnamed(DataType::Int32),
2055 Field::unnamed(DataType::Int32),
2056 ]);
2057 let column_ids = vec![0.into(), 1.into()];
2058
2059 let chunk1 = StreamChunk::from_pretty(
2061 " i i
2062 + 1 3
2063 + 1 4
2064 + 2 5
2065 + 3 6",
2066 );
2067
2068 let chunk2 = StreamChunk::from_pretty(
2069 " i i
2070 + 1 5
2071 + 2 6",
2072 );
2073
2074 let chunk3 = StreamChunk::from_pretty(
2076 " i i
2077 + 1 6",
2078 );
2079
2080 let source = MockSource::with_messages(vec![
2082 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2083 Message::Chunk(chunk1),
2084 Message::Chunk(chunk2),
2085 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2086 Message::Chunk(chunk3),
2087 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
2088 ])
2089 .into_executor(schema.clone(), StreamKey::new());
2090
2091 let order_types = vec![OrderType::ascending()];
2092 let column_descs = vec![
2093 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2094 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2095 ];
2096
2097 let table = BatchTable::for_test(
2098 memory_state_store.clone(),
2099 table_id,
2100 column_descs,
2101 order_types,
2102 vec![0],
2103 vec![0, 1],
2104 );
2105
2106 let mut materialize_executor = MaterializeExecutor::for_test(
2107 source,
2108 memory_state_store,
2109 table_id,
2110 vec![ColumnOrder::new(0, OrderType::ascending())],
2111 column_ids,
2112 Arc::new(AtomicU64::new(0)),
2113 ConflictBehavior::IgnoreConflict,
2114 )
2115 .await
2116 .boxed()
2117 .execute();
2118 materialize_executor.next().await.transpose().unwrap();
2119
2120 materialize_executor.next().await.transpose().unwrap();
2121 materialize_executor.next().await.transpose().unwrap();
2122
2123 match materialize_executor.next().await.transpose().unwrap() {
2125 Some(Message::Barrier(_)) => {
2126 let row = table
2127 .get_row(
2128 &OwnedRow::new(vec![Some(3_i32.into())]),
2129 HummockReadEpoch::NoWait(u64::MAX),
2130 )
2131 .await
2132 .unwrap();
2133 assert_eq!(
2134 row,
2135 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
2136 );
2137
2138 let row = table
2139 .get_row(
2140 &OwnedRow::new(vec![Some(1_i32.into())]),
2141 HummockReadEpoch::NoWait(u64::MAX),
2142 )
2143 .await
2144 .unwrap();
2145 assert_eq!(
2146 row,
2147 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
2148 );
2149
2150 let row = table
2151 .get_row(
2152 &OwnedRow::new(vec![Some(2_i32.into())]),
2153 HummockReadEpoch::NoWait(u64::MAX),
2154 )
2155 .await
2156 .unwrap();
2157 assert_eq!(
2158 row,
2159 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
2160 );
2161 }
2162 _ => unreachable!(),
2163 }
2164 }
2165
2166 #[tokio::test]
2167 async fn test_ignore_delete_then_insert() {
2168 let memory_state_store = MemoryStateStore::new();
2170 let table_id = TableId::new(1);
2171 let schema = Schema::new(vec![
2173 Field::unnamed(DataType::Int32),
2174 Field::unnamed(DataType::Int32),
2175 ]);
2176 let column_ids = vec![0.into(), 1.into()];
2177
2178 let chunk1 = StreamChunk::from_pretty(
2180 " i i
2181 + 1 3
2182 - 1 3
2183 + 1 6",
2184 );
2185
2186 let source = MockSource::with_messages(vec![
2188 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2189 Message::Chunk(chunk1),
2190 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2191 ])
2192 .into_executor(schema.clone(), StreamKey::new());
2193
2194 let order_types = vec![OrderType::ascending()];
2195 let column_descs = vec![
2196 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2197 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2198 ];
2199
2200 let table = BatchTable::for_test(
2201 memory_state_store.clone(),
2202 table_id,
2203 column_descs,
2204 order_types,
2205 vec![0],
2206 vec![0, 1],
2207 );
2208
2209 let mut materialize_executor = MaterializeExecutor::for_test(
2210 source,
2211 memory_state_store,
2212 table_id,
2213 vec![ColumnOrder::new(0, OrderType::ascending())],
2214 column_ids,
2215 Arc::new(AtomicU64::new(0)),
2216 ConflictBehavior::IgnoreConflict,
2217 )
2218 .await
2219 .boxed()
2220 .execute();
2221 let _msg1 = materialize_executor
2222 .next()
2223 .await
2224 .transpose()
2225 .unwrap()
2226 .unwrap()
2227 .as_barrier()
2228 .unwrap();
2229 let _msg2 = materialize_executor
2230 .next()
2231 .await
2232 .transpose()
2233 .unwrap()
2234 .unwrap()
2235 .as_chunk()
2236 .unwrap();
2237 let _msg3 = materialize_executor
2238 .next()
2239 .await
2240 .transpose()
2241 .unwrap()
2242 .unwrap()
2243 .as_barrier()
2244 .unwrap();
2245
2246 let row = table
2247 .get_row(
2248 &OwnedRow::new(vec![Some(1_i32.into())]),
2249 HummockReadEpoch::NoWait(u64::MAX),
2250 )
2251 .await
2252 .unwrap();
2253 assert_eq!(
2254 row,
2255 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
2256 );
2257 }
2258
2259 #[tokio::test]
2260 async fn test_ignore_delete_and_update_conflict() {
2261 let memory_state_store = MemoryStateStore::new();
2263 let table_id = TableId::new(1);
2264 let schema = Schema::new(vec![
2266 Field::unnamed(DataType::Int32),
2267 Field::unnamed(DataType::Int32),
2268 ]);
2269 let column_ids = vec![0.into(), 1.into()];
2270
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

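        // After the second barrier: pk 8 keeps the value written by `U+ 8 2`; the
        // conflicting `+ 8 3` was ignored.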
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }

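    // With `ConflictBehavior::DoUpdateIfNotNull`, a conflicting write only overwrites columns
    // that are non-NULL in the incoming row; NULL columns keep their existing values.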
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

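        // `.` denotes NULL in `from_pretty`. `+ 8 .` conflicts with `U+ 8 2` but carries a
        // NULL value column, so the existing value 2 should be kept.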
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

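        // After the second barrier: pk 8 keeps value 2 (the NULL from `+ 8 .` does not
        // overwrite it), while pk 2 holds the explicit NULL from its original insert.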
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }

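    /// Generates `row_number` random insert/delete rows over a small key space (keys `0..KN`)
    /// with randomized visibility, so key conflicts and redundant operations are frequent.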
    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
        const KN: u32 = 4;
        const SEED: u64 = 998244353;
        let mut ret = vec![];
        let mut builder =
            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
        let mut rng = SmallRng::seed_from_u64(SEED);

        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
            let len = c.data_chunk().capacity();
            let mut c = StreamChunkMut::from(c);
            for i in 0..len {
                c.set_vis(i, rng.random_bool(0.5));
            }
            c.into()
        };
        for _ in 0..row_number {
            let k = (rng.next_u32() % KN) as i32;
            let v = rng.next_u32() as i32;
            let op = if rng.random_bool(0.5) {
                Op::Insert
            } else {
                Op::Delete
            };
            if let Some(c) =
                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
            {
                ret.push(random_vis(c, &mut rng));
            }
        }
        if let Some(c) = builder.take() {
            ret.push(random_vis(c, &mut rng));
        }
        ret
    }

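    // Feeds the fuzzed stream through a `MaterializeExecutor` with the given conflict
    // behavior, then replays its output into a separate state table. The replay relies on
    // that table's op-consistency checks to flag inconsistent output (e.g. a duplicate
    // insert or a delete of a missing row).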
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

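        // Replay the executor's output chunks; an inconsistent change log is expected to
        // trip the table's sanity checks.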
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            table.write_chunk(c);
        }
    }

    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }

    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
}