1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Bound, Deref, Index};
20
21use bytes::Bytes;
22use futures::future::Either;
23use futures::stream::{self, select_with_strategy};
24use futures_async_stream::try_stream;
25use itertools::Itertools;
26use risingwave_common::array::Op;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::{
29 ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
30};
31use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
32use risingwave_common::row::{CompactedRow, OwnedRow, RowExt};
33use risingwave_common::types::{DEBEZIUM_UNAVAILABLE_VALUE, DataType, ScalarImpl};
34use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
35use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
36use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
37use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
38use risingwave_hummock_sdk::HummockReadEpoch;
39use risingwave_pb::catalog::Table;
40use risingwave_pb::catalog::table::{Engine, OptionalAssociatedSourceId};
41use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
42use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
43use risingwave_storage::table::KeyedRow;
44
45use crate::cache::ManagedLruCache;
46use crate::common::metrics::MetricsInfo;
47use crate::common::table::state_table::{
48 StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
49};
50use crate::executor::error::ErrorKind;
51use crate::executor::monitor::MaterializeMetrics;
52use crate::executor::mview::RefreshProgressTable;
53use crate::executor::prelude::*;
54use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
55use crate::task::LocalBarrierManager;
56
/// State machine driving [`MaterializeExecutor`]'s stream loop.
///
/// Ordinary materialized views stay in `NormalIngestion` forever. Refreshable
/// tables additionally move through `MergingData` -> `CleanUp` -> `RefreshEnd`
/// when a refresh is triggered (or resumed after recovery).
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Default state: apply upstream chunks to the state table.
    NormalIngestion,
    /// Refresh: merge-sort the main table against the staging table and
    /// delete rows that are absent from staging.
    MergingData,
    /// Refresh: wait for a checkpoint barrier, then report refresh completion
    /// to the local barrier manager.
    CleanUp,
    /// Transitional state: commit all state tables, yield `barrier`, then
    /// move to `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Refresh: wait for `on_complete_epoch` to be committed in the state
    /// store, then resume normal ingestion.
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}
70
/// Materializes the input stream into the table's state table, optionally
/// resolving primary-key conflicts through [`MaterializeCache`].
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    input: Executor,

    /// Output schema; used to build output chunks.
    schema: Schema,

    /// The state table this executor writes to.
    state_table: StateTableInner<S, SD>,

    /// Column indices of the arrangement key (derived from `arrange_key`).
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// Cache of recent rows, used to detect and resolve pk conflicts.
    materialize_cache: MaterializeCache<SD>,

    conflict_behavior: ConflictBehavior,

    /// Version-column indices passed to the materialize cache for conflict
    /// handling; empty when no version column is configured.
    version_column_indices: Vec<u32>,

    /// Whether this table may have downstream fragments; together with
    /// `depended_subscription_ids` this decides the op consistency level.
    may_have_downstream: bool,

    /// IDs of subscriptions depending on this table; non-empty forces the
    /// log-store-enabled consistency level.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    /// True for append-only Iceberg-engine tables: chunks are passed through
    /// without being written to the state table.
    is_dummy_table: bool,

    /// For Postgres CDC tables only: indices of columns whose types may carry
    /// Debezium's "unavailable value" (TOAST) placeholder; `None` otherwise.
    toastable_column_indices: Option<Vec<usize>>,

    /// Present only for refreshable tables; holds staging/progress state.
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    /// Used to report refresh completion to the barrier manager.
    local_barrier_manager: LocalBarrierManager,
}
109
/// State required by a refreshable materialized table.
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Catalog of the main table.
    pub table_catalog: Table,

    /// Catalog of the staging table that buffers data during a refresh.
    pub staging_table_catalog: Table,

    /// Whether a refresh is currently in progress.
    pub is_refreshing: bool,

    /// Staging table that receives the pk of every row ingested during the
    /// refresh load phase.
    pub staging_table: StateTableInner<S, SD>,

    /// Per-vnode merge progress, persisted so an interrupted refresh can
    /// resume after recovery.
    pub progress_table: RefreshProgressTable<S>,

    /// ID of the main table.
    pub table_id: TableId,
}
135
136impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
137 pub async fn new(
139 store: S,
140 table_catalog: &Table,
141 staging_table_catalog: &Table,
142 progress_state_table: &Table,
143 vnodes: Option<Arc<Bitmap>>,
144 ) -> Self {
145 let table_id = TableId::new(table_catalog.id);
146
147 let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
149 staging_table_catalog,
150 store.clone(),
151 vnodes.clone(),
152 )
153 .await;
154
155 let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
156 progress_state_table,
157 store,
158 vnodes,
159 )
160 .await;
161
162 let pk_len = table_catalog.pk.len();
164 let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
165
166 debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
167
168 Self {
169 table_catalog: table_catalog.clone(),
170 staging_table_catalog: staging_table_catalog.clone(),
171 is_refreshing: false,
172 staging_table,
173 progress_table,
174 table_id,
175 }
176 }
177}
178
179fn get_op_consistency_level(
180 conflict_behavior: ConflictBehavior,
181 may_have_downstream: bool,
182 depended_subscriptions: &HashSet<u32>,
183) -> StateTableOpConsistencyLevel {
184 if !depended_subscriptions.is_empty() {
185 StateTableOpConsistencyLevel::LogStoreEnabled
186 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
187 StateTableOpConsistencyLevel::Inconsistent
190 } else {
191 StateTableOpConsistencyLevel::ConsistentOldValue
192 }
193}
194
195impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
196 #[allow(clippy::too_many_arguments)]
    /// Builds a `MaterializeExecutor` from the table catalog.
    ///
    /// The state table's op consistency level is derived from
    /// `conflict_behavior`, the initial downstream dispatch count and the
    /// subscriptions related to this table (see [`get_op_consistency_level`]).
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<u32>,
        metrics: Arc<StreamingMetrics>,
        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
        local_barrier_manager: LocalBarrierManager,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // For Postgres CDC tables, collect indices of columns whose types can
        // carry Debezium's "unavailable value" (TOAST) placeholder.
        let toastable_column_indices = if table_catalog.cdc_table_type()
            == risingwave_pb::catalog::table::CdcTableType::Postgres
        {
            let toastable_indices: Vec<usize> = table_columns
                .iter()
                .enumerate()
                .filter_map(|(index, column)| match &column.data_type {
                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
                        Some(index)
                    }
                    _ => None,
                })
                .collect();

            if toastable_indices.is_empty() {
                None
            } else {
                Some(toastable_indices)
            }
        } else {
            None
        };

        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // A non-zero initial dispatch count means downstream fragments exist.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(op_consistency_level)
            .enable_preload_all_rows_by_config(&actor_context.streaming_config)
            .build()
            .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only Iceberg tables skip state-table writes entirely
        // (see `execute_inner`).
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_indices.clone(),
            ),
            conflict_behavior,
            version_column_indices,
            is_dummy_table,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
            toastable_column_indices,
            refresh_args,
            local_barrier_manager,
        }
    }
