1use std::cmp::Ordering;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Deref, Index};
20
21use bytes::Bytes;
22use futures::stream;
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, TableId};
27use risingwave_common::row::{CompactedRow, RowDeserializer};
28use risingwave_common::types::DefaultOrd;
29use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
30use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
33use risingwave_pb::catalog::Table;
34use risingwave_storage::mem_table::KeyOp;
35use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
36
37use crate::cache::ManagedLruCache;
38use crate::common::metrics::MetricsInfo;
39use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel};
40use crate::common::table::test_utils::gen_pbtable;
41use crate::executor::monitor::MaterializeMetrics;
42use crate::executor::prelude::*;
43
/// Materializes the change stream of its input into a state table, optionally
/// resolving primary-key conflicts according to the configured [`ConflictBehavior`].
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    input: Executor,

    /// Output schema of `input`.
    schema: Schema,

    /// The state table this executor writes into.
    state_table: StateTableInner<S, SD>,

    /// Column indices of the arrange key (derived from the `ColumnOrder`s passed at build time).
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// LRU cache of recently touched rows, used to resolve pk conflicts.
    materialize_cache: MaterializeCache<SD>,

    conflict_behavior: ConflictBehavior,

    /// Column used to decide which row wins on a pk conflict, if any.
    version_column_index: Option<u32>,

    /// Whether any downstream fragment may consume this MV's output.
    may_have_downstream: bool,

    /// Ids of subscriptions on this MV's table; non-empty forces
    /// `StateTableOpConsistencyLevel::LogStoreEnabled` (see `get_op_consistency_level`).
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,
}
69
70fn get_op_consistency_level(
71 conflict_behavior: ConflictBehavior,
72 may_have_downstream: bool,
73 depended_subscriptions: &HashSet<u32>,
74) -> StateTableOpConsistencyLevel {
75 if !depended_subscriptions.is_empty() {
76 StateTableOpConsistencyLevel::LogStoreEnabled
77 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
78 StateTableOpConsistencyLevel::Inconsistent
81 } else {
82 StateTableOpConsistencyLevel::ConsistentOldValue
83 }
84}
85
impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Builds the executor: derives the value-row serde from the table catalog,
    /// computes the initial op-consistency level from downstream/subscription state,
    /// and opens the state table with that level.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<u32>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Serde over the catalog's value columns; used by the cache for conflict resolution.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // A non-zero initial dispatch count implies downstream consumers exist.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        let state_table = StateTableInner::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            op_consistency_level,
        )
        .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_index,
            ),
            conflict_behavior,
            version_column_index,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
        }
    }

    /// Main loop: materializes chunks into the state table (with optional conflict
    /// handling) and commits on barriers, re-evaluating the consistency level at
    /// every barrier.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = TableId::new(self.state_table.table_id());

        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        // Propagate the first barrier before initializing the state table epoch.
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            self.materialize_cache.evict();

            yield match msg {
                Message::Watermark(w) => Message::Watermark(w),
                Message::Chunk(chunk) => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);

                    // When the state table runs inconsistently (Overwrite with no
                    // version column), conflict handling can be skipped entirely.
                    let do_not_handle_conflict = !self.state_table.is_consistent_op()
                        && self.version_column_index.is_none()
                        && self.conflict_behavior == ConflictBehavior::Overwrite;
                    match self.conflict_behavior {
                        ConflictBehavior::Overwrite
                        | ConflictBehavior::IgnoreConflict
                        | ConflictBehavior::DoUpdateIfNotNull
                            if !do_not_handle_conflict =>
                        {
                            if chunk.cardinality() == 0 {
                                // Nothing to materialize for an empty chunk.
                                continue;
                            }
                            let (data_chunk, ops) = chunk.into_parts();

                            if self.state_table.value_indices().is_some() {
                                panic!(
                                    "materialize executor with data check can not handle only materialize partial columns"
                                )
                            };
                            let values = data_chunk.serialize();

                            let key_chunk = data_chunk.project(self.state_table.pk_indices());

                            // Serialize the primary key of every visible row.
                            let pks = {
                                let mut pks = vec![vec![]; data_chunk.capacity()];
                                key_chunk
                                    .rows_with_holes()
                                    .zip_eq_fast(pks.iter_mut())
                                    .for_each(|(r, vnode_and_pk)| {
                                        if let Some(r) = r {
                                            self.state_table.pk_serde().serialize(r, vnode_and_pk);
                                        }
                                    });
                                pks
                            };
                            let (_, vis) = key_chunk.into_parts();
                            // Keep only visible rows as (op, serialized pk, serialized value).
                            let row_ops = ops
                                .iter()
                                .zip_eq_debug(pks.into_iter())
                                .zip_eq_debug(values.into_iter())
                                .zip_eq_debug(vis.iter())
                                .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
                                .collect_vec();

                            // Resolve pk conflicts against the cache / state table.
                            let fixed_changes = self
                                .materialize_cache
                                .handle(
                                    row_ops,
                                    &self.state_table,
                                    &self.conflict_behavior,
                                    &self.metrics,
                                )
                                .await?;

                            match generate_output(fixed_changes, data_types.clone())? {
                                Some(output_chunk) => {
                                    self.state_table.write_chunk(output_chunk.clone());
                                    self.state_table.try_flush().await?;
                                    Message::Chunk(output_chunk)
                                }
                                // All changes cancelled out; emit nothing.
                                None => continue,
                            }
                        }
                        // IgnoreConflict never satisfies `do_not_handle_conflict`
                        // (that flag requires Overwrite), so it always takes the
                        // guarded arm above.
                        ConflictBehavior::IgnoreConflict => unreachable!(),
                        ConflictBehavior::NoCheck
                        | ConflictBehavior::Overwrite
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            // Pass-through path: write the chunk without conflict handling.
                            self.state_table.write_chunk(chunk.clone());
                            self.state_table.try_flush().await?;
                            Message::Chunk(chunk)
                        }
                    }
                }
                Message::Barrier(b) => {
                    // A new downstream fragment may have appeared at this barrier.
                    if !self.may_have_downstream
                        && b.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.depended_subscription_ids,
                        &b,
                        mv_table_id,
                    );
                    // Re-evaluate the consistency level; it may switch when downstream
                    // presence or subscriptions change.
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.depended_subscription_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(b.epoch, op_consistency_level)
                        .await?;
                    if !post_commit.inner().is_consistent_op() {
                        // Only Overwrite may run inconsistently (see get_op_consistency_level).
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = b.as_update_vnode_bitmap(self.actor_context.id);
                    let b_epoch = b.epoch;
                    yield Message::Barrier(b);

                    // Apply vnode bitmap updates only after the barrier has been yielded.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        if cache_may_stale {
                            // Ownership of keys may have changed; drop all cached rows.
                            self.materialize_cache.data.clear();
                        }
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    continue;
                }
            }
        }
    }

    /// Applies subscription additions/removals carried by `barrier` that target
    /// this materialized view's table; logs (but tolerates) duplicate adds and
    /// removals of unknown subscribers.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
}
353
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Creates a `MaterializeExecutor` for tests: the schema is taken from `input`,
    /// the state table is built from a generated catalog, and metrics/actor context
    /// are stubs. `may_have_downstream` is forced on so conflict handling runs.
    #[allow(clippy::too_many_arguments)]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // In tests, every column is a value column.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                None,
            ),
            conflict_behavior,
            version_column_index: None,
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
        }
    }
}
414
415fn generate_output(
417 changes: MaterializeBuffer,
418 data_types: Vec<DataType>,
419) -> StreamExecutorResult<Option<StreamChunk>> {
420 let mut new_ops: Vec<Op> = vec![];
423 let mut new_rows: Vec<Bytes> = vec![];
424 let row_deserializer = RowDeserializer::new(data_types.clone());
425 for (_, row_op) in changes.into_parts() {
426 match row_op {
427 KeyOp::Insert(value) => {
428 new_ops.push(Op::Insert);
429 new_rows.push(value);
430 }
431 KeyOp::Delete(old_value) => {
432 new_ops.push(Op::Delete);
433 new_rows.push(old_value);
434 }
435 KeyOp::Update((old_value, new_value)) => {
436 if old_value != new_value {
438 new_ops.push(Op::UpdateDelete);
439 new_ops.push(Op::UpdateInsert);
440 new_rows.push(old_value);
441 new_rows.push(new_value);
442 }
443 }
444 }
445 }
446 let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
447
448 for row_bytes in new_rows {
449 let res =
450 data_chunk_builder.append_one_row(row_deserializer.deserialize(row_bytes.as_ref())?);
451 debug_assert!(res.is_none());
452 }
453
454 if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
455 let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
456 Ok(Some(new_stream_chunk))
457 } else {
458 Ok(None)
459 }
460}
461
/// Accumulates one pending [`KeyOp`] per primary key for a batch, folding
/// together multiple operations on the same key (e.g. delete-then-insert
/// becomes an update).
pub struct MaterializeBuffer {
    // Serialized primary key -> pending operation.
    buffer: HashMap<Vec<u8>, KeyOp>,
}
466
467impl MaterializeBuffer {
468 fn new() -> Self {
469 Self {
470 buffer: HashMap::new(),
471 }
472 }
473
474 pub fn insert(&mut self, pk: Vec<u8>, value: Bytes) {
475 let entry = self.buffer.entry(pk);
476 match entry {
477 Entry::Vacant(e) => {
478 e.insert(KeyOp::Insert(value));
479 }
480 Entry::Occupied(mut e) => {
481 if let KeyOp::Delete(old_value) = e.get_mut() {
482 let old_val = std::mem::take(old_value);
483 e.insert(KeyOp::Update((old_val, value)));
484 } else {
485 unreachable!();
486 }
487 }
488 }
489 }
490
491 pub fn delete(&mut self, pk: Vec<u8>, old_value: Bytes) {
492 let entry: Entry<'_, Vec<u8>, KeyOp> = self.buffer.entry(pk);
493 match entry {
494 Entry::Vacant(e) => {
495 e.insert(KeyOp::Delete(old_value));
496 }
497 Entry::Occupied(mut e) => match e.get_mut() {
498 KeyOp::Insert(_) => {
499 e.remove();
500 }
501 KeyOp::Update((prev, _curr)) => {
502 let prev = std::mem::take(prev);
503 e.insert(KeyOp::Delete(prev));
504 }
505 KeyOp::Delete(_) => {
506 unreachable!();
507 }
508 },
509 }
510 }
511
512 pub fn update(&mut self, pk: Vec<u8>, old_value: Bytes, new_value: Bytes) {
513 let entry = self.buffer.entry(pk);
514 match entry {
515 Entry::Vacant(e) => {
516 e.insert(KeyOp::Update((old_value, new_value)));
517 }
518 Entry::Occupied(mut e) => match e.get_mut() {
519 KeyOp::Insert(_) => {
520 e.insert(KeyOp::Insert(new_value));
521 }
522 KeyOp::Update((_prev, curr)) => {
523 *curr = new_value;
524 }
525 KeyOp::Delete(_) => {
526 unreachable!()
527 }
528 },
529 }
530 }
531
532 pub fn into_parts(self) -> HashMap<Vec<u8>, KeyOp> {
533 self.buffer
534 }
535}
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Delegate to the `try_stream`-generated inner implementation.
        self.execute_inner().boxed()
    }
}
541
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Intentionally prints only the arrange key indices.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
549
/// A cache for materialize executors, mapping serialized primary keys to the
/// current row (if any) so conflicts can be resolved without re-reading storage.
pub struct MaterializeCache<SD> {
    // LRU map: serialized pk -> cached row; `None` records a key known to be absent.
    data: ManagedLruCache<Vec<u8>, CacheValue>,
    // Serde for the full value row, used when merging conflicting rows.
    row_serde: BasicSerde,
    // Column deciding which of two conflicting rows wins, if configured.
    version_column_index: Option<u32>,
    _serde: PhantomData<SD>,
}

