1use std::assert_matches::assert_matches;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::ops::{Bound, Deref, Index};
19
20use bytes::Bytes;
21use futures::future::Either;
22use futures::stream::{self, select_with_strategy};
23use futures_async_stream::try_stream;
24use itertools::Itertools;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{
28 ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
29};
30use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
31use risingwave_common::row::{CompactedRow, OwnedRow, RowExt};
32use risingwave_common::types::{DEBEZIUM_UNAVAILABLE_VALUE, DataType, ScalarImpl};
33use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
34use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
35use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_pb::catalog::Table;
38use risingwave_pb::catalog::table::{Engine, OptionalAssociatedSourceId};
39use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
40use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
41use risingwave_storage::table::KeyedRow;
42
43use crate::cache::ManagedLruCache;
44use crate::common::metrics::MetricsInfo;
45use crate::common::table::state_table::{
46 StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
47};
48use crate::executor::error::ErrorKind;
49use crate::executor::monitor::MaterializeMetrics;
50use crate::executor::mview::RefreshProgressTable;
51use crate::executor::prelude::*;
52use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
53use crate::task::LocalBarrierManager;
54
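/// State machine of the materialize stream for refreshable tables. The executor starts in
/// `NormalIngestion`; a `LoadFinish` mutation moves it through `MergingData`, `CleanUp`, and
/// `RefreshEnd` back to `NormalIngestion`, with `CommitAndYieldBarrier` committing the state
/// tables and yielding the pending barrier before transitioning to `expect_next_state`.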
55#[derive(Debug, Clone)]
56pub enum MaterializeStreamState<M> {
57 NormalIngestion,
58 MergingData,
59 CleanUp,
60 CommitAndYieldBarrier {
61 barrier: BarrierInner<M>,
62 expect_next_state: Box<MaterializeStreamState<M>>,
63 },
64 RefreshEnd {
65 on_complete_epoch: EpochPair,
66 },
67}
68
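/// `MaterializeExecutor` writes the input stream into a state table (the storage of a
/// materialized view or table), resolving primary-key conflicts according to
/// `conflict_behavior`. For refreshable tables it additionally drives the
/// staging/merge/cleanup workflow modeled by [`MaterializeStreamState`].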
69pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
71 input: Executor,
72
73 schema: Schema,
74
75 state_table: StateTableInner<S, SD>,
76
77 arrange_key_indices: Vec<usize>,
79
80 actor_context: ActorContextRef,
81
82 materialize_cache: MaterializeCache<SD>,
83
84 conflict_behavior: ConflictBehavior,
85
86 version_column_indices: Vec<u32>,
87
88 may_have_downstream: bool,
89
90 subscriber_ids: HashSet<u32>,
91
92 metrics: MaterializeMetrics,
93
94 is_dummy_table: bool,
97
98 toastable_column_indices: Option<Vec<usize>>,
100
101 refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
103
104 local_barrier_manager: LocalBarrierManager,
106}
107
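/// Extra state carried by refreshable materialized views: the staging table that buffers
/// newly loaded rows and the progress table that tracks per-vnode merge progress across
/// recoveries.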
108pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
110 pub table_catalog: Table,
112
113 pub staging_table_catalog: Table,
115
116 pub is_refreshing: bool,
118
119 pub staging_table: StateTableInner<S, SD>,
126
127 pub progress_table: RefreshProgressTable<S>,
129
130 pub table_id: TableId,
132}
133
134impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
135 pub async fn new(
137 store: S,
138 table_catalog: &Table,
139 staging_table_catalog: &Table,
140 progress_state_table: &Table,
141 vnodes: Option<Arc<Bitmap>>,
142 ) -> Self {
143 let table_id = TableId::new(table_catalog.id);
144
145 let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
147 staging_table_catalog,
148 store.clone(),
149 vnodes.clone(),
150 )
151 .await;
152
153 let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
154 progress_state_table,
155 store,
156 vnodes,
157 )
158 .await;
159
160 let pk_len = table_catalog.pk.len();
162 let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
163
164 debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
165
166 Self {
167 table_catalog: table_catalog.clone(),
168 staging_table_catalog: staging_table_catalog.clone(),
169 is_refreshing: false,
170 staging_table,
171 progress_table,
172 table_id,
173 }
174 }
175}
176
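/// Pick the state table's op consistency level: subscribers require `LogStoreEnabled`, an
/// `Overwrite` table without downstream fragments can run `Inconsistent`, and everything
/// else keeps `ConsistentOldValue`.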
177fn get_op_consistency_level(
178 conflict_behavior: ConflictBehavior,
179 may_have_downstream: bool,
180 subscriber_ids: &HashSet<u32>,
181) -> StateTableOpConsistencyLevel {
182 if !subscriber_ids.is_empty() {
183 StateTableOpConsistencyLevel::LogStoreEnabled
184 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
185 StateTableOpConsistencyLevel::Inconsistent
188 } else {
189 StateTableOpConsistencyLevel::ConsistentOldValue
190 }
191}
192
193impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
194 #[allow(clippy::too_many_arguments)]
198 pub async fn new(
199 input: Executor,
200 schema: Schema,
201 store: S,
202 arrange_key: Vec<ColumnOrder>,
203 actor_context: ActorContextRef,
204 vnodes: Option<Arc<Bitmap>>,
205 table_catalog: &Table,
206 watermark_epoch: AtomicU64Ref,
207 conflict_behavior: ConflictBehavior,
208 version_column_indices: Vec<u32>,
209 metrics: Arc<StreamingMetrics>,
210 refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
211 local_barrier_manager: LocalBarrierManager,
212 ) -> Self {
213 let table_columns: Vec<ColumnDesc> = table_catalog
214 .columns
215 .iter()
216 .map(|col| col.column_desc.as_ref().unwrap().into())
217 .collect();
218
219 let toastable_column_indices = if table_catalog.cdc_table_type()
222 == risingwave_pb::catalog::table::CdcTableType::Postgres
223 {
224 let toastable_indices: Vec<usize> = table_columns
225 .iter()
226 .enumerate()
227 .filter_map(|(index, column)| match &column.data_type {
228 DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
237 Some(index)
238 }
239 _ => None,
240 })
241 .collect();
242
243 if toastable_indices.is_empty() {
244 None
245 } else {
246 Some(toastable_indices)
247 }
248 } else {
249 None
250 };
251
252 let row_serde: BasicSerde = BasicSerde::new(
253 Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
254 Arc::from(table_columns.into_boxed_slice()),
255 );
256
257 let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
258 let may_have_downstream = actor_context.initial_dispatch_num != 0;
259 let subscriber_ids = actor_context.initial_subscriber_ids.clone();
260 let op_consistency_level =
261 get_op_consistency_level(conflict_behavior, may_have_downstream, &subscriber_ids);
262 let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
264 .with_op_consistency_level(op_consistency_level)
265 .enable_preload_all_rows_by_config(&actor_context.streaming_config)
266 .build()
267 .await;
268
269 let mv_metrics = metrics.new_materialize_metrics(
270 TableId::new(table_catalog.id),
271 actor_context.id,
272 actor_context.fragment_id,
273 );
274
275 let metrics_info =
276 MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
277
278 let is_dummy_table =
279 table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;
280
281 Self {
282 input,
283 schema,
284 state_table,
285 arrange_key_indices,
286 actor_context,
287 materialize_cache: MaterializeCache::new(
288 watermark_epoch,
289 metrics_info,
290 row_serde,
291 version_column_indices.clone(),
292 ),
293 conflict_behavior,
294 version_column_indices,
295 is_dummy_table,
296 may_have_downstream,
297 subscriber_ids,
298 metrics: mv_metrics,
299 toastable_column_indices,
300 refresh_args,
301 local_barrier_manager,
302 }
303 }
304
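    /// Main execution loop, driven by the [`MaterializeStreamState`] state machine.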
305 #[try_stream(ok = Message, error = StreamExecutorError)]
306 async fn execute_inner(mut self) {
307 let mv_table_id = TableId::new(self.state_table.table_id());
308 let _staging_table_id = TableId::new(self.state_table.table_id());
309 let data_types = self.schema.data_types();
310 let mut input = self.input.execute();
311
312 let barrier = expect_first_barrier(&mut input).await?;
313 let first_epoch = barrier.epoch;
        let _barrier_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
317 self.state_table.init_epoch(first_epoch).await?;
318
319 let mut inner_state =
321 Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
322 if let Some(ref mut refresh_args) = self.refresh_args {
324 refresh_args.staging_table.init_epoch(first_epoch).await?;
325
326 refresh_args.progress_table.recover(first_epoch).await?;
328
329 let progress_stats = refresh_args.progress_table.get_progress_stats();
331 if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
332 refresh_args.is_refreshing = true;
333 tracing::info!(
334 total_vnodes = progress_stats.total_vnodes,
335 completed_vnodes = progress_stats.completed_vnodes,
336 "Recovered refresh in progress, resuming refresh operation"
337 );
338
339 let incomplete_vnodes: Vec<_> = refresh_args
343 .progress_table
344 .get_all_progress()
345 .iter()
346 .filter(|(_, entry)| !entry.is_completed)
347 .map(|(&vnode, _)| vnode)
348 .collect();
349
350 if !incomplete_vnodes.is_empty() {
351 tracing::info!(
353 incomplete_vnodes = incomplete_vnodes.len(),
354 "Recovery detected incomplete VNodes, resuming refresh operation"
355 );
356 } else {
359 tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
361 }
362 }
363 }
364
365 if let Some(ref refresh_args) = self.refresh_args
367 && refresh_args.is_refreshing
368 {
369 let incomplete_vnodes: Vec<_> = refresh_args
371 .progress_table
372 .get_all_progress()
373 .iter()
374 .filter(|(_, entry)| !entry.is_completed)
375 .map(|(&vnode, _)| vnode)
376 .collect();
377 if !incomplete_vnodes.is_empty() {
378 inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
380 tracing::info!(
381 incomplete_vnodes = incomplete_vnodes.len(),
382 "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
383 );
384 }
385 }
386
387 'main_loop: loop {
389 match *inner_state {
390 MaterializeStreamState::NormalIngestion => {
391 #[for_await]
392 '_normal_ingest: for msg in input.by_ref() {
393 let msg = msg?;
394 self.materialize_cache.evict();
395
396 match msg {
397 Message::Watermark(w) => {
398 yield Message::Watermark(w);
399 }
400 Message::Chunk(chunk) if self.is_dummy_table => {
401 self.metrics
402 .materialize_input_row_count
403 .inc_by(chunk.cardinality() as u64);
404 yield Message::Chunk(chunk);
405 }
406 Message::Chunk(chunk) => {
407 self.metrics
408 .materialize_input_row_count
409 .inc_by(chunk.cardinality() as u64);
410 let do_not_handle_conflict = !self.state_table.is_consistent_op()
414 && self.version_column_indices.is_empty()
415 && self.conflict_behavior == ConflictBehavior::Overwrite;
416
417 match self.conflict_behavior {
418 checked_conflict_behaviors!() if !do_not_handle_conflict => {
419 if chunk.cardinality() == 0 {
420 continue;
422 }
423 let (data_chunk, ops) = chunk.clone().into_parts();
424
425 if self.state_table.value_indices().is_some() {
426 panic!(
429 "materialize executor with data check can not handle only materialize partial columns"
430 )
431 };
432 let values = data_chunk.serialize();
433
434 let key_chunk =
435 data_chunk.project(self.state_table.pk_indices());
436
437 if let Some(ref mut refresh_args) = self.refresh_args
440 && refresh_args.is_refreshing
441 {
442 let key_chunk = chunk
443 .clone()
444 .project(self.state_table.pk_indices());
445 tracing::trace!(
446 staging_chunk = %key_chunk.to_pretty(),
447 input_chunk = %chunk.to_pretty(),
448 "writing to staging table"
449 );
450 if cfg!(debug_assertions) {
451 assert!(
453 key_chunk
454 .ops()
455 .iter()
456 .all(|op| op == &Op::Insert)
457 );
458 }
459 refresh_args
460 .staging_table
461 .write_chunk(key_chunk.clone());
462 refresh_args.staging_table.try_flush().await?;
463 }
464
465 let pks = {
466 let mut pks = vec![vec![]; data_chunk.capacity()];
467 key_chunk
468 .rows_with_holes()
469 .zip_eq_fast(pks.iter_mut())
470 .for_each(|(r, vnode_and_pk)| {
471 if let Some(r) = r {
472 self.state_table
473 .pk_serde()
474 .serialize(r, vnode_and_pk);
475 }
476 });
477 pks
478 };
479 let (_, vis) = key_chunk.into_parts();
480 let row_ops = ops
481 .iter()
482 .zip_eq_debug(pks.into_iter())
483 .zip_eq_debug(values.into_iter())
484 .zip_eq_debug(vis.iter())
485 .filter_map(|(((op, k), v), vis)| {
486 vis.then_some((*op, k, v))
487 })
488 .collect_vec();
489
490 let change_buffer = self
491 .materialize_cache
492 .handle(
493 row_ops,
494 &self.state_table,
495 self.conflict_behavior,
496 &self.metrics,
497 self.toastable_column_indices.as_deref(),
498 )
499 .await?;
500
501 match change_buffer.into_chunk(data_types.clone()) {
502 Some(output_chunk) => {
503 self.state_table.write_chunk(output_chunk.clone());
504 self.state_table.try_flush().await?;
505 yield Message::Chunk(output_chunk);
506 }
507 None => continue,
508 }
509 }
510 ConflictBehavior::IgnoreConflict => unreachable!(),
511 ConflictBehavior::NoCheck
512 | ConflictBehavior::Overwrite
513 | ConflictBehavior::DoUpdateIfNotNull => {
514 self.state_table.write_chunk(chunk.clone());
515 self.state_table.try_flush().await?;
516
517 if let Some(ref mut refresh_args) = self.refresh_args
519 && refresh_args.is_refreshing
520 {
521 let key_chunk = chunk
522 .clone()
523 .project(self.state_table.pk_indices());
524 tracing::trace!(
525 staging_chunk = %key_chunk.to_pretty(),
526 input_chunk = %chunk.to_pretty(),
527 "writing to staging table"
528 );
529 if cfg!(debug_assertions) {
530 assert!(
532 key_chunk
533 .ops()
534 .iter()
535 .all(|op| op == &Op::Insert)
536 );
537 }
538 refresh_args
539 .staging_table
540 .write_chunk(key_chunk.clone());
541 refresh_args.staging_table.try_flush().await?;
542 }
543
544 yield Message::Chunk(chunk);
545 }
546 }
547 }
548 Message::Barrier(barrier) => {
549 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
550 barrier,
551 expect_next_state: Box::new(
552 MaterializeStreamState::NormalIngestion,
553 ),
554 };
555 continue 'main_loop;
556 }
557 }
558 }
559
560 return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
561 anyhow::anyhow!(
562 "Input stream terminated unexpectedly during normal ingestion"
563 ),
564 )));
565 }
566 MaterializeStreamState::MergingData => {
567 let Some(refresh_args) = self.refresh_args.as_mut() else {
568 panic!(
569 "MaterializeExecutor entered CleanUp state without refresh_args configured"
570 );
571 };
572 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
573
574 debug_assert_eq!(
575 self.state_table.vnodes(),
576 refresh_args.staging_table.vnodes()
577 );
578 debug_assert_eq!(
579 refresh_args.staging_table.vnodes(),
580 refresh_args.progress_table.vnodes()
581 );
582
583 let mut rows_to_delete = vec![];
584 let mut merge_complete = false;
585 let mut pending_barrier: Option<Barrier> = None;
586
587 {
589 let left_input = input.by_ref().map(Either::Left);
590 let right_merge_sort = pin!(
591 Self::make_mergesort_stream(
592 &self.state_table,
593 &refresh_args.staging_table,
594 &mut refresh_args.progress_table
595 )
596 .map(Either::Right)
597 );
598
599 let mut merge_stream =
602 select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
603 stream::PollNext::Left
604 });
605
606 #[for_await]
607 'merge_stream: for either in &mut merge_stream {
608 match either {
609 Either::Left(msg) => {
610 let msg = msg?;
611 match msg {
612 Message::Watermark(w) => yield Message::Watermark(w),
613 Message::Chunk(chunk) => {
614 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
615 }
616 Message::Barrier(b) => {
617 pending_barrier = Some(b);
618 break 'merge_stream;
619 }
620 }
621 }
622 Either::Right(result) => {
623 match result? {
624 Some((_vnode, row)) => {
625 rows_to_delete.push(row);
626 }
627 None => {
628 merge_complete = true;
630
631 }
633 }
634 }
635 }
636 }
637 }
638
639 for row in &rows_to_delete {
641 self.state_table.delete(row);
642 }
643 if !rows_to_delete.is_empty() {
644 let to_delete_chunk = StreamChunk::from_rows(
645 &rows_to_delete
646 .iter()
647 .map(|row| (Op::Delete, row))
648 .collect_vec(),
649 &self.schema.data_types(),
650 );
651
                    tracing::debug!(table_id = %refresh_args.table_id, "yielding chunk of rows to delete: {}", to_delete_chunk.to_pretty());
653 yield Message::Chunk(to_delete_chunk);
654 }
655
656 assert!(pending_barrier.is_some(), "pending barrier is not set");
658
659 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
660 barrier: pending_barrier.unwrap(),
661 expect_next_state: if merge_complete {
662 Box::new(MaterializeStreamState::CleanUp)
663 } else {
664 Box::new(MaterializeStreamState::MergingData)
665 },
666 };
667 continue 'main_loop;
668 }
669 MaterializeStreamState::CleanUp => {
670 let Some(refresh_args) = self.refresh_args.as_mut() else {
671 panic!(
672 "MaterializeExecutor entered MergingData state without refresh_args configured"
673 );
674 };
675 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
676
677 #[for_await]
678 for msg in input.by_ref() {
679 let msg = msg?;
680 match msg {
681 Message::Watermark(w) => yield Message::Watermark(w),
682 Message::Chunk(chunk) => {
                            tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during cleanup phase");
684 }
685 Message::Barrier(barrier) if !barrier.is_checkpoint() => {
686 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
687 barrier,
688 expect_next_state: Box::new(MaterializeStreamState::CleanUp),
689 };
690 continue 'main_loop;
691 }
692 Message::Barrier(barrier) => {
693 let staging_table_id = refresh_args.staging_table.table_id();
694 let epoch = barrier.epoch;
695 self.local_barrier_manager.report_refresh_finished(
696 epoch,
697 self.actor_context.id,
698 refresh_args.table_id.into(),
699 staging_table_id,
700 );
701 tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
702
703 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
704 barrier,
705 expect_next_state: Box::new(
706 MaterializeStreamState::RefreshEnd {
707 on_complete_epoch: epoch,
708 },
709 ),
710 };
711 continue 'main_loop;
712 }
713 }
714 }
715 }
716 MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
717 let Some(refresh_args) = self.refresh_args.as_mut() else {
718 panic!(
719 "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
720 );
721 };
722 let staging_table_id = refresh_args.staging_table.table_id();
723
724 let staging_store = refresh_args.staging_table.state_store().clone();
726 staging_store
727 .try_wait_epoch(
728 HummockReadEpoch::Committed(on_complete_epoch.prev),
729 TryWaitEpochOptions {
730 table_id: staging_table_id.into(),
731 },
732 )
733 .await?;
734
735 tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
736
737 if let Some(ref mut refresh_args) = self.refresh_args {
738 refresh_args.is_refreshing = false;
739 }
740 *inner_state = MaterializeStreamState::NormalIngestion;
741 continue 'main_loop;
742 }
743 MaterializeStreamState::CommitAndYieldBarrier {
744 barrier,
745 mut expect_next_state,
746 } => {
747 if let Some(ref mut refresh_args) = self.refresh_args {
748 match barrier.mutation.as_deref() {
749 Some(Mutation::RefreshStart {
750 table_id: refresh_table_id,
751 associated_source_id: _,
752 }) if *refresh_table_id == refresh_args.table_id => {
753 debug_assert!(
754 !refresh_args.is_refreshing,
755 "cannot start refresh twice"
756 );
757 refresh_args.is_refreshing = true;
758 tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
759
760 Self::init_refresh_progress(
762 &self.state_table,
763 &mut refresh_args.progress_table,
764 barrier.epoch.curr,
765 )?;
766 }
767 Some(Mutation::LoadFinish {
768 associated_source_id: load_finish_source_id,
769 }) => {
770 let associated_source_id = match refresh_args
772 .table_catalog
773 .optional_associated_source_id
774 {
775 Some(OptionalAssociatedSourceId::AssociatedSourceId(id)) => id,
776 None => unreachable!("associated_source_id is not set"),
777 };
778
779 if load_finish_source_id.table_id() == associated_source_id {
780 tracing::info!(
781 %load_finish_source_id,
782 "LoadFinish received, starting data replacement"
783 );
784 expect_next_state =
785 Box::new(MaterializeStreamState::<_>::MergingData);
786 }
787 }
788 _ => {}
789 }
790 }
791
792 if !self.may_have_downstream
796 && barrier.has_more_downstream_fragments(self.actor_context.id)
797 {
798 self.may_have_downstream = true;
799 }
800 Self::may_update_depended_subscriptions(
801 &mut self.subscriber_ids,
802 &barrier,
803 mv_table_id,
804 );
805 let op_consistency_level = get_op_consistency_level(
806 self.conflict_behavior,
807 self.may_have_downstream,
808 &self.subscriber_ids,
809 );
810 let post_commit = self
811 .state_table
812 .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
813 .await?;
814 if !post_commit.inner().is_consistent_op() {
815 assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
816 }
817
818 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
819
820 let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
822 {
823 Some((
826 refresh_args.staging_table.commit(barrier.epoch).await?,
827 refresh_args.progress_table.commit(barrier.epoch).await?,
828 ))
829 } else {
830 None
831 };
832
833 let b_epoch = barrier.epoch;
834 yield Message::Barrier(barrier);
835
836 if let Some((_, cache_may_stale)) = post_commit
838 .post_yield_barrier(update_vnode_bitmap.clone())
839 .await?
840 && cache_may_stale
841 {
842 self.materialize_cache.lru_cache.clear();
843 }
844
845 if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
847 staging_post_commit
848 .post_yield_barrier(update_vnode_bitmap.clone())
849 .await?;
850 progress_post_commit
851 .post_yield_barrier(update_vnode_bitmap)
852 .await?;
853 }
854
855 self.metrics
856 .materialize_current_epoch
857 .set(b_epoch.curr as i64);
858
859 *inner_state = *expect_next_state;
862 }
863 }
864 }
865 }
866
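    /// Per vnode, merge-join the main table against the staging table on primary key and
    /// yield `Some((vnode, row))` for each row that is present in the main table but absent
    /// from the staging table (i.e. a row to be deleted), recording progress after every row.
    /// Yields a final `None` once all vnodes have been processed.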
867 #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
871 async fn make_mergesort_stream<'a>(
872 main_table: &'a StateTableInner<S, SD>,
873 staging_table: &'a StateTableInner<S, SD>,
874 progress_table: &'a mut RefreshProgressTable<S>,
875 ) {
876 for vnode in main_table.vnodes().clone().iter_vnodes() {
877 let mut processed_rows = 0;
878 let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
880 if let Some(current_entry) = progress_table.get_progress(vnode) {
881 if current_entry.is_completed {
883 tracing::debug!(
884 vnode = vnode.to_index(),
885 "Skipping already completed VNode during recovery"
886 );
887 continue;
888 }
889 processed_rows += current_entry.processed_rows;
890 tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
891
                    if let Some(current_state) = &current_entry.current_pos {
893 (Bound::Excluded(current_state.clone()), Bound::Unbounded)
894 } else {
895 (Bound::Unbounded, Bound::Unbounded)
896 }
897 } else {
898 (Bound::Unbounded, Bound::Unbounded)
899 };
900
901 let iter_main = main_table
902 .iter_keyed_row_with_vnode(
903 vnode,
904 &pk_range,
905 PrefetchOptions::prefetch_for_large_range_scan(),
906 )
907 .await?;
908 let iter_staging = staging_table
909 .iter_keyed_row_with_vnode(
910 vnode,
911 &pk_range,
912 PrefetchOptions::prefetch_for_large_range_scan(),
913 )
914 .await?;
915
916 pin_mut!(iter_main);
917 pin_mut!(iter_staging);
918
919 let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
921 let mut staging_item: Option<KeyedRow<Bytes>> =
922 iter_staging.next().await.transpose()?;
923
924 while let Some(main_kv) = main_item {
925 let main_key = main_kv.key();
926
927 let mut should_delete = false;
929 while let Some(staging_kv) = &staging_item {
930 let staging_key = staging_kv.key();
931 match main_key.cmp(staging_key) {
932 std::cmp::Ordering::Greater => {
933 staging_item = iter_staging.next().await.transpose()?;
935 }
936 std::cmp::Ordering::Equal => {
937 break;
939 }
940 std::cmp::Ordering::Less => {
941 should_delete = true;
943 break;
944 }
945 }
946 }
947
948 if staging_item.is_none() {
950 should_delete = true;
951 }
952
953 if should_delete {
954 yield Some((vnode, main_kv.row().clone()));
955 }
956
957 processed_rows += 1;
959 tracing::info!(
960 "set progress table: vnode = {:?}, processed_rows = {:?}",
961 vnode,
962 processed_rows
963 );
964 progress_table.set_progress(
965 vnode,
966 Some(
967 main_kv
968 .row()
969 .project(main_table.pk_indices())
970 .to_owned_row(),
971 ),
972 false,
973 processed_rows,
974 )?;
975 main_item = iter_main.next().await.transpose()?;
976 }
977
978 if let Some(current_entry) = progress_table.get_progress(vnode) {
980 progress_table.set_progress(
981 vnode,
982 current_entry.current_pos.clone(),
                    true, // mark this vnode as completed
                    current_entry.processed_rows,
985 )?;
986
987 tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
988 }
989 }
990
991 yield None;
993 }
994
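    /// Update the set of subscriber ids that depend on this materialized view according to
    /// the subscriptions added or dropped by the barrier.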
995 fn may_update_depended_subscriptions(
997 depended_subscriptions: &mut HashSet<u32>,
998 barrier: &Barrier,
999 mv_table_id: TableId,
1000 ) {
1001 for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
1002 if !depended_subscriptions.insert(subscriber_id) {
1003 warn!(
1004 ?depended_subscriptions,
1005 ?mv_table_id,
1006 subscriber_id,
1007 "subscription id already exists"
1008 );
1009 }
1010 }
1011
1012 if let Some(Mutation::DropSubscriptions {
1013 subscriptions_to_drop,
1014 }) = barrier.mutation.as_deref()
1015 {
1016 for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
1017 if *upstream_mv_table_id == mv_table_id
1018 && !depended_subscriptions.remove(subscriber_id)
1019 {
1020 warn!(
1021 ?depended_subscriptions,
1022 ?mv_table_id,
1023 subscriber_id,
1024 "drop non existing subscriber_id id"
1025 );
1026 }
1027 }
1028 }
1029 }
1030
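    /// Reset the refresh progress table, creating an empty (not started) entry for every
    /// vnode owned by this executor.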
1031 fn init_refresh_progress(
1033 state_table: &StateTableInner<S, SD>,
1034 progress_table: &mut RefreshProgressTable<S>,
1035 _epoch: u64,
1036 ) -> StreamExecutorResult<()> {
1037 debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1038
1039 for vnode in state_table.vnodes().iter_vnodes() {
1041 progress_table.set_progress(
                vnode,
                None,  // no current position yet
                false, // not completed
                0,     // no rows processed yet
            )?;
1046 }
1047
1048 tracing::info!(
1049 vnodes_count = state_table.vnodes().count_ones(),
1050 "Initialized refresh progress tracking for all VNodes"
1051 );
1052
1053 Ok(())
1054 }
1055}
1056
1057impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
1058 #[cfg(any(test, feature = "test"))]
1060 pub async fn for_test(
1061 input: Executor,
1062 store: S,
1063 table_id: TableId,
1064 keys: Vec<ColumnOrder>,
1065 column_ids: Vec<risingwave_common::catalog::ColumnId>,
1066 watermark_epoch: AtomicU64Ref,
1067 conflict_behavior: ConflictBehavior,
1068 ) -> Self {
1069 let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
1070 let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
1071 let schema = input.schema().clone();
1072 let columns: Vec<ColumnDesc> = column_ids
1073 .into_iter()
1074 .zip_eq_fast(schema.fields.iter())
1075 .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
1076 .collect_vec();
1077
1078 let row_serde = BasicSerde::new(
1079 Arc::from((0..columns.len()).collect_vec()),
1080 Arc::from(columns.clone().into_boxed_slice()),
1081 );
1082 let state_table = StateTableInner::from_table_catalog(
1083 &crate::common::table::test_utils::gen_pbtable(
1084 table_id,
1085 columns,
1086 arrange_order_types,
1087 arrange_columns.clone(),
1088 0,
1089 ),
1090 store,
1091 None,
1092 )
1093 .await;
1094
1095 let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);
1096
1097 Self {
1098 input,
1099 schema,
1100 state_table,
1101 arrange_key_indices: arrange_columns.clone(),
1102 actor_context: ActorContext::for_test(0),
1103 materialize_cache: MaterializeCache::new(
1104 watermark_epoch,
1105 MetricsInfo::for_test(),
1106 row_serde,
1107 vec![],
1108 ),
1109 conflict_behavior,
1110 version_column_indices: vec![],
1111 is_dummy_table: false,
1112 toastable_column_indices: None,
1113 may_have_downstream: true,
1114 subscriber_ids: HashSet::new(),
1115 metrics,
            refresh_args: None,
            local_barrier_manager: LocalBarrierManager::for_test(),
1118 }
1119 }
1120}
1121
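/// Check whether a string equals Debezium's placeholder for an unchanged TOAST value.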
1122fn is_unavailable_value_str(s: &str) -> bool {
1125 s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
1126}
1127
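/// Check whether a datum (varchar, jsonb, bytea, or a single-element list thereof) carries
/// Debezium's "unavailable value" placeholder.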
1128fn is_debezium_unavailable_value(
1131 datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
1132) -> bool {
1133 match datum {
1134 Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => is_unavailable_value_str(val),
1135 Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
1136 jsonb_ref
1138 .as_str()
1139 .map(is_unavailable_value_str)
1140 .unwrap_or(false)
1141 }
1142 Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
1143 if let Ok(bytea_str) = std::str::from_utf8(bytea) {
1147 is_unavailable_value_str(bytea_str)
1148 } else {
1149 false
1150 }
1151 }
1152 Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
1153 if list_ref.len() == 1 {
1157 if let Some(Some(element)) = list_ref.get(0) {
1158 is_debezium_unavailable_value(&Some(element))
1160 } else {
1161 false
1162 }
1163 } else {
1164 false
1165 }
1166 }
1167 _ => false,
1168 }
1169}
1170
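/// Build the row to persist for a Postgres CDC update: any TOAST-able column whose new value
/// is the Debezium placeholder keeps its value from the old row instead.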
1171fn handle_toast_columns_for_postgres_cdc(
1173 old_row: &OwnedRow,
1174 new_row: &OwnedRow,
1175 toastable_indices: &[usize],
1176) -> OwnedRow {
1177 let mut fixed_row_data = new_row.as_inner().to_vec();
1178
1179 for &toast_idx in toastable_indices {
1180 let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
1182 if is_unavailable {
1183 if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
1185 fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
1186 }
1187 }
1188 }
1189
1190 OwnedRow::new(fixed_row_data)
1191}
1192
1193type ChangeBuffer = crate::common::change_buffer::ChangeBuffer<Vec<u8>, OwnedRow>;
1194
1195impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
1196 fn execute(self: Box<Self>) -> BoxedMessageStream {
1197 self.execute_inner().boxed()
1198 }
1199}
1200
1201impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
1202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1203 f.debug_struct("MaterializeExecutor")
1204 .field("arrange_key_indices", &self.arrange_key_indices)
1205 .finish()
1206 }
1207}
1208
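/// A cache from serialized primary key to the current row (if any), used by conflict handling
/// to avoid reading the state table for every incoming row.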
1209struct MaterializeCache<SD> {
1211 lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
1212 row_serde: BasicSerde,
1213 version_column_indices: Vec<u32>,
1214 _serde: PhantomData<SD>,
1215}
1216
1217type CacheValue = Option<CompactedRow>;
1218
1219impl<SD: ValueRowSerde> MaterializeCache<SD> {
1220 fn new(
1221 watermark_sequence: AtomicU64Ref,
1222 metrics_info: MetricsInfo,
1223 row_serde: BasicSerde,
1224 version_column_indices: Vec<u32>,
1225 ) -> Self {
1226 let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
1227 ManagedLruCache::unbounded(watermark_sequence, metrics_info);
1228 Self {
1229 lru_cache,
1230 row_serde,
1231 version_column_indices,
1232 _serde: PhantomData,
1233 }
1234 }
1235
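    /// Apply a batch of row operations against the cache, resolving primary-key conflicts
    /// according to `conflict_behavior`, and return a change buffer describing the effective
    /// changes to write to the state table.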
1236 async fn handle<S: StateStore>(
1239 &mut self,
1240 row_ops: Vec<(Op, Vec<u8>, Bytes)>,
1241 table: &StateTableInner<S, SD>,
1242 conflict_behavior: ConflictBehavior,
1243 metrics: &MaterializeMetrics,
1244 toastable_column_indices: Option<&[usize]>,
1245 ) -> StreamExecutorResult<ChangeBuffer> {
1246 assert_matches!(conflict_behavior, checked_conflict_behaviors!());
1247
1248 let key_set: HashSet<Box<[u8]>> = row_ops
1249 .iter()
1250 .map(|(_, k, _)| k.as_slice().into())
1251 .collect();
1252
1253 self.fetch_keys(
1256 key_set.iter().map(|v| v.deref()),
1257 table,
1258 conflict_behavior,
1259 metrics,
1260 )
1261 .await?;
1262
1263 let mut change_buffer = ChangeBuffer::new();
1264 let row_serde = self.row_serde.clone();
1265 let version_column_indices = self.version_column_indices.clone();
1266 for (op, key, row) in row_ops {
1267 match op {
1268 Op::Insert | Op::UpdateInsert => {
1269 let Some(old_row) = self.get_expected(&key) else {
1270 let new_row_deserialized =
1272 row_serde.deserializer.deserialize(row.clone())?;
1273 change_buffer.insert(key.clone(), new_row_deserialized);
1274 self.lru_cache.put(key, Some(CompactedRow { row }));
1275 continue;
1276 };
1277
1278 match conflict_behavior {
1280 ConflictBehavior::Overwrite => {
1281 let old_row_deserialized =
1282 row_serde.deserializer.deserialize(old_row.row.clone())?;
1283 let new_row_deserialized =
1284 row_serde.deserializer.deserialize(row.clone())?;
1285
1286 let need_overwrite = if !version_column_indices.is_empty() {
1287 versions_are_newer_or_equal(
1288 &old_row_deserialized,
1289 &new_row_deserialized,
1290 &version_column_indices,
1291 )
1292 } else {
1293 true
1295 };
1296
1297 if need_overwrite {
1298 if let Some(toastable_indices) = toastable_column_indices {
1299 let final_row = handle_toast_columns_for_postgres_cdc(
1301 &old_row_deserialized,
1302 &new_row_deserialized,
1303 toastable_indices,
1304 );
1305
1306 change_buffer.update(
1307 key.clone(),
1308 old_row_deserialized,
1309 final_row.clone(),
1310 );
1311 let final_row_bytes =
1312 Bytes::from(row_serde.serializer.serialize(final_row));
1313 self.lru_cache.put(
1314 key.clone(),
1315 Some(CompactedRow {
1316 row: final_row_bytes,
1317 }),
1318 );
1319 } else {
1320 change_buffer.update(
1322 key.clone(),
1323 old_row_deserialized,
1324 new_row_deserialized,
1325 );
1326 self.lru_cache
1327 .put(key.clone(), Some(CompactedRow { row: row.clone() }));
1328 }
1329 };
1330 }
1331 ConflictBehavior::IgnoreConflict => {
1332 }
1334 ConflictBehavior::DoUpdateIfNotNull => {
1335 let old_row_deserialized =
1338 row_serde.deserializer.deserialize(old_row.row.clone())?;
1339 let new_row_deserialized =
1340 row_serde.deserializer.deserialize(row.clone())?;
1341 let need_overwrite = if !version_column_indices.is_empty() {
1342 versions_are_newer_or_equal(
1343 &old_row_deserialized,
1344 &new_row_deserialized,
1345 &version_column_indices,
1346 )
1347 } else {
1348 true
1349 };
1350
1351 if need_overwrite {
1352 let mut row_deserialized_vec =
1353 old_row_deserialized.clone().into_inner().into_vec();
1354 replace_if_not_null(
1355 &mut row_deserialized_vec,
1356 new_row_deserialized.clone(),
1357 );
1358 let mut updated_row = OwnedRow::new(row_deserialized_vec);
1359
1360 if let Some(toastable_indices) = toastable_column_indices {
1362 let old_row_deserialized_again =
1365 row_serde.deserializer.deserialize(old_row.row.clone())?;
1366 updated_row = handle_toast_columns_for_postgres_cdc(
1367 &old_row_deserialized_again,
1368 &updated_row,
1369 toastable_indices,
1370 );
1371 }
1372
1373 change_buffer.update(
1374 key.clone(),
1375 old_row_deserialized,
1376 updated_row.clone(),
1377 );
1378 let updated_row_bytes =
1379 Bytes::from(row_serde.serializer.serialize(updated_row));
1380 self.lru_cache.put(
1381 key.clone(),
1382 Some(CompactedRow {
1383 row: updated_row_bytes,
1384 }),
1385 );
1386 }
1387 }
1388 _ => unreachable!(),
1389 };
1390 }
1391
1392 Op::Delete | Op::UpdateDelete => {
1393 match conflict_behavior {
1394 checked_conflict_behaviors!() => {
1395 if let Some(old_row) = self.get_expected(&key) {
1396 let old_row_deserialized =
1397 row_serde.deserializer.deserialize(old_row.row.clone())?;
1398 change_buffer.delete(key.clone(), old_row_deserialized);
1399 self.lru_cache.put(key, None);
1401 } else {
1402 };
1405 }
1406 _ => unreachable!(),
1407 };
1408 }
1409 }
1410 }
1411 Ok(change_buffer)
1412 }
1413
1414 async fn fetch_keys<'a, S: StateStore>(
1415 &mut self,
1416 keys: impl Iterator<Item = &'a [u8]>,
1417 table: &StateTableInner<S, SD>,
1418 conflict_behavior: ConflictBehavior,
1419 metrics: &MaterializeMetrics,
1420 ) -> StreamExecutorResult<()> {
1421 let mut futures = vec![];
1422 for key in keys {
1423 metrics.materialize_cache_total_count.inc();
1424
1425 if self.lru_cache.contains(key) {
1426 if self.lru_cache.get(key).unwrap().is_some() {
1427 metrics.materialize_data_exist_count.inc();
1428 }
1429 metrics.materialize_cache_hit_count.inc();
1430 continue;
1431 }
1432 futures.push(async {
1433 let key_row = table.pk_serde().deserialize(key).unwrap();
1434 let row = table.get_row(key_row).await?.map(CompactedRow::from);
1435 StreamExecutorResult::Ok((key.to_vec(), row))
1436 });
1437 }
1438
1439 let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
1440 while let Some(result) = buffered.next().await {
1441 let (key, row) = result?;
1442 if row.is_some() {
1443 metrics.materialize_data_exist_count.inc();
1444 }
1445 match conflict_behavior {
1447 checked_conflict_behaviors!() => self.lru_cache.put(key, row),
1448 _ => unreachable!(),
1449 };
1450 }
1451
1452 Ok(())
1453 }
1454
1455 fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
1456 self.lru_cache.get(key).unwrap_or_else(|| {
1457 panic!(
1458 "the key {:?} has not been fetched in the materialize executor's cache ",
1459 key
1460 )
1461 })
1462 }
1463
1464 fn evict(&mut self) {
1465 self.lru_cache.evict()
1466 }
1467}
1468
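/// Overwrite each column of `row` with the corresponding column of `replacement`, but only
/// where the replacement value is non-null (`DO UPDATE IF NOT NULL` semantics).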
1469fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
1482 for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
1483 if let Some(new_value) = new_col {
1484 *old_col = Some(new_value);
1485 }
1486 }
1487}
1488
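/// Compare the version columns of two rows lexicographically. Returns `true` iff the new
/// row's versions are greater than or equal to the old row's, i.e. the new row should
/// overwrite the old one.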
1489fn versions_are_newer_or_equal(
1492 old_row: &OwnedRow,
1493 new_row: &OwnedRow,
1494 version_column_indices: &[u32],
1495) -> bool {
1496 if version_column_indices.is_empty() {
1497 return true;
1499 }
1500
1501 for &idx in version_column_indices {
1502 let old_value = old_row.index(idx as usize);
1503 let new_value = new_row.index(idx as usize);
1504
1505 match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
            std::cmp::Ordering::Less => return true,
            std::cmp::Ordering::Greater => return false,
            std::cmp::Ordering::Equal => continue,
        }
1510 }
1511
1512 true
1514}
1515
1516#[cfg(test)]
1517mod tests {
1518
1519 use std::iter;
1520 use std::sync::atomic::AtomicU64;
1521
1522 use rand::rngs::SmallRng;
1523 use rand::{Rng, RngCore, SeedableRng};
1524 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1525 use risingwave_common::catalog::Field;
1526 use risingwave_common::util::epoch::test_epoch;
1527 use risingwave_common::util::sort_util::OrderType;
1528 use risingwave_hummock_sdk::HummockReadEpoch;
1529 use risingwave_storage::memory::MemoryStateStore;
1530 use risingwave_storage::table::batch_table::BatchTable;
1531
1532 use super::*;
1533 use crate::executor::test_utils::*;
1534
1535 #[tokio::test]
1536 async fn test_materialize_executor() {
1537 let memory_state_store = MemoryStateStore::new();
1539 let table_id = TableId::new(1);
1540 let schema = Schema::new(vec![
1542 Field::unnamed(DataType::Int32),
1543 Field::unnamed(DataType::Int32),
1544 ]);
1545 let column_ids = vec![0.into(), 1.into()];
1546
1547 let chunk1 = StreamChunk::from_pretty(
1549 " i i
1550 + 1 4
1551 + 2 5
1552 + 3 6",
1553 );
1554 let chunk2 = StreamChunk::from_pretty(
1555 " i i
1556 + 7 8
1557 - 3 6",
1558 );
1559
1560 let source = MockSource::with_messages(vec![
1562 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1563 Message::Chunk(chunk1),
1564 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1565 Message::Chunk(chunk2),
1566 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1567 ])
1568 .into_executor(schema.clone(), PkIndices::new());
1569
1570 let order_types = vec![OrderType::ascending()];
1571 let column_descs = vec![
1572 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1573 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1574 ];
1575
1576 let table = BatchTable::for_test(
1577 memory_state_store.clone(),
1578 table_id,
1579 column_descs,
1580 order_types,
1581 vec![0],
1582 vec![0, 1],
1583 );
1584
1585 let mut materialize_executor = MaterializeExecutor::for_test(
1586 source,
1587 memory_state_store,
1588 table_id,
1589 vec![ColumnOrder::new(0, OrderType::ascending())],
1590 column_ids,
1591 Arc::new(AtomicU64::new(0)),
1592 ConflictBehavior::NoCheck,
1593 )
1594 .await
1595 .boxed()
1596 .execute();
1597 materialize_executor.next().await.transpose().unwrap();
1598
1599 materialize_executor.next().await.transpose().unwrap();
1600
1601 match materialize_executor.next().await.transpose().unwrap() {
1603 Some(Message::Barrier(_)) => {
1604 let row = table
1605 .get_row(
1606 &OwnedRow::new(vec![Some(3_i32.into())]),
1607 HummockReadEpoch::NoWait(u64::MAX),
1608 )
1609 .await
1610 .unwrap();
1611 assert_eq!(
1612 row,
1613 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1614 );
1615 }
1616 _ => unreachable!(),
1617 }
1618 materialize_executor.next().await.transpose().unwrap();
1619 match materialize_executor.next().await.transpose().unwrap() {
1621 Some(Message::Barrier(_)) => {
1622 let row = table
1623 .get_row(
1624 &OwnedRow::new(vec![Some(7_i32.into())]),
1625 HummockReadEpoch::NoWait(u64::MAX),
1626 )
1627 .await
1628 .unwrap();
1629 assert_eq!(
1630 row,
1631 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1632 );
1633 }
1634 _ => unreachable!(),
1635 }
1636 }
1637
1638 #[tokio::test]
1640 async fn test_upsert_stream() {
1641 let memory_state_store = MemoryStateStore::new();
1643 let table_id = TableId::new(1);
1644 let schema = Schema::new(vec![
1646 Field::unnamed(DataType::Int32),
1647 Field::unnamed(DataType::Int32),
1648 ]);
1649 let column_ids = vec![0.into(), 1.into()];
1650
1651 let chunk1 = StreamChunk::from_pretty(
1653 " i i
1654 + 1 1",
1655 );
1656
1657 let chunk2 = StreamChunk::from_pretty(
1658 " i i
1659 + 1 2
1660 - 1 2",
1661 );
1662
1663 let source = MockSource::with_messages(vec![
1665 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1666 Message::Chunk(chunk1),
1667 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1668 Message::Chunk(chunk2),
1669 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1670 ])
1671 .into_executor(schema.clone(), PkIndices::new());
1672
1673 let order_types = vec![OrderType::ascending()];
1674 let column_descs = vec![
1675 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1676 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1677 ];
1678
1679 let table = BatchTable::for_test(
1680 memory_state_store.clone(),
1681 table_id,
1682 column_descs,
1683 order_types,
1684 vec![0],
1685 vec![0, 1],
1686 );
1687
1688 let mut materialize_executor = MaterializeExecutor::for_test(
1689 source,
1690 memory_state_store,
1691 table_id,
1692 vec![ColumnOrder::new(0, OrderType::ascending())],
1693 column_ids,
1694 Arc::new(AtomicU64::new(0)),
1695 ConflictBehavior::Overwrite,
1696 )
1697 .await
1698 .boxed()
1699 .execute();
1700 materialize_executor.next().await.transpose().unwrap();
1701
1702 materialize_executor.next().await.transpose().unwrap();
1703 materialize_executor.next().await.transpose().unwrap();
1704 materialize_executor.next().await.transpose().unwrap();
1705
1706 match materialize_executor.next().await.transpose().unwrap() {
1707 Some(Message::Barrier(_)) => {
1708 let row = table
1709 .get_row(
1710 &OwnedRow::new(vec![Some(1_i32.into())]),
1711 HummockReadEpoch::NoWait(u64::MAX),
1712 )
1713 .await
1714 .unwrap();
1715 assert!(row.is_none());
1716 }
1717 _ => unreachable!(),
1718 }
1719 }
1720
1721 #[tokio::test]
1722 async fn test_check_insert_conflict() {
1723 let memory_state_store = MemoryStateStore::new();
1725 let table_id = TableId::new(1);
1726 let schema = Schema::new(vec![
1728 Field::unnamed(DataType::Int32),
1729 Field::unnamed(DataType::Int32),
1730 ]);
1731 let column_ids = vec![0.into(), 1.into()];
1732
1733 let chunk1 = StreamChunk::from_pretty(
1735 " i i
1736 + 1 3
1737 + 1 4
1738 + 2 5
1739 + 3 6",
1740 );
1741
1742 let chunk2 = StreamChunk::from_pretty(
1743 " i i
1744 + 1 3
1745 + 2 6",
1746 );
1747
1748 let chunk3 = StreamChunk::from_pretty(
1750 " i i
1751 + 1 4",
1752 );
1753
1754 let source = MockSource::with_messages(vec![
1756 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1757 Message::Chunk(chunk1),
1758 Message::Chunk(chunk2),
1759 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1760 Message::Chunk(chunk3),
1761 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1762 ])
1763 .into_executor(schema.clone(), PkIndices::new());
1764
1765 let order_types = vec![OrderType::ascending()];
1766 let column_descs = vec![
1767 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1768 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1769 ];
1770
1771 let table = BatchTable::for_test(
1772 memory_state_store.clone(),
1773 table_id,
1774 column_descs,
1775 order_types,
1776 vec![0],
1777 vec![0, 1],
1778 );
1779
1780 let mut materialize_executor = MaterializeExecutor::for_test(
1781 source,
1782 memory_state_store,
1783 table_id,
1784 vec![ColumnOrder::new(0, OrderType::ascending())],
1785 column_ids,
1786 Arc::new(AtomicU64::new(0)),
1787 ConflictBehavior::Overwrite,
1788 )
1789 .await
1790 .boxed()
1791 .execute();
1792 materialize_executor.next().await.transpose().unwrap();
1793
1794 materialize_executor.next().await.transpose().unwrap();
1795 materialize_executor.next().await.transpose().unwrap();
1796
1797 match materialize_executor.next().await.transpose().unwrap() {
1799 Some(Message::Barrier(_)) => {
1800 let row = table
1801 .get_row(
1802 &OwnedRow::new(vec![Some(3_i32.into())]),
1803 HummockReadEpoch::NoWait(u64::MAX),
1804 )
1805 .await
1806 .unwrap();
1807 assert_eq!(
1808 row,
1809 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1810 );
1811
1812 let row = table
1813 .get_row(
1814 &OwnedRow::new(vec![Some(1_i32.into())]),
1815 HummockReadEpoch::NoWait(u64::MAX),
1816 )
1817 .await
1818 .unwrap();
1819 assert_eq!(
1820 row,
1821 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
1822 );
1823
1824 let row = table
1825 .get_row(
1826 &OwnedRow::new(vec![Some(2_i32.into())]),
1827 HummockReadEpoch::NoWait(u64::MAX),
1828 )
1829 .await
1830 .unwrap();
1831 assert_eq!(
1832 row,
1833 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
1834 );
1835 }
1836 _ => unreachable!(),
1837 }
1838 }
1839
1840 #[tokio::test]
1841 async fn test_delete_and_update_conflict() {
1842 let memory_state_store = MemoryStateStore::new();
1844 let table_id = TableId::new(1);
1845 let schema = Schema::new(vec![
1847 Field::unnamed(DataType::Int32),
1848 Field::unnamed(DataType::Int32),
1849 ]);
1850 let column_ids = vec![0.into(), 1.into()];
1851
1852 let chunk1 = StreamChunk::from_pretty(
1854 " i i
1855 + 1 4
1856 + 2 5
1857 + 3 6
1858 U- 8 1
1859 U+ 8 2
1860 + 8 3",
1861 );
1862
1863 let chunk2 = StreamChunk::from_pretty(
1865 " i i
1866 + 7 8
1867 - 3 4
1868 - 5 0",
1869 );
1870
1871 let chunk3 = StreamChunk::from_pretty(
1873 " i i
1874 + 1 5
1875 U- 2 4
1876 U+ 2 8
1877 U- 9 0
1878 U+ 9 1",
1879 );
1880
1881 let source = MockSource::with_messages(vec![
1883 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1884 Message::Chunk(chunk1),
1885 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1886 Message::Chunk(chunk2),
1887 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1888 Message::Chunk(chunk3),
1889 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1890 ])
1891 .into_executor(schema.clone(), PkIndices::new());
1892
1893 let order_types = vec![OrderType::ascending()];
1894 let column_descs = vec![
1895 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1896 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1897 ];
1898
1899 let table = BatchTable::for_test(
1900 memory_state_store.clone(),
1901 table_id,
1902 column_descs,
1903 order_types,
1904 vec![0],
1905 vec![0, 1],
1906 );
1907
1908 let mut materialize_executor = MaterializeExecutor::for_test(
1909 source,
1910 memory_state_store,
1911 table_id,
1912 vec![ColumnOrder::new(0, OrderType::ascending())],
1913 column_ids,
1914 Arc::new(AtomicU64::new(0)),
1915 ConflictBehavior::Overwrite,
1916 )
1917 .await
1918 .boxed()
1919 .execute();
1920 materialize_executor.next().await.transpose().unwrap();
1921
1922 materialize_executor.next().await.transpose().unwrap();
1923
1924 match materialize_executor.next().await.transpose().unwrap() {
1926 Some(Message::Barrier(_)) => {
1927 let row = table
1929 .get_row(
1930 &OwnedRow::new(vec![Some(8_i32.into())]),
1931 HummockReadEpoch::NoWait(u64::MAX),
1932 )
1933 .await
1934 .unwrap();
1935 assert_eq!(
1936 row,
1937 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
1938 );
1939 }
1940 _ => unreachable!(),
1941 }
1942 materialize_executor.next().await.transpose().unwrap();
1943
1944 match materialize_executor.next().await.transpose().unwrap() {
1945 Some(Message::Barrier(_)) => {
1946 let row = table
1947 .get_row(
1948 &OwnedRow::new(vec![Some(7_i32.into())]),
1949 HummockReadEpoch::NoWait(u64::MAX),
1950 )
1951 .await
1952 .unwrap();
1953 assert_eq!(
1954 row,
1955 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1956 );
1957
1958 let row = table
1960 .get_row(
1961 &OwnedRow::new(vec![Some(3_i32.into())]),
1962 HummockReadEpoch::NoWait(u64::MAX),
1963 )
1964 .await
1965 .unwrap();
1966 assert_eq!(row, None);
1967
1968 let row = table
1970 .get_row(
1971 &OwnedRow::new(vec![Some(5_i32.into())]),
1972 HummockReadEpoch::NoWait(u64::MAX),
1973 )
1974 .await
1975 .unwrap();
1976 assert_eq!(row, None);
1977 }
1978 _ => unreachable!(),
1979 }
1980
1981 materialize_executor.next().await.transpose().unwrap();
1982 match materialize_executor.next().await.transpose().unwrap() {
1984 Some(Message::Barrier(_)) => {
1985 let row = table
1986 .get_row(
1987 &OwnedRow::new(vec![Some(1_i32.into())]),
1988 HummockReadEpoch::NoWait(u64::MAX),
1989 )
1990 .await
1991 .unwrap();
1992 assert_eq!(
1993 row,
1994 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
1995 );
1996
1997 let row = table
1999 .get_row(
2000 &OwnedRow::new(vec![Some(2_i32.into())]),
2001 HummockReadEpoch::NoWait(u64::MAX),
2002 )
2003 .await
2004 .unwrap();
2005 assert_eq!(
2006 row,
2007 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
2008 );
2009
2010 let row = table
2012 .get_row(
2013 &OwnedRow::new(vec![Some(9_i32.into())]),
2014 HummockReadEpoch::NoWait(u64::MAX),
2015 )
2016 .await
2017 .unwrap();
2018 assert_eq!(
2019 row,
2020 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
2021 );
2022 }
2023 _ => unreachable!(),
2024 }
2025 }
2026
2027 #[tokio::test]
2028 async fn test_ignore_insert_conflict() {
2029 let memory_state_store = MemoryStateStore::new();
2031 let table_id = TableId::new(1);
2032 let schema = Schema::new(vec![
2034 Field::unnamed(DataType::Int32),
2035 Field::unnamed(DataType::Int32),
2036 ]);
2037 let column_ids = vec![0.into(), 1.into()];
2038
2039 let chunk1 = StreamChunk::from_pretty(
2041 " i i
2042 + 1 3
2043 + 1 4
2044 + 2 5
2045 + 3 6",
2046 );
2047
2048 let chunk2 = StreamChunk::from_pretty(
2049 " i i
2050 + 1 5
2051 + 2 6",
2052 );
2053
2054 let chunk3 = StreamChunk::from_pretty(
2056 " i i
2057 + 1 6",
2058 );
2059
2060 let source = MockSource::with_messages(vec![
2062 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2063 Message::Chunk(chunk1),
2064 Message::Chunk(chunk2),
2065 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2066 Message::Chunk(chunk3),
2067 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
2068 ])
2069 .into_executor(schema.clone(), PkIndices::new());
2070
2071 let order_types = vec![OrderType::ascending()];
2072 let column_descs = vec![
2073 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2074 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2075 ];
2076
2077 let table = BatchTable::for_test(
2078 memory_state_store.clone(),
2079 table_id,
2080 column_descs,
2081 order_types,
2082 vec![0],
2083 vec![0, 1],
2084 );
2085
2086 let mut materialize_executor = MaterializeExecutor::for_test(
2087 source,
2088 memory_state_store,
2089 table_id,
2090 vec![ColumnOrder::new(0, OrderType::ascending())],
2091 column_ids,
2092 Arc::new(AtomicU64::new(0)),
2093 ConflictBehavior::IgnoreConflict,
2094 )
2095 .await
2096 .boxed()
2097 .execute();
2098 materialize_executor.next().await.transpose().unwrap();
2099
2100 materialize_executor.next().await.transpose().unwrap();
2101 materialize_executor.next().await.transpose().unwrap();
2102
2103 match materialize_executor.next().await.transpose().unwrap() {
2105 Some(Message::Barrier(_)) => {
2106 let row = table
2107 .get_row(
2108 &OwnedRow::new(vec![Some(3_i32.into())]),
2109 HummockReadEpoch::NoWait(u64::MAX),
2110 )
2111 .await
2112 .unwrap();
2113 assert_eq!(
2114 row,
2115 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
2116 );
2117
2118 let row = table
2119 .get_row(
2120 &OwnedRow::new(vec![Some(1_i32.into())]),
2121 HummockReadEpoch::NoWait(u64::MAX),
2122 )
2123 .await
2124 .unwrap();
2125 assert_eq!(
2126 row,
2127 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
2128 );
2129
2130 let row = table
2131 .get_row(
2132 &OwnedRow::new(vec![Some(2_i32.into())]),
2133 HummockReadEpoch::NoWait(u64::MAX),
2134 )
2135 .await
2136 .unwrap();
2137 assert_eq!(
2138 row,
2139 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
2140 );
2141 }
2142 _ => unreachable!(),
2143 }
2144 }
2145
2146 #[tokio::test]
2147 async fn test_ignore_delete_then_insert() {
2148 let memory_state_store = MemoryStateStore::new();
2150 let table_id = TableId::new(1);
2151 let schema = Schema::new(vec![
2153 Field::unnamed(DataType::Int32),
2154 Field::unnamed(DataType::Int32),
2155 ]);
2156 let column_ids = vec![0.into(), 1.into()];
2157
2158 let chunk1 = StreamChunk::from_pretty(
2160 " i i
2161 + 1 3
2162 - 1 3
2163 + 1 6",
2164 );
2165
2166 let source = MockSource::with_messages(vec![
2168 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2169 Message::Chunk(chunk1),
2170 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2171 ])
2172 .into_executor(schema.clone(), PkIndices::new());
2173
2174 let order_types = vec![OrderType::ascending()];
2175 let column_descs = vec![
2176 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2177 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2178 ];
2179
2180 let table = BatchTable::for_test(
2181 memory_state_store.clone(),
2182 table_id,
2183 column_descs,
2184 order_types,
2185 vec![0],
2186 vec![0, 1],
2187 );
2188
2189 let mut materialize_executor = MaterializeExecutor::for_test(
2190 source,
2191 memory_state_store,
2192 table_id,
2193 vec![ColumnOrder::new(0, OrderType::ascending())],
2194 column_ids,
2195 Arc::new(AtomicU64::new(0)),
2196 ConflictBehavior::IgnoreConflict,
2197 )
2198 .await
2199 .boxed()
2200 .execute();
2201 let _msg1 = materialize_executor
2202 .next()
2203 .await
2204 .transpose()
2205 .unwrap()
2206 .unwrap()
2207 .as_barrier()
2208 .unwrap();
2209 let _msg2 = materialize_executor
2210 .next()
2211 .await
2212 .transpose()
2213 .unwrap()
2214 .unwrap()
2215 .as_chunk()
2216 .unwrap();
2217 let _msg3 = materialize_executor
2218 .next()
2219 .await
2220 .transpose()
2221 .unwrap()
2222 .unwrap()
2223 .as_barrier()
2224 .unwrap();
2225
2226 let row = table
2227 .get_row(
2228 &OwnedRow::new(vec![Some(1_i32.into())]),
2229 HummockReadEpoch::NoWait(u64::MAX),
2230 )
2231 .await
2232 .unwrap();
2233 assert_eq!(
2234 row,
2235 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
2236 );
2237 }
2238
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

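        // After the second barrier, key 8 should be (8, 2): `U+ 8 2` inserted it and the
        // conflicting `+ 8 3` was ignored.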
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

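        // After the third barrier: (7, 8) is inserted, key 3 is deleted, and deleting the absent
        // key 5 is a no-op.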
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

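        // After the last barrier: `+ 1 5` conflicts and is ignored, so key 1 keeps (1, 4); key 2
        // was deleted and re-inserted as (2, 8); the update on the absent key 9 inserts (9, 1).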
        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }

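    // `DoUpdateIfNotNull`: on a primary-key conflict, only the non-NULL columns of the incoming
    // row overwrite the stored row, so NULL values never clobber existing data.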
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

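        // After the second barrier: key 8 stays (8, 2) because the trailing `+ 8 .` carries a
        // NULL and does not overwrite it; key 2 keeps NULL in its second column.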
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

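        // Same as in the previous test: after the third barrier, (7, 8) is inserted, key 3 is
        // deleted, and deleting the absent key 5 is a no-op.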
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

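        // After the last barrier: `+ 7 .` does not null out (7, 8); `U+ 2 .` leaves key 2's
        // second column as NULL; the update on the absent key 9 inserts (9, 1).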
        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }

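    // Generates `row_number` random (key, value) rows over a small key space of `KN` keys with
    // random insert/delete ops and random visibility, packed into chunks of `chunk_size`.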
    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
        const KN: u32 = 4;
        const SEED: u64 = 998244353;
        let mut ret = vec![];
        let mut builder =
            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
        let mut rng = SmallRng::seed_from_u64(SEED);

        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
            let len = c.data_chunk().capacity();
            let mut c = StreamChunkMut::from(c);
            for i in 0..len {
                c.set_vis(i, rng.random_bool(0.5));
            }
            c.into()
        };
        for _ in 0..row_number {
            let k = (rng.next_u32() % KN) as i32;
            let v = rng.next_u32() as i32;
            let op = if rng.random_bool(0.5) {
                Op::Insert
            } else {
                Op::Delete
            };
            if let Some(c) =
                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
            {
                ret.push(random_vis(c, &mut rng));
            }
        }
        if let Some(c) = builder.take() {
            ret.push(random_vis(c, &mut rng));
        }
        ret
    }

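    // Feeds the fuzzed chunks through a `MaterializeExecutor` with the given conflict behavior
    // and replays the executor's output into an op-consistent `StateTable`, which checks that the
    // materialized output stream is self-consistent (e.g. no double-insert of the same key).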
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            table.write_chunk(c);
        }
    }

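    // Run the consistency fuzz test for both the `Overwrite` and `IgnoreConflict` behaviors.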
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }

    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
}