313
314 #[try_stream(ok = Message, error = StreamExecutorError)]
315 async fn execute_inner(mut self) {
316 let mv_table_id = TableId::new(self.state_table.table_id());
317 let _staging_table_id = TableId::new(self.state_table.table_id());
318 let data_types = self.schema.data_types();
319 let mut input = self.input.execute();
320
321 let barrier = expect_first_barrier(&mut input).await?;
322 let first_epoch = barrier.epoch;
323 let _barrier_epoch = barrier.epoch; yield Message::Barrier(barrier);
326 self.state_table.init_epoch(first_epoch).await?;
327
328 let mut inner_state =
330 Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
331 if let Some(ref mut refresh_args) = self.refresh_args {
333 refresh_args.staging_table.init_epoch(first_epoch).await?;
334
335 refresh_args.progress_table.recover(first_epoch).await?;
337
338 let progress_stats = refresh_args.progress_table.get_progress_stats();
340 if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
341 refresh_args.is_refreshing = true;
342 tracing::info!(
343 total_vnodes = progress_stats.total_vnodes,
344 completed_vnodes = progress_stats.completed_vnodes,
345 "Recovered refresh in progress, resuming refresh operation"
346 );
347
348 let incomplete_vnodes: Vec<_> = refresh_args
352 .progress_table
353 .get_all_progress()
354 .iter()
355 .filter(|(_, entry)| !entry.is_completed)
356 .map(|(&vnode, _)| vnode)
357 .collect();
358
359 if !incomplete_vnodes.is_empty() {
360 tracing::info!(
362 incomplete_vnodes = incomplete_vnodes.len(),
363 "Recovery detected incomplete VNodes, resuming refresh operation"
364 );
365 } else {
368 tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
370 }
371 }
372 }
373
374 if let Some(ref refresh_args) = self.refresh_args
376 && refresh_args.is_refreshing
377 {
378 let incomplete_vnodes: Vec<_> = refresh_args
380 .progress_table
381 .get_all_progress()
382 .iter()
383 .filter(|(_, entry)| !entry.is_completed)
384 .map(|(&vnode, _)| vnode)
385 .collect();
386 if !incomplete_vnodes.is_empty() {
387 inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
389 tracing::info!(
390 incomplete_vnodes = incomplete_vnodes.len(),
391 "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
392 );
393 }
394 }
395
396 'main_loop: loop {
398 match *inner_state {
399 MaterializeStreamState::NormalIngestion => {
400 #[for_await]
401 '_normal_ingest: for msg in input.by_ref() {
402 let msg = msg?;
403 self.materialize_cache.evict();
404
405 match msg {
406 Message::Watermark(w) => {
407 yield Message::Watermark(w);
408 }
409 Message::Chunk(chunk) if self.is_dummy_table => {
410 self.metrics
411 .materialize_input_row_count
412 .inc_by(chunk.cardinality() as u64);
413 yield Message::Chunk(chunk);
414 }
415 Message::Chunk(chunk) => {
416 self.metrics
417 .materialize_input_row_count
418 .inc_by(chunk.cardinality() as u64);
419 let do_not_handle_conflict = !self.state_table.is_consistent_op()
423 && self.version_column_indices.is_empty()
424 && self.conflict_behavior == ConflictBehavior::Overwrite;
425
426 match self.conflict_behavior {
427 checked_conflict_behaviors!() if !do_not_handle_conflict => {
428 if chunk.cardinality() == 0 {
429 continue;
431 }
432 let (data_chunk, ops) = chunk.clone().into_parts();
433
434 if self.state_table.value_indices().is_some() {
435 panic!(
438 "materialize executor with data check can not handle only materialize partial columns"
439 )
440 };
441 let values = data_chunk.serialize();
442
443 let key_chunk =
444 data_chunk.project(self.state_table.pk_indices());
445
446 if let Some(ref mut refresh_args) = self.refresh_args
449 && refresh_args.is_refreshing
450 {
451 let key_chunk = chunk
452 .clone()
453 .project(self.state_table.pk_indices());
454 tracing::trace!(
455 staging_chunk = %key_chunk.to_pretty(),
456 input_chunk = %chunk.to_pretty(),
457 "writing to staging table"
458 );
459 if cfg!(debug_assertions) {
460 assert!(
462 key_chunk
463 .ops()
464 .iter()
465 .all(|op| op == &Op::Insert)
466 );
467 }
468 refresh_args
469 .staging_table
470 .write_chunk(key_chunk.clone());
471 refresh_args.staging_table.try_flush().await?;
472 }
473
474 let pks = {
475 let mut pks = vec![vec![]; data_chunk.capacity()];
476 key_chunk
477 .rows_with_holes()
478 .zip_eq_fast(pks.iter_mut())
479 .for_each(|(r, vnode_and_pk)| {
480 if let Some(r) = r {
481 self.state_table
482 .pk_serde()
483 .serialize(r, vnode_and_pk);
484 }
485 });
486 pks
487 };
488 let (_, vis) = key_chunk.into_parts();
489 let row_ops = ops
490 .iter()
491 .zip_eq_debug(pks.into_iter())
492 .zip_eq_debug(values.into_iter())
493 .zip_eq_debug(vis.iter())
494 .filter_map(|(((op, k), v), vis)| {
495 vis.then_some((*op, k, v))
496 })
497 .collect_vec();
498
499 let change_buffer = self
500 .materialize_cache
501 .handle(
502 row_ops,
503 &self.state_table,
504 self.conflict_behavior,
505 &self.metrics,
506 self.toastable_column_indices.as_deref(),
507 )
508 .await?;
509
510 match generate_output(change_buffer, data_types.clone())? {
511 Some(output_chunk) => {
512 self.state_table.write_chunk(output_chunk.clone());
513 self.state_table.try_flush().await?;
514 yield Message::Chunk(output_chunk);
515 }
516 None => continue,
517 }
518 }
519 ConflictBehavior::IgnoreConflict => unreachable!(),
520 ConflictBehavior::NoCheck
521 | ConflictBehavior::Overwrite
522 | ConflictBehavior::DoUpdateIfNotNull => {
523 self.state_table.write_chunk(chunk.clone());
524 self.state_table.try_flush().await?;
525
526 if let Some(ref mut refresh_args) = self.refresh_args
528 && refresh_args.is_refreshing
529 {
530 let key_chunk = chunk
531 .clone()
532 .project(self.state_table.pk_indices());
533 tracing::trace!(
534 staging_chunk = %key_chunk.to_pretty(),
535 input_chunk = %chunk.to_pretty(),
536 "writing to staging table"
537 );
538 if cfg!(debug_assertions) {
539 assert!(
541 key_chunk
542 .ops()
543 .iter()
544 .all(|op| op == &Op::Insert)
545 );
546 }
547 refresh_args
548 .staging_table
549 .write_chunk(key_chunk.clone());
550 refresh_args.staging_table.try_flush().await?;
551 }
552
553 yield Message::Chunk(chunk);
554 }
555 }
556 }
557 Message::Barrier(barrier) => {
558 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
559 barrier,
560 expect_next_state: Box::new(
561 MaterializeStreamState::NormalIngestion,
562 ),
563 };
564 continue 'main_loop;
565 }
566 }
567 }
568
569 return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
570 anyhow::anyhow!(
571 "Input stream terminated unexpectedly during normal ingestion"
572 ),
573 )));
574 }
575 MaterializeStreamState::MergingData => {
576 let Some(refresh_args) = self.refresh_args.as_mut() else {
577 panic!(
578 "MaterializeExecutor entered CleanUp state without refresh_args configured"
579 );
580 };
581 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
582
583 debug_assert_eq!(
584 self.state_table.vnodes(),
585 refresh_args.staging_table.vnodes()
586 );
587 debug_assert_eq!(
588 refresh_args.staging_table.vnodes(),
589 refresh_args.progress_table.vnodes()
590 );
591
592 let mut rows_to_delete = vec![];
593 let mut merge_complete = false;
594 let mut pending_barrier: Option<Barrier> = None;
595
596 {
598 let left_input = input.by_ref().map(Either::Left);
599 let right_merge_sort = pin!(
600 Self::make_mergesort_stream(
601 &self.state_table,
602 &refresh_args.staging_table,
603 &mut refresh_args.progress_table
604 )
605 .map(Either::Right)
606 );
607
608 let mut merge_stream =
611 select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
612 stream::PollNext::Left
613 });
614
615 #[for_await]
616 'merge_stream: for either in &mut merge_stream {
617 match either {
618 Either::Left(msg) => {
619 let msg = msg?;
620 match msg {
621 Message::Watermark(w) => yield Message::Watermark(w),
622 Message::Chunk(chunk) => {
623 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
624 }
625 Message::Barrier(b) => {
626 pending_barrier = Some(b);
627 break 'merge_stream;
628 }
629 }
630 }
631 Either::Right(result) => {
632 match result? {
633 Some((_vnode, row)) => {
634 rows_to_delete.push(row);
635 }
636 None => {
637 merge_complete = true;
639
640 }
642 }
643 }
644 }
645 }
646 }
647
648 for row in &rows_to_delete {
650 self.state_table.delete(row);
651 }
652 if !rows_to_delete.is_empty() {
653 let to_delete_chunk = StreamChunk::from_rows(
654 &rows_to_delete
655 .iter()
656 .map(|row| (Op::Delete, row))
657 .collect_vec(),
658 &self.schema.data_types(),
659 );
660 yield Message::Chunk(to_delete_chunk);
661 }
662
663 assert!(pending_barrier.is_some(), "pending barrier is not set");
665
666 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
667 barrier: pending_barrier.unwrap(),
668 expect_next_state: if merge_complete {
669 Box::new(MaterializeStreamState::CleanUp)
670 } else {
671 Box::new(MaterializeStreamState::MergingData)
672 },
673 };
674 continue 'main_loop;
675 }
676 MaterializeStreamState::CleanUp => {
677 let Some(refresh_args) = self.refresh_args.as_mut() else {
678 panic!(
679 "MaterializeExecutor entered MergingData state without refresh_args configured"
680 );
681 };
682 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
683
684 #[for_await]
685 for msg in input.by_ref() {
686 let msg = msg?;
687 match msg {
688 Message::Watermark(w) => yield Message::Watermark(w),
689 Message::Chunk(chunk) => {
690 tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
691 }
692 Message::Barrier(barrier) if !barrier.is_checkpoint() => {
693 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
694 barrier,
695 expect_next_state: Box::new(MaterializeStreamState::CleanUp),
696 };
697 continue 'main_loop;
698 }
699 Message::Barrier(barrier) => {
700 let staging_table_id = refresh_args.staging_table.table_id();
701 let epoch = barrier.epoch;
702 self.local_barrier_manager.report_refresh_finished(
703 epoch,
704 self.actor_context.id,
705 refresh_args.table_id.into(),
706 staging_table_id,
707 );
708 tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
709
710 *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
711 barrier,
712 expect_next_state: Box::new(
713 MaterializeStreamState::RefreshEnd {
714 on_complete_epoch: epoch,
715 },
716 ),
717 };
718 continue 'main_loop;
719 }
720 }
721 }
722 }
723 MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
724 let Some(refresh_args) = self.refresh_args.as_mut() else {
725 panic!(
726 "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
727 );
728 };
729 let staging_table_id = refresh_args.staging_table.table_id();
730
731 let staging_store = refresh_args.staging_table.state_store().clone();
733 staging_store
734 .try_wait_epoch(
735 HummockReadEpoch::Committed(on_complete_epoch.prev),
736 TryWaitEpochOptions {
737 table_id: staging_table_id.into(),
738 },
739 )
740 .await?;
741
742 if let Some(ref mut refresh_args) = self.refresh_args {
743 refresh_args.is_refreshing = false;
744 }
745 *inner_state = MaterializeStreamState::NormalIngestion;
746 continue 'main_loop;
747 }
748 MaterializeStreamState::CommitAndYieldBarrier {
749 barrier,
750 mut expect_next_state,
751 } => {
752 if let Some(ref mut refresh_args) = self.refresh_args {
753 match barrier.mutation.as_deref() {
754 Some(Mutation::RefreshStart {
755 table_id: refresh_table_id,
756 associated_source_id: _,
757 }) if *refresh_table_id == refresh_args.table_id => {
758 debug_assert!(
759 !refresh_args.is_refreshing,
760 "cannot start refresh twice"
761 );
762 refresh_args.is_refreshing = true;
763 tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
764
765 Self::init_refresh_progress(
767 &self.state_table,
768 &mut refresh_args.progress_table,
769 barrier.epoch.curr,
770 )?;
771 }
772 Some(Mutation::LoadFinish {
773 associated_source_id: load_finish_source_id,
774 }) => {
775 let associated_source_id = match refresh_args
777 .table_catalog
778 .optional_associated_source_id
779 {
780 Some(OptionalAssociatedSourceId::AssociatedSourceId(id)) => id,
781 None => unreachable!("associated_source_id is not set"),
782 };
783
784 if load_finish_source_id.table_id() == associated_source_id {
785 tracing::info!(
786 %load_finish_source_id,
787 "LoadFinish received, starting data replacement"
788 );
789 expect_next_state =
790 Box::new(MaterializeStreamState::<_>::MergingData);
791 }
792 }
793 _ => {}
794 }
795 }
796
797 if !self.may_have_downstream
801 && barrier.has_more_downstream_fragments(self.actor_context.id)
802 {
803 self.may_have_downstream = true;
804 }
805 Self::may_update_depended_subscriptions(
806 &mut self.depended_subscription_ids,
807 &barrier,
808 mv_table_id,
809 );
810 let op_consistency_level = get_op_consistency_level(
811 self.conflict_behavior,
812 self.may_have_downstream,
813 &self.depended_subscription_ids,
814 );
815 let post_commit = self
816 .state_table
817 .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
818 .await?;
819 if !post_commit.inner().is_consistent_op() {
820 assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
821 }
822
823 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
824
825 let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
827 {
828 Some((
831 refresh_args.staging_table.commit(barrier.epoch).await?,
832 refresh_args.progress_table.commit(barrier.epoch).await?,
833 ))
834 } else {
835 None
836 };
837
838 let b_epoch = barrier.epoch;
839 yield Message::Barrier(barrier);
840
841 if let Some((_, cache_may_stale)) = post_commit
843 .post_yield_barrier(update_vnode_bitmap.clone())
844 .await?
845 && cache_may_stale
846 {
847 self.materialize_cache.lru_cache.clear();
848 }
849
850 if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
852 staging_post_commit
853 .post_yield_barrier(update_vnode_bitmap.clone())
854 .await?;
855 progress_post_commit
856 .post_yield_barrier(update_vnode_bitmap)
857 .await?;
858 }
859
860 self.metrics
861 .materialize_current_epoch
862 .set(b_epoch.curr as i64);
863
864 *inner_state = *expect_next_state;
867 }
868 }
869 }
870 }
871
872 #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
876 async fn make_mergesort_stream<'a>(
877 main_table: &'a StateTableInner<S, SD>,
878 staging_table: &'a StateTableInner<S, SD>,
879 progress_table: &'a mut RefreshProgressTable<S>,
880 ) {
881 for vnode in main_table.vnodes().clone().iter_vnodes() {
882 let mut processed_rows = 0;
883 let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
885 if let Some(current_entry) = progress_table.get_progress(vnode) {
886 if current_entry.is_completed {
888 tracing::debug!(
889 vnode = vnode.to_index(),
890 "Skipping already completed VNode during recovery"
891 );
892 continue;
893 }
894 processed_rows += current_entry.processed_rows;
895 tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
896
897 if let Some(current_state) = ¤t_entry.current_pos {
898 (Bound::Excluded(current_state.clone()), Bound::Unbounded)
899 } else {
900 (Bound::Unbounded, Bound::Unbounded)
901 }
902 } else {
903 (Bound::Unbounded, Bound::Unbounded)
904 };
905
906 let iter_main = main_table
907 .iter_keyed_row_with_vnode(
908 vnode,
909 &pk_range,
910 PrefetchOptions::prefetch_for_large_range_scan(),
911 )
912 .await?;
913 let iter_staging = staging_table
914 .iter_keyed_row_with_vnode(
915 vnode,
916 &pk_range,
917 PrefetchOptions::prefetch_for_large_range_scan(),
918 )
919 .await?;
920
921 pin_mut!(iter_main);
922 pin_mut!(iter_staging);
923
924 let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
926 let mut staging_item: Option<KeyedRow<Bytes>> =
927 iter_staging.next().await.transpose()?;
928
929 while let Some(main_kv) = main_item {
930 let main_key = main_kv.key();
931
932 let mut should_delete = false;
934 while let Some(staging_kv) = &staging_item {
935 let staging_key = staging_kv.key();
936 match main_key.cmp(staging_key) {
937 std::cmp::Ordering::Greater => {
938 staging_item = iter_staging.next().await.transpose()?;
940 }
941 std::cmp::Ordering::Equal => {
942 break;
944 }
945 std::cmp::Ordering::Less => {
946 should_delete = true;
948 break;
949 }
950 }
951 }
952
953 if staging_item.is_none() {
955 should_delete = true;
956 }
957
958 if should_delete {
959 yield Some((vnode, main_kv.row().clone()));
960 }
961
962 processed_rows += 1;
964 tracing::info!(
965 "set progress table: vnode = {:?}, processed_rows = {:?}",
966 vnode,
967 processed_rows
968 );
969 progress_table.set_progress(
970 vnode,
971 Some(
972 main_kv
973 .row()
974 .project(main_table.pk_indices())
975 .to_owned_row(),
976 ),
977 false,
978 processed_rows,
979 )?;
980 main_item = iter_main.next().await.transpose()?;
981 }
982
983 if let Some(current_entry) = progress_table.get_progress(vnode) {
985 progress_table.set_progress(
986 vnode,
987 current_entry.current_pos.clone(),
988 true, current_entry.processed_rows,
990 )?;
991
992 tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
993 }
994 }
995
996 yield None;
998 }
999
1000 fn may_update_depended_subscriptions(
1002 depended_subscriptions: &mut HashSet<u32>,
1003 barrier: &Barrier,
1004 mv_table_id: TableId,
1005 ) {
1006 for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
1007 if !depended_subscriptions.insert(subscriber_id) {
1008 warn!(
1009 ?depended_subscriptions,
1010 ?mv_table_id,
1011 subscriber_id,
1012 "subscription id already exists"
1013 );
1014 }
1015 }
1016
1017 if let Some(Mutation::DropSubscriptions {
1018 subscriptions_to_drop,
1019 }) = barrier.mutation.as_deref()
1020 {
1021 for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
1022 if *upstream_mv_table_id == mv_table_id
1023 && !depended_subscriptions.remove(subscriber_id)
1024 {
1025 warn!(
1026 ?depended_subscriptions,
1027 ?mv_table_id,
1028 subscriber_id,
1029 "drop non existing subscriber_id id"
1030 );
1031 }
1032 }
1033 }
1034 }
1035
1036 fn init_refresh_progress(
1038 state_table: &StateTableInner<S, SD>,
1039 progress_table: &mut RefreshProgressTable<S>,
1040 _epoch: u64,
1041 ) -> StreamExecutorResult<()> {
1042 debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1043
1044 for vnode in state_table.vnodes().iter_vnodes() {
1046 progress_table.set_progress(
1047 vnode, None, false, 0, )?;
1051 }
1052
1053 tracing::info!(
1054 vnodes_count = state_table.vnodes().count_ones(),
1055 "Initialized refresh progress tracking for all VNodes"
1056 );
1057
1058 Ok(())
1059 }
1060}
1061
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Test-only constructor: derives the table catalog from the input schema
    /// and uses a test actor context / barrier manager.
    #[cfg(any(test, feature = "test"))]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // All columns are value columns (no partial materialization).
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                vec![],
            ),
            conflict_behavior,
            version_column_indices: vec![],
            is_dummy_table: false,
            toastable_column_indices: None,
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
            refresh_args: None,
            local_barrier_manager: LocalBarrierManager::for_test(),
        }
    }
}
1126
1127fn is_unavailable_value_str(s: &str) -> bool {
1130 s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
1131}
1132
1133fn is_debezium_unavailable_value(
1136 datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
1137) -> bool {
1138 match datum {
1139 Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => is_unavailable_value_str(val),
1140 Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
1141 jsonb_ref
1143 .as_str()
1144 .map(is_unavailable_value_str)
1145 .unwrap_or(false)
1146 }
1147 Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
1148 if let Ok(bytea_str) = std::str::from_utf8(bytea) {
1152 is_unavailable_value_str(bytea_str)
1153 } else {
1154 false
1155 }
1156 }
1157 Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
1158 if list_ref.len() == 1 {
1162 if let Some(Some(element)) = list_ref.get(0) {
1163 is_debezium_unavailable_value(&Some(element))
1165 } else {
1166 false
1167 }
1168 } else {
1169 false
1170 }
1171 }
1172 _ => false,
1173 }
1174}
1175
1176fn handle_toast_columns_for_postgres_cdc(
1178 old_row: &OwnedRow,
1179 new_row: &OwnedRow,
1180 toastable_indices: &[usize],
1181) -> OwnedRow {
1182 let mut fixed_row_data = new_row.as_inner().to_vec();
1183
1184 for &toast_idx in toastable_indices {
1185 let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
1187 if is_unavailable {
1188 if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
1190 fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
1191 }
1192 }
1193 }
1194
1195 OwnedRow::new(fixed_row_data)
1196}
1197
1198fn generate_output(
1200 change_buffer: ChangeBuffer,
1201 data_types: Vec<DataType>,
1202) -> StreamExecutorResult<Option<StreamChunk>> {
1203 let mut new_ops: Vec<Op> = vec![];
1206 let mut new_rows: Vec<OwnedRow> = vec![];
1207 for (_, row_op) in change_buffer.into_parts() {
1208 match row_op {
1209 ChangeBufferKeyOp::Insert(value) => {
1210 new_ops.push(Op::Insert);
1211 new_rows.push(value);
1212 }
1213 ChangeBufferKeyOp::Delete(old_value) => {
1214 new_ops.push(Op::Delete);
1215 new_rows.push(old_value);
1216 }
1217 ChangeBufferKeyOp::Update((old_value, new_value)) => {
1218 if old_value != new_value {
1220 new_ops.push(Op::UpdateDelete);
1221 new_ops.push(Op::UpdateInsert);
1222 new_rows.push(old_value);
1223 new_rows.push(new_value);
1224 }
1225 }
1226 }
1227 }
1228 let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
1229
1230 for row in new_rows {
1231 let res = data_chunk_builder.append_one_row(row);
1232 debug_assert!(res.is_none());
1233 }
1234
1235 if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
1236 let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
1237 Ok(Some(new_stream_chunk))
1238 } else {
1239 Ok(None)
1240 }
1241}
1242
/// Accumulates one net op per primary key (serialized pk bytes -> op),
/// collapsing successive ops on the same key before output generation.
pub struct ChangeBuffer {
    buffer: HashMap<Vec<u8>, ChangeBufferKeyOp>,
}
1248
/// Net effect on a single primary key accumulated in [`ChangeBuffer`].
enum ChangeBufferKeyOp {
    Insert(OwnedRow),
    Delete(OwnedRow),
    /// `(old_row, new_row)`.
    Update((OwnedRow, OwnedRow)),
}
1256
1257impl ChangeBuffer {
1258 fn new() -> Self {
1259 Self {
1260 buffer: HashMap::new(),
1261 }
1262 }
1263
1264 fn insert(&mut self, pk: Vec<u8>, value: OwnedRow) {
1265 let entry = self.buffer.entry(pk);
1266 match entry {
1267 Entry::Vacant(e) => {
1268 e.insert(ChangeBufferKeyOp::Insert(value));
1269 }
1270 Entry::Occupied(mut e) => {
1271 if let ChangeBufferKeyOp::Delete(old_value) = e.get_mut() {
1272 let old_val = std::mem::take(old_value);
1273 e.insert(ChangeBufferKeyOp::Update((old_val, value)));
1274 } else {
1275 unreachable!();
1276 }
1277 }
1278 }
1279 }
1280
1281 fn delete(&mut self, pk: Vec<u8>, old_value: OwnedRow) {
1282 let entry: Entry<'_, Vec<u8>, ChangeBufferKeyOp> = self.buffer.entry(pk);
1283 match entry {
1284 Entry::Vacant(e) => {
1285 e.insert(ChangeBufferKeyOp::Delete(old_value));
1286 }
1287 Entry::Occupied(mut e) => match e.get_mut() {
1288 ChangeBufferKeyOp::Insert(_) => {
1289 e.remove();
1290 }
1291 ChangeBufferKeyOp::Update((prev, _curr)) => {
1292 let prev = std::mem::take(prev);
1293 e.insert(ChangeBufferKeyOp::Delete(prev));
1294 }
1295 ChangeBufferKeyOp::Delete(_) => {
1296 unreachable!();
1297 }
1298 },
1299 }
1300 }
1301
1302 fn update(&mut self, pk: Vec<u8>, old_value: OwnedRow, new_value: OwnedRow) {
1303 let entry = self.buffer.entry(pk);
1304 match entry {
1305 Entry::Vacant(e) => {
1306 e.insert(ChangeBufferKeyOp::Update((old_value, new_value)));
1307 }
1308 Entry::Occupied(mut e) => match e.get_mut() {
1309 ChangeBufferKeyOp::Insert(_) => {
1310 e.insert(ChangeBufferKeyOp::Insert(new_value));
1311 }
1312 ChangeBufferKeyOp::Update((_prev, curr)) => {
1313 *curr = new_value;
1314 }
1315 ChangeBufferKeyOp::Delete(_) => {
1316 unreachable!()
1317 }
1318 },
1319 }
1320 }
1321
1322 fn into_parts(self) -> HashMap<Vec<u8>, ChangeBufferKeyOp> {
1323 self.buffer
1324 }
1325}
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    /// Entry point for the executor framework: boxes the inner message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
1331
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Only the arrange key is shown; the remaining fields (state table,
    // cache, ...) are not `Debug` or too verbose to be useful.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
1339
/// Per-key row cache used by the materialize executor for conflict handling.
struct MaterializeCache<SD> {
    // LRU map from serialized primary key to the cached row value;
    // `None` caches the known absence of a row in the table.
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    // Serde for the full row value (used to deserialize old/new rows).
    row_serde: BasicSerde,
    // Indices of the version columns consulted when resolving conflicts.
    version_column_indices: Vec<u32>,
    _serde: PhantomData<SD>,
}

/// `None` means "row known to be absent from the table".
type CacheValue = Option<CompactedRow>;
1349
impl<SD: ValueRowSerde> MaterializeCache<SD> {
    /// Creates an unbounded LRU-backed cache.
    ///
    /// `watermark_sequence` drives LRU eviction (see [`Self::evict`]);
    /// `version_column_indices` lists the columns consulted when deciding
    /// whether a new row may overwrite an old one.
    fn new(
        watermark_sequence: AtomicU64Ref,
        metrics_info: MetricsInfo,
        row_serde: BasicSerde,
        version_column_indices: Vec<u32>,
    ) -> Self {
        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
        Self {
            lru_cache,
            row_serde,
            version_column_indices,
            _serde: PhantomData,
        }
    }

    /// Applies a batch of `(op, serialized_pk, serialized_row)` triples
    /// against the cache, resolving conflicts according to
    /// `conflict_behavior`, and returns the collapsed net changes as a
    /// [`ChangeBuffer`].
    ///
    /// All touched keys are prefetched from `table` first, so the cache is
    /// guaranteed to know whether each key currently exists (this is what
    /// makes the `get_expected` calls below safe).
    async fn handle<S: StateStore>(
        &mut self,
        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: ConflictBehavior,
        metrics: &MaterializeMetrics,
        toastable_column_indices: Option<&[usize]>,
    ) -> StreamExecutorResult<ChangeBuffer> {
        // Only conflict behaviors that need to read the old row go through
        // this path.
        assert_matches!(conflict_behavior, checked_conflict_behaviors!());

        // Deduplicate keys before hitting storage.
        let key_set: HashSet<Box<[u8]>> = row_ops
            .iter()
            .map(|(_, k, _)| k.as_slice().into())
            .collect();

        self.fetch_keys(
            key_set.iter().map(|v| v.deref()),
            table,
            conflict_behavior,
            metrics,
        )
        .await?;

        let mut change_buffer = ChangeBuffer::new();
        // Cloned so `self.lru_cache` can still be borrowed mutably below.
        let row_serde = self.row_serde.clone();
        let version_column_indices = self.version_column_indices.clone();
        for (op, key, row) in row_ops {
            match op {
                Op::Insert | Op::UpdateInsert => {
                    // No existing row for this key: a plain insert, no
                    // conflict to resolve.
                    let Some(old_row) = self.get_expected(&key) else {
                        let new_row_deserialized =
                            row_serde.deserializer.deserialize(row.clone())?;
                        change_buffer.insert(key.clone(), new_row_deserialized);
                        self.lru_cache.put(key, Some(CompactedRow { row }));
                        continue;
                    };

                    match conflict_behavior {
                        ConflictBehavior::Overwrite => {
                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;

                            // With version columns, only overwrite when the
                            // new row's version is >= the old one's.
                            let need_overwrite = if !version_column_indices.is_empty() {
                                versions_are_newer_or_equal(
                                    &old_row_deserialized,
                                    &new_row_deserialized,
                                    &version_column_indices,
                                )
                            } else {
                                // No version column: last write wins.
                                true
                            };

                            if need_overwrite {
                                if let Some(toastable_indices) = toastable_column_indices {
                                    // Postgres CDC: fill TOAST placeholder
                                    // columns from the old row before writing.
                                    let final_row = handle_toast_columns_for_postgres_cdc(
                                        &old_row_deserialized,
                                        &new_row_deserialized,
                                        toastable_indices,
                                    );

                                    change_buffer.update(
                                        key.clone(),
                                        old_row_deserialized,
                                        final_row.clone(),
                                    );
                                    let final_row_bytes =
                                        Bytes::from(row_serde.serializer.serialize(final_row));
                                    self.lru_cache.put(
                                        key.clone(),
                                        Some(CompactedRow {
                                            row: final_row_bytes,
                                        }),
                                    );
                                } else {
                                    change_buffer.update(
                                        key.clone(),
                                        old_row_deserialized,
                                        new_row_deserialized,
                                    );
                                    self.lru_cache
                                        .put(key.clone(), Some(CompactedRow { row: row.clone() }));
                                }
                            };
                        }
                        ConflictBehavior::IgnoreConflict => {
                            // Key already exists: silently drop the new row.
                        }
                        ConflictBehavior::DoUpdateIfNotNull => {
                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;
                            let need_overwrite = if !version_column_indices.is_empty() {
                                versions_are_newer_or_equal(
                                    &old_row_deserialized,
                                    &new_row_deserialized,
                                    &version_column_indices,
                                )
                            } else {
                                true
                            };

                            if need_overwrite {
                                // Merge: keep the old value wherever the new
                                // row has NULL.
                                let mut row_deserialized_vec =
                                    old_row_deserialized.clone().into_inner().into_vec();
                                replace_if_not_null(
                                    &mut row_deserialized_vec,
                                    new_row_deserialized.clone(),
                                );
                                let mut updated_row = OwnedRow::new(row_deserialized_vec);

                                if let Some(toastable_indices) = toastable_column_indices {
                                    // Re-deserialize since the first copy was
                                    // moved into the merge above.
                                    let old_row_deserialized_again =
                                        row_serde.deserializer.deserialize(old_row.row.clone())?;
                                    updated_row = handle_toast_columns_for_postgres_cdc(
                                        &old_row_deserialized_again,
                                        &updated_row,
                                        toastable_indices,
                                    );
                                }

                                change_buffer.update(
                                    key.clone(),
                                    old_row_deserialized,
                                    updated_row.clone(),
                                );
                                let updated_row_bytes =
                                    Bytes::from(row_serde.serializer.serialize(updated_row));
                                self.lru_cache.put(
                                    key.clone(),
                                    Some(CompactedRow {
                                        row: updated_row_bytes,
                                    }),
                                );
                            }
                        }
                        _ => unreachable!(),
                    };
                }

                Op::Delete | Op::UpdateDelete => {
                    match conflict_behavior {
                        checked_conflict_behaviors!() => {
                            // Delete only if the row actually exists;
                            // deleting an absent row is a no-op.
                            if let Some(old_row) = self.get_expected(&key) {
                                let old_row_deserialized =
                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
                                change_buffer.delete(key.clone(), old_row_deserialized);
                                // Cache the known absence of the row.
                                self.lru_cache.put(key, None);
                            } else {
                            };
                        }
                        _ => unreachable!(),
                    };
                }
            }
        }
        Ok(change_buffer)
    }

    /// Loads every key not already cached from the state table, with up to
    /// 10 point-gets in flight concurrently. Absent rows are cached as
    /// `None` so later lookups still hit the cache.
    async fn fetch_keys<'a, S: StateStore>(
        &mut self,
        keys: impl Iterator<Item = &'a [u8]>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: ConflictBehavior,
        metrics: &MaterializeMetrics,
    ) -> StreamExecutorResult<()> {
        let mut futures = vec![];
        for key in keys {
            metrics.materialize_cache_total_count.inc();

            if self.lru_cache.contains(key) {
                if self.lru_cache.get(key).unwrap().is_some() {
                    metrics.materialize_data_exist_count.inc();
                }
                metrics.materialize_cache_hit_count.inc();
                continue;
            }
            futures.push(async {
                let key_row = table.pk_serde().deserialize(key).unwrap();
                let row = table.get_row(key_row).await?.map(CompactedRow::from);
                StreamExecutorResult::Ok((key.to_vec(), row))
            });
        }

        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
        while let Some(result) = buffered.next().await {
            let (key, row) = result?;
            if row.is_some() {
                metrics.materialize_data_exist_count.inc();
            }
            match conflict_behavior {
                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
                _ => unreachable!(),
            };
        }

        Ok(())
    }

    /// Returns the cached value for `key`.
    ///
    /// # Panics
    /// Panics if the key was not prefetched via [`Self::fetch_keys`] first.
    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
        self.lru_cache.get(key).unwrap_or_else(|| {
            panic!(
                "the key {:?} has not been fetched in the materialize executor's cache ",
                key
            )
        })
    }

    /// Evicts cache entries according to the LRU watermark.
    fn evict(&mut self) {
        self.lru_cache.evict()
    }
}
1599
1600fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
1613 for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
1614 if let Some(new_value) = new_col {
1615 *old_col = Some(new_value);
1616 }
1617 }
1618}
1619
1620fn versions_are_newer_or_equal(
1623 old_row: &OwnedRow,
1624 new_row: &OwnedRow,
1625 version_column_indices: &[u32],
1626) -> bool {
1627 if version_column_indices.is_empty() {
1628 return true;
1630 }
1631
1632 for &idx in version_column_indices {
1633 let old_value = old_row.index(idx as usize);
1634 let new_value = new_row.index(idx as usize);
1635
1636 match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
1637 std::cmp::Ordering::Less => return true, std::cmp::Ordering::Greater => return false, std::cmp::Ordering::Equal => continue, }
1641 }
1642
1643 true
1645}
1646
1647#[cfg(test)]
1648mod tests {
1649
1650 use std::iter;
1651 use std::sync::atomic::AtomicU64;
1652
1653 use rand::rngs::SmallRng;
1654 use rand::{Rng, RngCore, SeedableRng};
1655 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1656 use risingwave_common::catalog::Field;
1657 use risingwave_common::util::epoch::test_epoch;
1658 use risingwave_common::util::sort_util::OrderType;
1659 use risingwave_hummock_sdk::HummockReadEpoch;
1660 use risingwave_storage::memory::MemoryStateStore;
1661 use risingwave_storage::table::batch_table::BatchTable;
1662
1663 use super::*;
1664 use crate::executor::test_utils::*;
1665
    /// End-to-end smoke test: materialize two chunks with `NoCheck` conflict
    /// behavior and verify rows are visible in the state table at each barrier.
    #[tokio::test]
    async fn test_materialize_executor() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; column 0 serves as the primary key.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the init barrier and the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // At the second barrier, chunk1's rows must be readable.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();
        // At the third barrier, chunk2's insert must be readable.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1768
    /// With `Overwrite` behavior, an upsert (`+`) followed by a delete (`-`)
    /// of the same key must leave the row absent from the table.
    #[tokio::test]
    async fn test_upsert_stream() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; column 0 is the primary key.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Overwrite key 1, then delete it within the same chunk.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Drain init barrier, chunk1, barrier, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // After the final barrier, key 1 must be gone.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1851
    /// With `Overwrite` behavior, repeated inserts on the same primary key
    /// must each overwrite the previous value (last write wins).
    #[tokio::test]
    async fn test_check_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; column 0 is the primary key.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 1 is inserted twice within one chunk: (1,4) overwrites (1,3).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Key 1 and key 2 are overwritten again across chunks.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Drain init barrier, chunk1, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // At the barrier, the latest value per key must be visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1970
    /// With `Overwrite` behavior, deletes of absent keys are no-ops, and
    /// updates (`U-`/`U+`) on absent keys behave like upserts.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; column 0 is the primary key.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 8: update on an absent key, then an insert overwrites it.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Key 3 is deleted (with a stale old value); key 5 never existed.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Key 9: update on an absent key acts as an upsert.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Drain init barrier and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // After chunk1, key 8 holds the last-written value (8,3).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        // After chunk2, keys 3 and 5 are gone; key 7 is present.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        // After chunk3, updates took effect — including the upsert on key 9.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2157
    /// With `IgnoreConflict` behavior, inserts on an already-existing primary
    /// key must be dropped: the first write wins.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; column 0 is the primary key.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 1: the second insert (1,4) must be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicting inserts on keys 1 and 2 — all ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain init barrier, chunk1, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // At the barrier, each key must still hold its FIRST value.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2276
    /// With `IgnoreConflict` behavior, delete-then-insert on the same key
    /// within one chunk must leave the re-inserted value, not the original.
    #[tokio::test]
    async fn test_ignore_delete_then_insert() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; column 0 is the primary key.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Insert (1,3), delete it, then insert (1,6) — all in one chunk.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Expect barrier, then the materialized chunk, then the final barrier.
        let _msg1 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();
        let _msg2 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_chunk()
            .unwrap();
        let _msg3 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();

        // The re-inserted value (1,6) must win.
        let row = table
            .get_row(
                &OwnedRow::new(vec![Some(1_i32.into())]),
                HummockReadEpoch::NoWait(u64::MAX),
            )
            .await
            .unwrap();
        assert_eq!(
            row,
            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
        );
    }