/// `None` means the key is known to be absent from the table.
type CacheValue = Option<CompactedRow>;
559
impl<SD: ValueRowSerde> MaterializeCache<SD> {
    /// Builds an unbounded LRU cache whose eviction follows `watermark_sequence`.
    pub fn new(
        watermark_sequence: AtomicU64Ref,
        metrics_info: MetricsInfo,
        row_serde: BasicSerde,
        version_column_index: Option<u32>,
    ) -> Self {
        let cache: ManagedLruCache<Vec<u8>, CacheValue> =
            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
        Self {
            data: cache,
            row_serde,
            version_column_index,
            _serde: PhantomData,
        }
    }

    /// Resolves `row_ops` against cached/old rows according to `conflict_behavior`,
    /// returning the folded per-key changes to actually apply and emit.
    ///
    /// Invariant: every key in `row_ops` is loaded into the cache first
    /// (via `fetch_keys`), so `force_get` below cannot panic.
    pub async fn handle<S: StateStore>(
        &mut self,
        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: &ConflictBehavior,
        metrics: &MaterializeMetrics,
    ) -> StreamExecutorResult<MaterializeBuffer> {
        // Warm the cache for every distinct key touched by this batch.
        let key_set: HashSet<Box<[u8]>> = row_ops
            .iter()
            .map(|(_, k, _)| k.as_slice().into())
            .collect();
        self.fetch_keys(
            key_set.iter().map(|v| v.deref()),
            table,
            conflict_behavior,
            metrics,
        )
        .await?;
        let mut fixed_changes = MaterializeBuffer::new();
        let row_serde = self.row_serde.clone();
        let version_column_index = self.version_column_index;
        for (op, key, value) in row_ops {
            let mut update_cache = false;
            let fixed_changes = &mut fixed_changes;
            // Routing a change through this closure also flags the cache entry
            // for refresh at the end of the iteration.
            let fixed_changes = || {
                update_cache = true;
                fixed_changes
            };
            match op {
                Op::Insert | Op::UpdateInsert => {
                    match conflict_behavior {
                        ConflictBehavior::Overwrite => {
                            match self.force_get(&key) {
                                Some(old_row) => {
                                    let mut handle_conflict = true;
                                    if let Some(idx) = version_column_index {
                                        // Compare version columns to decide whether
                                        // the new row may overwrite the old one.
                                        let old_row_deserialized = row_serde
                                            .deserializer
                                            .deserialize(old_row.row.clone())?;
                                        let new_row_deserialized =
                                            row_serde.deserializer.deserialize(value.clone())?;

                                        handle_conflict = should_handle_conflict(
                                            old_row_deserialized.index(idx as usize),
                                            new_row_deserialized.index(idx as usize),
                                        );
                                    }
                                    if handle_conflict {
                                        fixed_changes().update(
                                            key.clone(),
                                            old_row.row.clone(),
                                            value.clone(),
                                        )
                                    } else {
                                        // Stale version: drop the new row, keep the cache.
                                        update_cache = false;
                                    }
                                }
                                None => fixed_changes().insert(key.clone(), value.clone()),
                            };
                        }
                        ConflictBehavior::IgnoreConflict => {
                            match self.force_get(&key) {
                                // Key already exists: silently drop the new row.
                                Some(_) => (),
                                None => {
                                    fixed_changes().insert(key.clone(), value.clone());
                                }
                            };
                        }

                        ConflictBehavior::DoUpdateIfNotNull => {
                            match self.force_get(&key) {
                                Some(old_row) => {
                                    // Merge: non-NULL columns of the new row replace
                                    // the old ones; NULLs keep the existing values.
                                    let mut old_row_deserialized_vec = row_serde
                                        .deserializer
                                        .deserialize(old_row.row.clone())?
                                        .into_inner()
                                        .into_vec();
                                    let new_row_deserialized =
                                        row_serde.deserializer.deserialize(value.clone())?;
                                    let mut handle_conflict = true;
                                    if let Some(idx) = version_column_index {
                                        handle_conflict = should_handle_conflict(
                                            &old_row_deserialized_vec[idx as usize],
                                            new_row_deserialized.index(idx as usize),
                                        );
                                    }
                                    if handle_conflict {
                                        execute_do_update_if_not_null_replacement(
                                            &mut old_row_deserialized_vec,
                                            new_row_deserialized,
                                        );
                                        let new_row = OwnedRow::new(old_row_deserialized_vec);
                                        let new_row_bytes =
                                            Bytes::from(row_serde.serializer.serialize(new_row));
                                        fixed_changes().update(
                                            key.clone(),
                                            old_row.row.clone(),
                                            new_row_bytes.clone(),
                                        );
                                        // Cache the MERGED row directly; the generic
                                        // refresh below would cache the raw input.
                                        self.data.push(
                                            key.clone(),
                                            Some(CompactedRow { row: new_row_bytes }),
                                        );
                                    }
                                    // Either way the cache is already correct here.
                                    update_cache = false;
                                }
                                None => {
                                    fixed_changes().insert(key.clone(), value.clone());
                                    update_cache = true;
                                }
                            };
                        }
                        _ => unreachable!(),
                    };

                    // Generic cache refresh: remember the raw input value for this key.
                    if update_cache {
                        match conflict_behavior {
                            ConflictBehavior::Overwrite
                            | ConflictBehavior::IgnoreConflict
                            | ConflictBehavior::DoUpdateIfNotNull => {
                                self.data.push(key, Some(CompactedRow { row: value }));
                            }

                            _ => unreachable!(),
                        }
                    }
                }

                Op::Delete | Op::UpdateDelete => {
                    match conflict_behavior {
                        ConflictBehavior::Overwrite
                        | ConflictBehavior::IgnoreConflict
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            // Only record a delete if the row actually exists.
                            if let Some(old_row) = self.force_get(&key) {
                                fixed_changes().delete(key.clone(), old_row.row.clone());
                            };
                        }
                        _ => unreachable!(),
                    };

                    // Remember the key as absent after a real delete.
                    if update_cache {
                        self.data.push(key, None);
                    }
                }
            }
        }
        Ok(fixed_changes)
    }

    /// Loads any keys not already cached from the state table, fetching
    /// concurrently; cache hits/misses are counted into `metrics`.
    async fn fetch_keys<'a, S: StateStore>(
        &mut self,
        keys: impl Iterator<Item = &'a [u8]>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: &ConflictBehavior,
        metrics: &MaterializeMetrics,
    ) -> StreamExecutorResult<()> {
        let mut futures = vec![];
        for key in keys {
            metrics.materialize_cache_total_count.inc();

            if self.data.contains(key) {
                metrics.materialize_cache_hit_count.inc();
                continue;
            }
            futures.push(async {
                let key_row = table.pk_serde().deserialize(key).unwrap();
                let row = table.get_row(key_row).await?.map(CompactedRow::from);
                StreamExecutorResult::Ok((key.to_vec(), row))
            });
        }

        // Fetch up to 10 rows concurrently.
        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
        while let Some(result) = buffered.next().await {
            let (key, value) = result?;
            // NOTE(review): the arms below are identical; kept separate,
            // presumably as a seam for behavior-specific handling — confirm.
            match conflict_behavior {
                ConflictBehavior::Overwrite | ConflictBehavior::DoUpdateIfNotNull => {
                    self.data.push(key, value)
                }
                ConflictBehavior::IgnoreConflict => self.data.push(key, value),
                _ => unreachable!(),
            };
        }

        Ok(())
    }

    /// Returns the cached value for `key`; panics if `fetch_keys` has not
    /// populated it (a violated invariant, not a recoverable miss).
    pub fn force_get(&mut self, key: &[u8]) -> &CacheValue {
        self.data.get(key).unwrap_or_else(|| {
            panic!(
                "the key {:?} has not been fetched in the materialize executor's cache ",
                key
            )
        })
    }

    /// Evicts entries past the watermark.
    fn evict(&mut self) {
        self.data.evict()
    }
}
782
783fn execute_do_update_if_not_null_replacement(
800 old_row: &mut Vec<Option<ScalarImpl>>,
801 new_row: OwnedRow,
802) {
803 for (old_col, new_col) in old_row.iter_mut().zip_eq_fast(new_row) {
804 if let Some(new_value) = new_col {
805 *old_col = Some(new_value);
806 }
807 }
808}
809fn should_handle_conflict(
824 old_version_column: &Option<ScalarImpl>,
825 new_version_column: &Option<ScalarImpl>,
826) -> bool {
827 match (old_version_column, new_version_column) {
828 (None, None) => true,
829 (None, Some(_)) => true,
830 (Some(_), None) => false,
831 (Some(old_version_col), Some(new_version_col)) => !matches!(
832 old_version_col.default_cmp(new_version_col),
833 Ordering::Greater
834 ),
835 }
836}
837
838#[cfg(test)]
839mod tests {
840
841 use std::iter;
842 use std::sync::atomic::AtomicU64;
843
844 use rand::rngs::SmallRng;
845 use rand::{Rng, RngCore, SeedableRng};
846 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
847 use risingwave_common::catalog::Field;
848 use risingwave_common::util::epoch::test_epoch;
849 use risingwave_common::util::sort_util::OrderType;
850 use risingwave_hummock_sdk::HummockReadEpoch;
851 use risingwave_storage::memory::MemoryStateStore;
852 use risingwave_storage::table::batch_table::BatchTable;
853
854 use super::*;
855 use crate::executor::test_utils::*;
856
    /// Smoke test for `ConflictBehavior::NoCheck`: materializes two chunks and
    /// checks the resulting rows via a batch-table read after each barrier.
    #[tokio::test]
    async fn test_materialize_executor() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; column 0 is the arrange key.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same storage, used to verify materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // After the second barrier, chunk1's row (3, 6) should be materialized.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();
        // After the third barrier, chunk2's insert (7, 8) should be visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
959
    /// Overwrite behavior: inserting a conflicting row for pk 1 and then
    /// deleting it must leave no row for that key.
    #[tokio::test]
    async fn test_upsert_stream() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Conflicting insert on pk 1, then a delete of the same key.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same storage, used to verify materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // After the final barrier, pk 1 must be gone.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1042
    /// Overwrite behavior: later inserts with duplicate primary keys must
    /// overwrite earlier rows, both within a chunk and across chunks.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // pk 1 appears twice in the same chunk: the later row wins.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // chunk3 arrives in a later epoch than chunks 1 and 2.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same storage, used to verify materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: chunk2's (1, 3) overwrote chunk1's rows.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: chunk2's (2, 6) overwrote chunk1's (2, 5).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1161
    /// Overwrite behavior with a mix of inserts, deletes, and updates —
    /// including deletes/updates of keys that do not exist yet.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Update of non-existent pk 8 followed by an insert on the same key.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Delete of existing pk 3 and of non-existent pk 5.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Conflicting insert on pk 1, update on pk 2, update on non-existent pk 9.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same storage, used to verify materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // After the second barrier: pk 8 holds the last value written, (8, 3).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        // After the third barrier: (7, 8) inserted; pks 3 and 5 absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // Deleting a non-existent key is a no-op.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        // After the final barrier: pk 1 overwritten, pk 2 updated, pk 9 upserted.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1348
1349 #[tokio::test]
1350 async fn test_ignore_insert_conflict() {
1351 let memory_state_store = MemoryStateStore::new();
1353 let table_id = TableId::new(1);
1354 let schema = Schema::new(vec![
1356 Field::unnamed(DataType::Int32),
1357 Field::unnamed(DataType::Int32),
1358 ]);
1359 let column_ids = vec![0.into(), 1.into()];
1360
1361 let chunk1 = StreamChunk::from_pretty(
1363 " i i
1364 + 1 3
1365 + 1 4
1366 + 2 5
1367 + 3 6",
1368 );
1369
1370 let chunk2 = StreamChunk::from_pretty(
1371 " i i
1372 + 1 5
1373 + 2 6",
1374 );
1375
1376 let chunk3 = StreamChunk::from_pretty(
1378 " i i
1379 + 1 6",
1380 );
1381
1382 let source = MockSource::with_messages(vec![
1384 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1385 Message::Chunk(chunk1),
1386 Message::Chunk(chunk2),
1387 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1388 Message::Chunk(chunk3),
1389 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1390 ])
1391 .into_executor(schema.clone(), PkIndices::new());
1392
1393 let order_types = vec![OrderType::ascending()];
1394 let column_descs = vec![
1395 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1396 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1397 ];
1398
1399 let table = BatchTable::for_test(
1400 memory_state_store.clone(),
1401 table_id,
1402 column_descs,
1403 order_types,
1404 vec![0],
1405 vec![0, 1],
1406 );
1407
1408 let mut materialize_executor = MaterializeExecutor::for_test(
1409 source,
1410 memory_state_store,
1411 table_id,
1412 vec![ColumnOrder::new(0, OrderType::ascending())],
1413 column_ids,
1414 Arc::new(AtomicU64::new(0)),
1415 ConflictBehavior::IgnoreConflict,
1416 )
1417 .await
1418 .boxed()
1419 .execute();
1420 materialize_executor.next().await.transpose().unwrap();
1421
1422 materialize_executor.next().await.transpose().unwrap();
1423 materialize_executor.next().await.transpose().unwrap();
1424
1425 match materialize_executor.next().await.transpose().unwrap() {
1427 Some(Message::Barrier(_)) => {
1428 let row = table
1429 .get_row(
1430 &OwnedRow::new(vec![Some(3_i32.into())]),
1431 HummockReadEpoch::NoWait(u64::MAX),
1432 )
1433 .await
1434 .unwrap();
1435 assert_eq!(
1436 row,
1437 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1438 );
1439
1440 let row = table
1441 .get_row(
1442 &OwnedRow::new(vec![Some(1_i32.into())]),
1443 HummockReadEpoch::NoWait(u64::MAX),
1444 )
1445 .await
1446 .unwrap();
1447 assert_eq!(
1448 row,
1449 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
1450 );
1451
1452 let row = table
1453 .get_row(
1454 &OwnedRow::new(vec![Some(2_i32.into())]),
1455 HummockReadEpoch::NoWait(u64::MAX),
1456 )
1457 .await
1458 .unwrap();
1459 assert_eq!(
1460 row,
1461 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
1462 );
1463 }
1464 _ => unreachable!(),
1465 }
1466 }
1467
1468 #[tokio::test]
1469 async fn test_ignore_delete_then_insert() {
1470 let memory_state_store = MemoryStateStore::new();
1472 let table_id = TableId::new(1);
1473 let schema = Schema::new(vec![
1475 Field::unnamed(DataType::Int32),
1476 Field::unnamed(DataType::Int32),
1477 ]);
1478 let column_ids = vec![0.into(), 1.into()];
1479
1480 let chunk1 = StreamChunk::from_pretty(
1482 " i i
1483 + 1 3
1484 - 1 3
1485 + 1 6",
1486 );
1487
1488 let source = MockSource::with_messages(vec![
1490 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1491 Message::Chunk(chunk1),
1492 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1493 ])
1494 .into_executor(schema.clone(), PkIndices::new());
1495
1496 let order_types = vec![OrderType::ascending()];
1497 let column_descs = vec![
1498 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1499 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1500 ];
1501
1502 let table = BatchTable::for_test(
1503 memory_state_store.clone(),
1504 table_id,
1505 column_descs,
1506 order_types,
1507 vec![0],
1508 vec![0, 1],
1509 );
1510
1511 let mut materialize_executor = MaterializeExecutor::for_test(
1512 source,
1513 memory_state_store,
1514 table_id,
1515 vec![ColumnOrder::new(0, OrderType::ascending())],
1516 column_ids,
1517 Arc::new(AtomicU64::new(0)),
1518 ConflictBehavior::IgnoreConflict,
1519 )
1520 .await
1521 .boxed()
1522 .execute();
1523 let _msg1 = materialize_executor
1524 .next()
1525 .await
1526 .transpose()
1527 .unwrap()
1528 .unwrap()
1529 .as_barrier()
1530 .unwrap();
1531 let _msg2 = materialize_executor
1532 .next()
1533 .await
1534 .transpose()
1535 .unwrap()
1536 .unwrap()
1537 .as_chunk()
1538 .unwrap();
1539 let _msg3 = materialize_executor
1540 .next()
1541 .await
1542 .transpose()
1543 .unwrap()
1544 .unwrap()
1545 .as_barrier()
1546 .unwrap();
1547
1548 let row = table
1549 .get_row(
1550 &OwnedRow::new(vec![Some(1_i32.into())]),
1551 HummockReadEpoch::NoWait(u64::MAX),
1552 )
1553 .await
1554 .unwrap();
1555 assert_eq!(
1556 row,
1557 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
1558 );
1559 }
1560
1561 #[tokio::test]
1562 async fn test_ignore_delete_and_update_conflict() {
1563 let memory_state_store = MemoryStateStore::new();
1565 let table_id = TableId::new(1);
1566 let schema = Schema::new(vec![
1568 Field::unnamed(DataType::Int32),
1569 Field::unnamed(DataType::Int32),
1570 ]);
1571 let column_ids = vec![0.into(), 1.into()];
1572
1573 let chunk1 = StreamChunk::from_pretty(
1575 " i i
1576 + 1 4
1577 + 2 5
1578 + 3 6
1579 U- 8 1
1580 U+ 8 2
1581 + 8 3",
1582 );
1583
1584 let chunk2 = StreamChunk::from_pretty(
1586 " i i
1587 + 7 8
1588 - 3 4
1589 - 5 0",
1590 );
1591
1592 let chunk3 = StreamChunk::from_pretty(
1594 " i i
1595 + 1 5
1596 U- 2 4
1597 U+ 2 8
1598 U- 9 0
1599 U+ 9 1",
1600 );
1601
1602 let source = MockSource::with_messages(vec![
1604 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1605 Message::Chunk(chunk1),
1606 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1607 Message::Chunk(chunk2),
1608 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1609 Message::Chunk(chunk3),
1610 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1611 ])
1612 .into_executor(schema.clone(), PkIndices::new());
1613
1614 let order_types = vec![OrderType::ascending()];
1615 let column_descs = vec![
1616 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1617 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1618 ];
1619
1620 let table = BatchTable::for_test(
1621 memory_state_store.clone(),
1622 table_id,
1623 column_descs,
1624 order_types,
1625 vec![0],
1626 vec![0, 1],
1627 );
1628
1629 let mut materialize_executor = MaterializeExecutor::for_test(
1630 source,
1631 memory_state_store,
1632 table_id,
1633 vec![ColumnOrder::new(0, OrderType::ascending())],
1634 column_ids,
1635 Arc::new(AtomicU64::new(0)),
1636 ConflictBehavior::IgnoreConflict,
1637 )
1638 .await
1639 .boxed()
1640 .execute();
1641 materialize_executor.next().await.transpose().unwrap();
1642
1643 materialize_executor.next().await.transpose().unwrap();
1644
1645 match materialize_executor.next().await.transpose().unwrap() {
1647 Some(Message::Barrier(_)) => {
1648 let row = table
1650 .get_row(
1651 &OwnedRow::new(vec![Some(8_i32.into())]),
1652 HummockReadEpoch::NoWait(u64::MAX),
1653 )
1654 .await
1655 .unwrap();
1656 assert_eq!(
1657 row,
1658 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1659 );
1660 }
1661 _ => unreachable!(),
1662 }
1663 materialize_executor.next().await.transpose().unwrap();
1664
1665 match materialize_executor.next().await.transpose().unwrap() {
1666 Some(Message::Barrier(_)) => {
1667 let row = table
1668 .get_row(
1669 &OwnedRow::new(vec![Some(7_i32.into())]),
1670 HummockReadEpoch::NoWait(u64::MAX),
1671 )
1672 .await
1673 .unwrap();
1674 assert_eq!(
1675 row,
1676 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1677 );
1678
1679 let row = table
1681 .get_row(
1682 &OwnedRow::new(vec![Some(3_i32.into())]),
1683 HummockReadEpoch::NoWait(u64::MAX),
1684 )
1685 .await
1686 .unwrap();
1687 assert_eq!(row, None);
1688
1689 let row = table
1691 .get_row(
1692 &OwnedRow::new(vec![Some(5_i32.into())]),
1693 HummockReadEpoch::NoWait(u64::MAX),
1694 )
1695 .await
1696 .unwrap();
1697 assert_eq!(row, None);
1698 }
1699 _ => unreachable!(),
1700 }
1701
1702 materialize_executor.next().await.transpose().unwrap();
1703 match materialize_executor.next().await.transpose().unwrap() {
1706 Some(Message::Barrier(_)) => {
1707 let row = table
1708 .get_row(
1709 &OwnedRow::new(vec![Some(1_i32.into())]),
1710 HummockReadEpoch::NoWait(u64::MAX),
1711 )
1712 .await
1713 .unwrap();
1714 assert_eq!(
1715 row,
1716 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
1717 );
1718
1719 let row = table
1721 .get_row(
1722 &OwnedRow::new(vec![Some(2_i32.into())]),
1723 HummockReadEpoch::NoWait(u64::MAX),
1724 )
1725 .await
1726 .unwrap();
1727 assert_eq!(
1728 row,
1729 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
1730 );
1731
1732 let row = table
1734 .get_row(
1735 &OwnedRow::new(vec![Some(9_i32.into())]),
1736 HummockReadEpoch::NoWait(u64::MAX),
1737 )
1738 .await
1739 .unwrap();
1740 assert_eq!(
1741 row,
1742 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1743 );
1744 }
1745 _ => unreachable!(),
1746 }
1747 }
1748
1749 #[tokio::test]
1750 async fn test_do_update_if_not_null_conflict() {
1751 let memory_state_store = MemoryStateStore::new();
1753 let table_id = TableId::new(1);
1754 let schema = Schema::new(vec![
1756 Field::unnamed(DataType::Int32),
1757 Field::unnamed(DataType::Int32),
1758 ]);
1759 let column_ids = vec![0.into(), 1.into()];
1760
1761 let chunk1 = StreamChunk::from_pretty(
1763 " i i
1764 + 1 4
1765 + 2 .
1766 + 3 6
1767 U- 8 .
1768 U+ 8 2
1769 + 8 .",
1770 );
1771
1772 let chunk2 = StreamChunk::from_pretty(
1774 " i i
1775 + 7 8
1776 - 3 4
1777 - 5 0",
1778 );
1779
1780 let chunk3 = StreamChunk::from_pretty(
1782 " i i
1783 + 1 5
1784 + 7 .
1785 U- 2 4
1786 U+ 2 .
1787 U- 9 0
1788 U+ 9 1",
1789 );
1790
1791 let source = MockSource::with_messages(vec![
1793 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1794 Message::Chunk(chunk1),
1795 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1796 Message::Chunk(chunk2),
1797 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1798 Message::Chunk(chunk3),
1799 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1800 ])
1801 .into_executor(schema.clone(), PkIndices::new());
1802
1803 let order_types = vec![OrderType::ascending()];
1804 let column_descs = vec![
1805 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1806 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1807 ];
1808
1809 let table = BatchTable::for_test(
1810 memory_state_store.clone(),
1811 table_id,
1812 column_descs,
1813 order_types,
1814 vec![0],
1815 vec![0, 1],
1816 );
1817
1818 let mut materialize_executor = MaterializeExecutor::for_test(
1819 source,
1820 memory_state_store,
1821 table_id,
1822 vec![ColumnOrder::new(0, OrderType::ascending())],
1823 column_ids,
1824 Arc::new(AtomicU64::new(0)),
1825 ConflictBehavior::DoUpdateIfNotNull,
1826 )
1827 .await
1828 .boxed()
1829 .execute();
1830 materialize_executor.next().await.transpose().unwrap();
1831
1832 materialize_executor.next().await.transpose().unwrap();
1833
1834 match materialize_executor.next().await.transpose().unwrap() {
1836 Some(Message::Barrier(_)) => {
1837 let row = table
1838 .get_row(
1839 &OwnedRow::new(vec![Some(8_i32.into())]),
1840 HummockReadEpoch::NoWait(u64::MAX),
1841 )
1842 .await
1843 .unwrap();
1844 assert_eq!(
1845 row,
1846 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1847 );
1848
1849 let row = table
1850 .get_row(
1851 &OwnedRow::new(vec![Some(2_i32.into())]),
1852 HummockReadEpoch::NoWait(u64::MAX),
1853 )
1854 .await
1855 .unwrap();
1856 assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
1857 }
1858 _ => unreachable!(),
1859 }
1860 materialize_executor.next().await.transpose().unwrap();
1861
1862 match materialize_executor.next().await.transpose().unwrap() {
1863 Some(Message::Barrier(_)) => {
1864 let row = table
1865 .get_row(
1866 &OwnedRow::new(vec![Some(7_i32.into())]),
1867 HummockReadEpoch::NoWait(u64::MAX),
1868 )
1869 .await
1870 .unwrap();
1871 assert_eq!(
1872 row,
1873 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1874 );
1875
1876 let row = table
1878 .get_row(
1879 &OwnedRow::new(vec![Some(3_i32.into())]),
1880 HummockReadEpoch::NoWait(u64::MAX),
1881 )
1882 .await
1883 .unwrap();
1884 assert_eq!(row, None);
1885
1886 let row = table
1888 .get_row(
1889 &OwnedRow::new(vec![Some(5_i32.into())]),
1890 HummockReadEpoch::NoWait(u64::MAX),
1891 )
1892 .await
1893 .unwrap();
1894 assert_eq!(row, None);
1895 }
1896 _ => unreachable!(),
1897 }
1898
1899 materialize_executor.next().await.transpose().unwrap();
1900 match materialize_executor.next().await.transpose().unwrap() {
1903 Some(Message::Barrier(_)) => {
1904 let row = table
1905 .get_row(
1906 &OwnedRow::new(vec![Some(7_i32.into())]),
1907 HummockReadEpoch::NoWait(u64::MAX),
1908 )
1909 .await
1910 .unwrap();
1911 assert_eq!(
1912 row,
1913 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1914 );
1915
1916 let row = table
1918 .get_row(
1919 &OwnedRow::new(vec![Some(2_i32.into())]),
1920 HummockReadEpoch::NoWait(u64::MAX),
1921 )
1922 .await
1923 .unwrap();
1924 assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
1925
1926 let row = table
1928 .get_row(
1929 &OwnedRow::new(vec![Some(9_i32.into())]),
1930 HummockReadEpoch::NoWait(u64::MAX),
1931 )
1932 .await
1933 .unwrap();
1934 assert_eq!(
1935 row,
1936 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1937 );
1938 }
1939 _ => unreachable!(),
1940 }
1941 }
1942
1943 fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
1944 const KN: u32 = 4;
1945 const SEED: u64 = 998244353;
1946 let mut ret = vec![];
1947 let mut builder =
1948 StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
1949 let mut rng = SmallRng::seed_from_u64(SEED);
1950
1951 let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
1952 let len = c.data_chunk().capacity();
1953 let mut c = StreamChunkMut::from(c);
1954 for i in 0..len {
1955 c.set_vis(i, rng.random_bool(0.5));
1956 }
1957 c.into()
1958 };
1959 for _ in 0..row_number {
1960 let k = (rng.next_u32() % KN) as i32;
1961 let v = rng.next_u32() as i32;
1962 let op = if rng.random_bool(0.5) {
1963 Op::Insert
1964 } else {
1965 Op::Delete
1966 };
1967 if let Some(c) =
1968 builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
1969 {
1970 ret.push(random_vis(c, &mut rng));
1971 }
1972 }
1973 if let Some(c) = builder.take() {
1974 ret.push(random_vis(c, &mut rng));
1975 }
1976 ret
1977 }
1978
1979 async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
1980 const N: usize = 100000;
1981
1982 let memory_state_store = MemoryStateStore::new();
1984 let table_id = TableId::new(1);
1985 let schema = Schema::new(vec![
1987 Field::unnamed(DataType::Int32),
1988 Field::unnamed(DataType::Int32),
1989 ]);
1990 let column_ids = vec![0.into(), 1.into()];
1991
1992 let chunks = gen_fuzz_data(N, 128);
1993 let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
1994 .chain(chunks.into_iter().map(Message::Chunk))
1995 .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
1996 test_epoch(2),
1997 ))))
1998 .collect();
1999 let source =
2001 MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());
2002
2003 let mut materialize_executor = MaterializeExecutor::for_test(
2004 source,
2005 memory_state_store.clone(),
2006 table_id,
2007 vec![ColumnOrder::new(0, OrderType::ascending())],
2008 column_ids,
2009 Arc::new(AtomicU64::new(0)),
2010 conflict_behavior,
2011 )
2012 .await
2013 .boxed()
2014 .execute();
2015 materialize_executor.expect_barrier().await;
2016
2017 let order_types = vec![OrderType::ascending()];
2018 let column_descs = vec![
2019 ColumnDesc::unnamed(0.into(), DataType::Int32),
2020 ColumnDesc::unnamed(1.into(), DataType::Int32),
2021 ];
2022 let pk_indices = vec![0];
2023
2024 let mut table = StateTable::from_table_catalog(
2025 &gen_pbtable(
2026 TableId::from(1002),
2027 column_descs.clone(),
2028 order_types,
2029 pk_indices,
2030 0,
2031 ),
2032 memory_state_store.clone(),
2033 None,
2034 )
2035 .await;
2036
2037 while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
2038 table.write_chunk(c);
2040 }
2041 }
2042
    /// Runs the stream-consistency fuzz test with `Overwrite` (upsert)
    /// conflict handling.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2047
    /// Runs the stream-consistency fuzz test with `IgnoreConflict` (first
    /// write wins) conflict handling.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2052}