2369
    /// With `IgnoreConflict` behavior: deletes of absent keys are no-ops,
    /// inserts on existing keys are dropped, and updates on absent keys act
    /// as upserts.
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; column 0 is the primary key.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 8: update-on-absent upserts (8,2); the later insert is ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Key 3 is deleted; key 5 never existed (no-op delete).
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Insert on existing key 1 is ignored; updates on 2 and 9 apply.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain init barrier and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // After chunk1, key 8 holds the upserted (8,2) — the insert (8,3)
        // was ignored.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        // After chunk2, keys 3 and 5 are gone; key 7 is present.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        // After chunk3, key 1 keeps its original value; 2 and 9 are updated.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2557
2558 #[tokio::test]
2559 async fn test_do_update_if_not_null_conflict() {
2560 let memory_state_store = MemoryStateStore::new();
2562 let table_id = TableId::new(1);
2563 let schema = Schema::new(vec![
2565 Field::unnamed(DataType::Int32),
2566 Field::unnamed(DataType::Int32),
2567 ]);
2568 let column_ids = vec![0.into(), 1.into()];
2569
2570 let chunk1 = StreamChunk::from_pretty(
2572 " i i
2573 + 1 4
2574 + 2 .
2575 + 3 6
2576 U- 8 .
2577 U+ 8 2
2578 + 8 .",
2579 );
2580
2581 let chunk2 = StreamChunk::from_pretty(
2583 " i i
2584 + 7 8
2585 - 3 4
2586 - 5 0",
2587 );
2588
2589 let chunk3 = StreamChunk::from_pretty(
2591 " i i
2592 + 1 5
2593 + 7 .
2594 U- 2 4
2595 U+ 2 .
2596 U- 9 0
2597 U+ 9 1",
2598 );
2599
2600 let source = MockSource::with_messages(vec![
2602 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2603 Message::Chunk(chunk1),
2604 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2605 Message::Chunk(chunk2),
2606 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
2607 Message::Chunk(chunk3),
2608 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
2609 ])
2610 .into_executor(schema.clone(), PkIndices::new());
2611
2612 let order_types = vec![OrderType::ascending()];
2613 let column_descs = vec![
2614 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2615 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2616 ];
2617
2618 let table = BatchTable::for_test(
2619 memory_state_store.clone(),
2620 table_id,
2621 column_descs,
2622 order_types,
2623 vec![0],
2624 vec![0, 1],
2625 );
2626
2627 let mut materialize_executor = MaterializeExecutor::for_test(
2628 source,
2629 memory_state_store,
2630 table_id,
2631 vec![ColumnOrder::new(0, OrderType::ascending())],
2632 column_ids,
2633 Arc::new(AtomicU64::new(0)),
2634 ConflictBehavior::DoUpdateIfNotNull,
2635 )
2636 .await
2637 .boxed()
2638 .execute();
2639 materialize_executor.next().await.transpose().unwrap();
2640
2641 materialize_executor.next().await.transpose().unwrap();
2642
2643 match materialize_executor.next().await.transpose().unwrap() {
2645 Some(Message::Barrier(_)) => {
2646 let row = table
2647 .get_row(
2648 &OwnedRow::new(vec![Some(8_i32.into())]),
2649 HummockReadEpoch::NoWait(u64::MAX),
2650 )
2651 .await
2652 .unwrap();
2653 assert_eq!(
2654 row,
2655 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
2656 );
2657
2658 let row = table
2659 .get_row(
2660 &OwnedRow::new(vec![Some(2_i32.into())]),
2661 HummockReadEpoch::NoWait(u64::MAX),
2662 )
2663 .await
2664 .unwrap();
2665 assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
2666 }
2667 _ => unreachable!(),
2668 }
2669 materialize_executor.next().await.transpose().unwrap();
2670
2671 match materialize_executor.next().await.transpose().unwrap() {
2672 Some(Message::Barrier(_)) => {
2673 let row = table
2674 .get_row(
2675 &OwnedRow::new(vec![Some(7_i32.into())]),
2676 HummockReadEpoch::NoWait(u64::MAX),
2677 )
2678 .await
2679 .unwrap();
2680 assert_eq!(
2681 row,
2682 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
2683 );
2684
2685 let row = table
2687 .get_row(
2688 &OwnedRow::new(vec![Some(3_i32.into())]),
2689 HummockReadEpoch::NoWait(u64::MAX),
2690 )
2691 .await
2692 .unwrap();
2693 assert_eq!(row, None);
2694
2695 let row = table
2697 .get_row(
2698 &OwnedRow::new(vec![Some(5_i32.into())]),
2699 HummockReadEpoch::NoWait(u64::MAX),
2700 )
2701 .await
2702 .unwrap();
2703 assert_eq!(row, None);
2704 }
2705 _ => unreachable!(),
2706 }
2707
2708 materialize_executor.next().await.transpose().unwrap();
2709 match materialize_executor.next().await.transpose().unwrap() {
2712 Some(Message::Barrier(_)) => {
2713 let row = table
2714 .get_row(
2715 &OwnedRow::new(vec![Some(7_i32.into())]),
2716 HummockReadEpoch::NoWait(u64::MAX),
2717 )
2718 .await
2719 .unwrap();
2720 assert_eq!(
2721 row,
2722 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
2723 );
2724
2725 let row = table
2727 .get_row(
2728 &OwnedRow::new(vec![Some(2_i32.into())]),
2729 HummockReadEpoch::NoWait(u64::MAX),
2730 )
2731 .await
2732 .unwrap();
2733 assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
2734
2735 let row = table
2737 .get_row(
2738 &OwnedRow::new(vec![Some(9_i32.into())]),
2739 HummockReadEpoch::NoWait(u64::MAX),
2740 )
2741 .await
2742 .unwrap();
2743 assert_eq!(
2744 row,
2745 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
2746 );
2747 }
2748 _ => unreachable!(),
2749 }
2750 }
2751
2752 fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2753 const KN: u32 = 4;
2754 const SEED: u64 = 998244353;
2755 let mut ret = vec![];
2756 let mut builder =
2757 StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2758 let mut rng = SmallRng::seed_from_u64(SEED);
2759
2760 let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2761 let len = c.data_chunk().capacity();
2762 let mut c = StreamChunkMut::from(c);
2763 for i in 0..len {
2764 c.set_vis(i, rng.random_bool(0.5));
2765 }
2766 c.into()
2767 };
2768 for _ in 0..row_number {
2769 let k = (rng.next_u32() % KN) as i32;
2770 let v = rng.next_u32() as i32;
2771 let op = if rng.random_bool(0.5) {
2772 Op::Insert
2773 } else {
2774 Op::Delete
2775 };
2776 if let Some(c) =
2777 builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2778 {
2779 ret.push(random_vis(c, &mut rng));
2780 }
2781 }
2782 if let Some(c) = builder.take() {
2783 ret.push(random_vis(c, &mut rng));
2784 }
2785 ret
2786 }
2787
2788 async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
2789 const N: usize = 100000;
2790
2791 let memory_state_store = MemoryStateStore::new();
2793 let table_id = TableId::new(1);
2794 let schema = Schema::new(vec![
2796 Field::unnamed(DataType::Int32),
2797 Field::unnamed(DataType::Int32),
2798 ]);
2799 let column_ids = vec![0.into(), 1.into()];
2800
2801 let chunks = gen_fuzz_data(N, 128);
2802 let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
2803 .chain(chunks.into_iter().map(Message::Chunk))
2804 .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
2805 test_epoch(2),
2806 ))))
2807 .collect();
2808 let source =
2810 MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());
2811
2812 let mut materialize_executor = MaterializeExecutor::for_test(
2813 source,
2814 memory_state_store.clone(),
2815 table_id,
2816 vec![ColumnOrder::new(0, OrderType::ascending())],
2817 column_ids,
2818 Arc::new(AtomicU64::new(0)),
2819 conflict_behavior,
2820 )
2821 .await
2822 .boxed()
2823 .execute();
2824 materialize_executor.expect_barrier().await;
2825
2826 let order_types = vec![OrderType::ascending()];
2827 let column_descs = vec![
2828 ColumnDesc::unnamed(0.into(), DataType::Int32),
2829 ColumnDesc::unnamed(1.into(), DataType::Int32),
2830 ];
2831 let pk_indices = vec![0];
2832
2833 let mut table = StateTable::from_table_catalog(
2834 &crate::common::table::test_utils::gen_pbtable(
2835 TableId::from(1002),
2836 column_descs.clone(),
2837 order_types,
2838 pk_indices,
2839 0,
2840 ),
2841 memory_state_store.clone(),
2842 None,
2843 )
2844 .await;
2845
2846 while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
2847 table.write_chunk(c);
2849 }
2850 }
2851
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        // Fuzz with `Overwrite` (upsert) conflict handling: conflicting rows
        // replace the stored row, and the output stream must stay consistent.
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2856
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        // Fuzz with `IgnoreConflict` handling: conflicting rows are dropped,
        // and the output stream must stay consistent.
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2861}