1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Deref, Index};
20
21use bytes::Bytes;
22use futures::stream;
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{
27 ColumnDesc, ColumnId, ConflictBehavior, TableId, checked_conflict_behaviors,
28};
29use risingwave_common::row::{CompactedRow, RowDeserializer};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
32use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
33use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_storage::mem_table::KeyOp;
37use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
38
39use crate::cache::ManagedLruCache;
40use crate::common::metrics::MetricsInfo;
41use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel};
42use crate::common::table::test_utils::gen_pbtable;
43use crate::executor::monitor::MaterializeMetrics;
44use crate::executor::prelude::*;
45
/// Executor that materializes an input stream into a storage table, optionally
/// resolving primary-key conflicts according to `conflict_behavior`.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the stream to materialize.
    input: Executor,

    /// Output schema; used to deserialize rows when generating output chunks.
    schema: Schema,

    /// The state table this executor writes to.
    state_table: StateTableInner<S, SD>,

    /// Column indices of the arrangement (pk) key, kept for `Debug` output.
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// Cache of pk -> latest row image, used for conflict detection/resolution.
    materialize_cache: MaterializeCache<SD>,

    /// How to resolve rows that share the same primary key.
    conflict_behavior: ConflictBehavior,

    /// Column used to decide which version wins on conflict, if any.
    version_column_index: Option<u32>,

    /// Whether any downstream fragment may consume this MV; affects the
    /// state-table op consistency level chosen at barriers.
    may_have_downstream: bool,

    /// Subscription ids depending on this table; non-empty forces the
    /// log-store-enabled consistency level.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    /// True for append-only Iceberg tables: chunks are forwarded without
    /// being written to the state table (see `execute_inner`).
    is_dummy_table: bool,
}
75
76fn get_op_consistency_level(
77 conflict_behavior: ConflictBehavior,
78 may_have_downstream: bool,
79 depended_subscriptions: &HashSet<u32>,
80) -> StateTableOpConsistencyLevel {
81 if !depended_subscriptions.is_empty() {
82 StateTableOpConsistencyLevel::LogStoreEnabled
83 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
84 StateTableOpConsistencyLevel::Inconsistent
87 } else {
88 StateTableOpConsistencyLevel::ConsistentOldValue
89 }
90}
91
impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Builds the executor from the table catalog, deriving the row serde,
    /// the initial op consistency level, and metrics.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<u32>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Serde over the catalog's value indices; used by the materialize cache
        // to (de)serialize full row images for conflict resolution.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // No dispatchers at creation time means no downstream yet.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        let state_table = StateTableInner::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            op_consistency_level,
        )
        .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only Iceberg tables are "dummy": chunks pass through without
        // touching the state table.
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_index,
            ),
            conflict_behavior,
            version_column_index,
            is_dummy_table,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
        }
    }

    /// Main loop: writes chunks to the state table (resolving pk conflicts if
    /// configured), commits on barriers, and forwards all messages downstream.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = TableId::new(self.state_table.table_id());

        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        // The first barrier must be yielded before the state table is
        // initialized on its epoch.
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            self.materialize_cache.evict();

            let msg = match msg {
                Message::Watermark(w) => Message::Watermark(w),
                // Dummy (append-only Iceberg) table: count the rows but do not
                // write to the state table.
                Message::Chunk(chunk) if self.is_dummy_table => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);
                    Message::Chunk(chunk)
                }
                Message::Chunk(chunk) => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);

                    // When the table runs inconsistently (Overwrite, no version
                    // column), conflict handling can be skipped entirely.
                    let do_not_handle_conflict = !self.state_table.is_consistent_op()
                        && self.version_column_index.is_none()
                        && self.conflict_behavior == ConflictBehavior::Overwrite;
                    match self.conflict_behavior {
                        checked_conflict_behaviors!() if !do_not_handle_conflict => {
                            if chunk.cardinality() == 0 {
                                // empty chunk
                                continue;
                            }
                            let (data_chunk, ops) = chunk.into_parts();

                            if self.state_table.value_indices().is_some() {
                                panic!(
                                    "materialize executor with data check can not handle only materialize partial columns"
                                )
                            };
                            let values = data_chunk.serialize();

                            let key_chunk = data_chunk.project(self.state_table.pk_indices());

                            // Serialize the pk of every (possibly invisible) row;
                            // holes keep an empty key which is filtered below.
                            let pks = {
                                let mut pks = vec![vec![]; data_chunk.capacity()];
                                key_chunk
                                    .rows_with_holes()
                                    .zip_eq_fast(pks.iter_mut())
                                    .for_each(|(r, vnode_and_pk)| {
                                        if let Some(r) = r {
                                            self.state_table.pk_serde().serialize(r, vnode_and_pk);
                                        }
                                    });
                                pks
                            };
                            let (_, vis) = key_chunk.into_parts();
                            // Keep only visible rows: (op, serialized pk, serialized value).
                            let row_ops = ops
                                .iter()
                                .zip_eq_debug(pks.into_iter())
                                .zip_eq_debug(values.into_iter())
                                .zip_eq_debug(vis.iter())
                                .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
                                .collect_vec();

                            // Resolve conflicts against the cache/state table,
                            // producing the net change per key.
                            let change_buffer = self
                                .materialize_cache
                                .handle(
                                    row_ops,
                                    &self.state_table,
                                    self.conflict_behavior,
                                    &self.metrics,
                                )
                                .await?;

                            match generate_output(change_buffer, data_types.clone())? {
                                Some(output_chunk) => {
                                    self.state_table.write_chunk(output_chunk.clone());
                                    self.state_table.try_flush().await?;
                                    Message::Chunk(output_chunk)
                                }
                                // All changes cancelled out; nothing to emit.
                                None => continue,
                            }
                        }
                        ConflictBehavior::IgnoreConflict => unreachable!(),
                        // No conflict handling needed: write the chunk as-is.
                        ConflictBehavior::NoCheck
                        | ConflictBehavior::Overwrite
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            self.state_table.write_chunk(chunk.clone());
                            self.state_table.try_flush().await?;
                            Message::Chunk(chunk)
                        }
                    }
                }
                Message::Barrier(b) => {
                    // A new downstream fragment may have appeared since creation.
                    if !self.may_have_downstream
                        && b.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.depended_subscription_ids,
                        &b,
                        mv_table_id,
                    );
                    // The consistency level may change across barriers as
                    // downstreams/subscriptions come and go.
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.depended_subscription_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(b.epoch, op_consistency_level)
                        .await?;
                    if !post_commit.inner().is_consistent_op() {
                        // Only Overwrite tables are ever allowed to run inconsistently.
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = b.as_update_vnode_bitmap(self.actor_context.id);
                    let b_epoch = b.epoch;
                    yield Message::Barrier(b);

                    // After yielding the barrier, finish the commit; a vnode
                    // bitmap change may invalidate cached entries.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                        && cache_may_stale
                    {
                        self.materialize_cache.lru_cache.clear();
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    continue;
                }
            };
            yield msg;
        }
    }

    /// Applies subscription add/drop mutations carried by `barrier` to the
    /// tracked subscription set, warning on duplicates/missing entries.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
}
365
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Test-only constructor: builds the executor over a generated pb table
    /// with unnamed columns, unused metrics, and no version column.
    #[allow(clippy::too_many_arguments)]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // All columns are value columns in the test serde.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                None,
            ),
            conflict_behavior,
            version_column_index: None,
            is_dummy_table: false,
            // Assume a downstream exists so consistency is exercised in tests.
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
        }
    }
}
427
428fn generate_output(
430 change_buffer: ChangeBuffer,
431 data_types: Vec<DataType>,
432) -> StreamExecutorResult<Option<StreamChunk>> {
433 let mut new_ops: Vec<Op> = vec![];
436 let mut new_rows: Vec<Bytes> = vec![];
437 let row_deserializer = RowDeserializer::new(data_types.clone());
438 for (_, row_op) in change_buffer.into_parts() {
439 match row_op {
440 KeyOp::Insert(value) => {
441 new_ops.push(Op::Insert);
442 new_rows.push(value);
443 }
444 KeyOp::Delete(old_value) => {
445 new_ops.push(Op::Delete);
446 new_rows.push(old_value);
447 }
448 KeyOp::Update((old_value, new_value)) => {
449 if old_value != new_value {
451 new_ops.push(Op::UpdateDelete);
452 new_ops.push(Op::UpdateInsert);
453 new_rows.push(old_value);
454 new_rows.push(new_value);
455 }
456 }
457 }
458 }
459 let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
460
461 for row_bytes in new_rows {
462 let res =
463 data_chunk_builder.append_one_row(row_deserializer.deserialize(row_bytes.as_ref())?);
464 debug_assert!(res.is_none());
465 }
466
467 if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
468 let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
469 Ok(Some(new_stream_chunk))
470 } else {
471 Ok(None)
472 }
473}
474
/// Folds multiple operations on the same primary key (serialized pk bytes)
/// within one chunk into a single net `KeyOp` per key.
struct ChangeBuffer {
    buffer: HashMap<Vec<u8>, KeyOp>,
}
480
481impl ChangeBuffer {
482 fn new() -> Self {
483 Self {
484 buffer: HashMap::new(),
485 }
486 }
487
488 fn insert(&mut self, pk: Vec<u8>, value: Bytes) {
489 let entry = self.buffer.entry(pk);
490 match entry {
491 Entry::Vacant(e) => {
492 e.insert(KeyOp::Insert(value));
493 }
494 Entry::Occupied(mut e) => {
495 if let KeyOp::Delete(old_value) = e.get_mut() {
496 let old_val = std::mem::take(old_value);
497 e.insert(KeyOp::Update((old_val, value)));
498 } else {
499 unreachable!();
500 }
501 }
502 }
503 }
504
505 fn delete(&mut self, pk: Vec<u8>, old_value: Bytes) {
506 let entry: Entry<'_, Vec<u8>, KeyOp> = self.buffer.entry(pk);
507 match entry {
508 Entry::Vacant(e) => {
509 e.insert(KeyOp::Delete(old_value));
510 }
511 Entry::Occupied(mut e) => match e.get_mut() {
512 KeyOp::Insert(_) => {
513 e.remove();
514 }
515 KeyOp::Update((prev, _curr)) => {
516 let prev = std::mem::take(prev);
517 e.insert(KeyOp::Delete(prev));
518 }
519 KeyOp::Delete(_) => {
520 unreachable!();
521 }
522 },
523 }
524 }
525
526 fn update(&mut self, pk: Vec<u8>, old_value: Bytes, new_value: Bytes) {
527 let entry = self.buffer.entry(pk);
528 match entry {
529 Entry::Vacant(e) => {
530 e.insert(KeyOp::Update((old_value, new_value)));
531 }
532 Entry::Occupied(mut e) => match e.get_mut() {
533 KeyOp::Insert(_) => {
534 e.insert(KeyOp::Insert(new_value));
535 }
536 KeyOp::Update((_prev, curr)) => {
537 *curr = new_value;
538 }
539 KeyOp::Delete(_) => {
540 unreachable!()
541 }
542 },
543 }
544 }
545
546 fn into_parts(self) -> HashMap<Vec<u8>, KeyOp> {
547 self.buffer
548 }
549}
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    // Thin adapter: the executor's logic lives in `execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
555
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Intentionally terse: only the arrange key is useful for diagnostics;
    // the remaining fields are large or not `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
563
/// LRU cache from serialized pk bytes to the latest row image, used to detect
/// and resolve pk conflicts without hitting storage on every row.
struct MaterializeCache<SD> {
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    /// Serde for full row images (deserialize to compare/merge, serialize back).
    row_serde: BasicSerde,
    /// Column used for version comparison on conflict, if configured.
    version_column_index: Option<u32>,
    // Ties the cache to the state table's serde type without storing one.
    _serde: PhantomData<SD>,
}
571
/// Cached entry for a pk: `Some(row)` is the current image; `None` records
/// that the key is known to be absent from the table.
type CacheValue = Option<CompactedRow>;
573
impl<SD: ValueRowSerde> MaterializeCache<SD> {
    /// Creates an unbounded LRU cache whose eviction is driven by the shared
    /// watermark sequence.
    fn new(
        watermark_sequence: AtomicU64Ref,
        metrics_info: MetricsInfo,
        row_serde: BasicSerde,
        version_column_index: Option<u32>,
    ) -> Self {
        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
        Self {
            lru_cache,
            row_serde,
            version_column_index,
            _serde: PhantomData,
        }
    }

    /// Resolves `row_ops` against the cached/stored row images according to
    /// `conflict_behavior`, returning the net change per key and keeping the
    /// cache in sync with what will be written.
    async fn handle<S: StateStore>(
        &mut self,
        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: ConflictBehavior,
        metrics: &MaterializeMetrics,
    ) -> StreamExecutorResult<ChangeBuffer> {
        assert_matches!(conflict_behavior, checked_conflict_behaviors!());

        // Deduplicate keys so each is fetched from storage at most once.
        let key_set: HashSet<Box<[u8]>> = row_ops
            .iter()
            .map(|(_, k, _)| k.as_slice().into())
            .collect();

        // Warm the cache; afterwards `get_expected` below must always hit.
        self.fetch_keys(
            key_set.iter().map(|v| v.deref()),
            table,
            conflict_behavior,
            metrics,
        )
        .await?;

        let mut change_buffer = ChangeBuffer::new();
        let row_serde = self.row_serde.clone();
        let version_column_index = self.version_column_index;
        for (op, key, row) in row_ops {
            match op {
                Op::Insert | Op::UpdateInsert => {
                    // No existing row: a plain insert, no conflict to resolve.
                    let Some(old_row) = self.get_expected(&key) else {
                        change_buffer.insert(key.clone(), row.clone());
                        self.lru_cache.put(key, Some(CompactedRow { row }));
                        continue;
                    };

                    match conflict_behavior {
                        ConflictBehavior::Overwrite => {
                            // With a version column, only overwrite when the new
                            // version is >= the old one; otherwise always overwrite.
                            let need_overwrite = if let Some(idx) = version_column_index {
                                let old_row_deserialized =
                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
                                let new_row_deserialized =
                                    row_serde.deserializer.deserialize(row.clone())?;

                                version_is_newer_or_equal(
                                    old_row_deserialized.index(idx as usize),
                                    new_row_deserialized.index(idx as usize),
                                )
                            } else {
                                true
                            };
                            if need_overwrite {
                                change_buffer.update(key.clone(), old_row.row.clone(), row.clone());
                                self.lru_cache.put(key.clone(), Some(CompactedRow { row }));
                            };
                        }
                        ConflictBehavior::IgnoreConflict => {
                            // Existing row wins; the new row is dropped.
                        }
                        ConflictBehavior::DoUpdateIfNotNull => {
                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;
                            let need_overwrite = if let Some(idx) = version_column_index {
                                version_is_newer_or_equal(
                                    old_row_deserialized.index(idx as usize),
                                    new_row_deserialized.index(idx as usize),
                                )
                            } else {
                                true
                            };

                            if need_overwrite {
                                // Merge: non-NULL columns of the new row replace
                                // the old ones; NULLs keep the old values.
                                let mut row_deserialized_vec =
                                    old_row_deserialized.into_inner().into_vec();
                                replace_if_not_null(
                                    &mut row_deserialized_vec,
                                    new_row_deserialized,
                                );
                                let updated_row = OwnedRow::new(row_deserialized_vec);
                                let updated_row_bytes =
                                    Bytes::from(row_serde.serializer.serialize(updated_row));

                                change_buffer.update(
                                    key.clone(),
                                    old_row.row.clone(),
                                    updated_row_bytes.clone(),
                                );
                                self.lru_cache.put(
                                    key.clone(),
                                    Some(CompactedRow {
                                        row: updated_row_bytes,
                                    }),
                                );
                            }
                        }
                        _ => unreachable!(),
                    };
                }

                Op::Delete | Op::UpdateDelete => {
                    match conflict_behavior {
                        checked_conflict_behaviors!() => {
                            if let Some(old_row) = self.get_expected(&key) {
                                change_buffer.delete(key.clone(), old_row.row.clone());
                                // Remember the key as absent.
                                self.lru_cache.put(key, None);
                            } else {
                                // Deleting a non-existent row: ignored.
                            };
                        }
                        _ => unreachable!(),
                    };
                }
            }
        }
        Ok(change_buffer)
    }

    /// Fetches the given keys from the state table into the cache (misses
    /// only), with bounded concurrency.
    async fn fetch_keys<'a, S: StateStore>(
        &mut self,
        keys: impl Iterator<Item = &'a [u8]>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: ConflictBehavior,
        metrics: &MaterializeMetrics,
    ) -> StreamExecutorResult<()> {
        let mut futures = vec![];
        for key in keys {
            metrics.materialize_cache_total_count.inc();

            if self.lru_cache.contains(key) {
                metrics.materialize_cache_hit_count.inc();
                continue;
            }
            futures.push(async {
                let key_row = table.pk_serde().deserialize(key).unwrap();
                let row = table.get_row(key_row).await?.map(CompactedRow::from);
                StreamExecutorResult::Ok((key.to_vec(), row))
            });
        }

        // Up to 10 concurrent point gets against storage.
        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
        while let Some(result) = buffered.next().await {
            let (key, row) = result?;
            match conflict_behavior {
                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
                _ => unreachable!(),
            };
        }

        Ok(())
    }

    /// Looks up a key that must have been pre-fetched by `fetch_keys`;
    /// panics otherwise (an executor invariant violation).
    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
        self.lru_cache.get(key).unwrap_or_else(|| {
            panic!(
                "the key {:?} has not been fetched in the materialize executor's cache ",
                key
            )
        })
    }

    /// Advances LRU eviction to the current watermark.
    fn evict(&mut self) {
        self.lru_cache.evict()
    }
}
766
767fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
780 for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
781 if let Some(new_value) = new_col {
782 *old_col = Some(new_value);
783 }
784 }
785}
786
787fn version_is_newer_or_equal(
790 old_version: &Option<ScalarImpl>,
791 new_version: &Option<ScalarImpl>,
792) -> bool {
793 cmp_datum(old_version, new_version, OrderType::ascending_nulls_first()).is_le()
794}
795
796#[cfg(test)]
797mod tests {
798
799 use std::iter;
800 use std::sync::atomic::AtomicU64;
801
802 use rand::rngs::SmallRng;
803 use rand::{Rng, RngCore, SeedableRng};
804 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
805 use risingwave_common::catalog::Field;
806 use risingwave_common::util::epoch::test_epoch;
807 use risingwave_common::util::sort_util::OrderType;
808 use risingwave_hummock_sdk::HummockReadEpoch;
809 use risingwave_storage::memory::MemoryStateStore;
810 use risingwave_storage::table::batch_table::BatchTable;
811
812 use super::*;
813 use crate::executor::test_utils::*;
814
    /// Basic materialization with `NoCheck`: inserts and a delete are written
    /// through to the table and readable after each barrier.
    #[tokio::test]
    async fn test_materialize_executor() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same table, used to verify what was written.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // First barrier, then chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // Second barrier: chunk1 is committed and visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();
        // Third barrier: chunk2's insert is visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
917
    /// With `Overwrite`, an insert-then-delete on the same pk across chunks
    /// leaves the key absent from the table.
    #[tokio::test]
    async fn test_upsert_stream() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Overwrites (1,1) with (1,2), then deletes the key.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // Final barrier: the key was deleted, so no row remains.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1000
    /// With `Overwrite`, repeated inserts on the same pk keep the latest
    /// value, both within a chunk and across chunks.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // pk 1 conflicts within the chunk: (1,4) overwrites (1,3).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Cross-chunk conflicts: pk 1 and pk 2 are overwritten again.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after both chunks: the latest value per pk wins.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1119
    /// With `Overwrite`, deletes of absent keys are ignored and updates of
    /// absent keys behave as upserts; existing keys are overwritten.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Update on absent pk 8 followed by an insert on the same key.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Deletes: pk 3 exists (with a stale old value), pk 5 does not exist.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Updates: pk 2 exists, pk 9 does not (upsert path).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk1: pk 8 ends at its final inserted value.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk2: pk 3 deleted, delete of absent pk 5 ignored.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        // Barrier after chunk3: overwrite of pk 1/2, upsert of absent pk 9.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1306
1307 #[tokio::test]
1308 async fn test_ignore_insert_conflict() {
1309 let memory_state_store = MemoryStateStore::new();
1311 let table_id = TableId::new(1);
1312 let schema = Schema::new(vec![
1314 Field::unnamed(DataType::Int32),
1315 Field::unnamed(DataType::Int32),
1316 ]);
1317 let column_ids = vec![0.into(), 1.into()];
1318
1319 let chunk1 = StreamChunk::from_pretty(
1321 " i i
1322 + 1 3
1323 + 1 4
1324 + 2 5
1325 + 3 6",
1326 );
1327
1328 let chunk2 = StreamChunk::from_pretty(
1329 " i i
1330 + 1 5
1331 + 2 6",
1332 );
1333
1334 let chunk3 = StreamChunk::from_pretty(
1336 " i i
1337 + 1 6",
1338 );
1339
1340 let source = MockSource::with_messages(vec![
1342 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1343 Message::Chunk(chunk1),
1344 Message::Chunk(chunk2),
1345 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1346 Message::Chunk(chunk3),
1347 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1348 ])
1349 .into_executor(schema.clone(), PkIndices::new());
1350
1351 let order_types = vec![OrderType::ascending()];
1352 let column_descs = vec![
1353 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1354 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1355 ];
1356
1357 let table = BatchTable::for_test(
1358 memory_state_store.clone(),
1359 table_id,
1360 column_descs,
1361 order_types,
1362 vec![0],
1363 vec![0, 1],
1364 );
1365
1366 let mut materialize_executor = MaterializeExecutor::for_test(
1367 source,
1368 memory_state_store,
1369 table_id,
1370 vec![ColumnOrder::new(0, OrderType::ascending())],
1371 column_ids,
1372 Arc::new(AtomicU64::new(0)),
1373 ConflictBehavior::IgnoreConflict,
1374 )
1375 .await
1376 .boxed()
1377 .execute();
1378 materialize_executor.next().await.transpose().unwrap();
1379
1380 materialize_executor.next().await.transpose().unwrap();
1381 materialize_executor.next().await.transpose().unwrap();
1382
1383 match materialize_executor.next().await.transpose().unwrap() {
1385 Some(Message::Barrier(_)) => {
1386 let row = table
1387 .get_row(
1388 &OwnedRow::new(vec![Some(3_i32.into())]),
1389 HummockReadEpoch::NoWait(u64::MAX),
1390 )
1391 .await
1392 .unwrap();
1393 assert_eq!(
1394 row,
1395 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1396 );
1397
1398 let row = table
1399 .get_row(
1400 &OwnedRow::new(vec![Some(1_i32.into())]),
1401 HummockReadEpoch::NoWait(u64::MAX),
1402 )
1403 .await
1404 .unwrap();
1405 assert_eq!(
1406 row,
1407 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
1408 );
1409
1410 let row = table
1411 .get_row(
1412 &OwnedRow::new(vec![Some(2_i32.into())]),
1413 HummockReadEpoch::NoWait(u64::MAX),
1414 )
1415 .await
1416 .unwrap();
1417 assert_eq!(
1418 row,
1419 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
1420 );
1421 }
1422 _ => unreachable!(),
1423 }
1424 }
1425
1426 #[tokio::test]
1427 async fn test_ignore_delete_then_insert() {
1428 let memory_state_store = MemoryStateStore::new();
1430 let table_id = TableId::new(1);
1431 let schema = Schema::new(vec![
1433 Field::unnamed(DataType::Int32),
1434 Field::unnamed(DataType::Int32),
1435 ]);
1436 let column_ids = vec![0.into(), 1.into()];
1437
1438 let chunk1 = StreamChunk::from_pretty(
1440 " i i
1441 + 1 3
1442 - 1 3
1443 + 1 6",
1444 );
1445
1446 let source = MockSource::with_messages(vec![
1448 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1449 Message::Chunk(chunk1),
1450 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1451 ])
1452 .into_executor(schema.clone(), PkIndices::new());
1453
1454 let order_types = vec![OrderType::ascending()];
1455 let column_descs = vec![
1456 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1457 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1458 ];
1459
1460 let table = BatchTable::for_test(
1461 memory_state_store.clone(),
1462 table_id,
1463 column_descs,
1464 order_types,
1465 vec![0],
1466 vec![0, 1],
1467 );
1468
1469 let mut materialize_executor = MaterializeExecutor::for_test(
1470 source,
1471 memory_state_store,
1472 table_id,
1473 vec![ColumnOrder::new(0, OrderType::ascending())],
1474 column_ids,
1475 Arc::new(AtomicU64::new(0)),
1476 ConflictBehavior::IgnoreConflict,
1477 )
1478 .await
1479 .boxed()
1480 .execute();
1481 let _msg1 = materialize_executor
1482 .next()
1483 .await
1484 .transpose()
1485 .unwrap()
1486 .unwrap()
1487 .as_barrier()
1488 .unwrap();
1489 let _msg2 = materialize_executor
1490 .next()
1491 .await
1492 .transpose()
1493 .unwrap()
1494 .unwrap()
1495 .as_chunk()
1496 .unwrap();
1497 let _msg3 = materialize_executor
1498 .next()
1499 .await
1500 .transpose()
1501 .unwrap()
1502 .unwrap()
1503 .as_barrier()
1504 .unwrap();
1505
1506 let row = table
1507 .get_row(
1508 &OwnedRow::new(vec![Some(1_i32.into())]),
1509 HummockReadEpoch::NoWait(u64::MAX),
1510 )
1511 .await
1512 .unwrap();
1513 assert_eq!(
1514 row,
1515 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
1516 );
1517 }
1518
1519 #[tokio::test]
1520 async fn test_ignore_delete_and_update_conflict() {
1521 let memory_state_store = MemoryStateStore::new();
1523 let table_id = TableId::new(1);
1524 let schema = Schema::new(vec![
1526 Field::unnamed(DataType::Int32),
1527 Field::unnamed(DataType::Int32),
1528 ]);
1529 let column_ids = vec![0.into(), 1.into()];
1530
1531 let chunk1 = StreamChunk::from_pretty(
1533 " i i
1534 + 1 4
1535 + 2 5
1536 + 3 6
1537 U- 8 1
1538 U+ 8 2
1539 + 8 3",
1540 );
1541
1542 let chunk2 = StreamChunk::from_pretty(
1544 " i i
1545 + 7 8
1546 - 3 4
1547 - 5 0",
1548 );
1549
1550 let chunk3 = StreamChunk::from_pretty(
1552 " i i
1553 + 1 5
1554 U- 2 4
1555 U+ 2 8
1556 U- 9 0
1557 U+ 9 1",
1558 );
1559
1560 let source = MockSource::with_messages(vec![
1562 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1563 Message::Chunk(chunk1),
1564 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1565 Message::Chunk(chunk2),
1566 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1567 Message::Chunk(chunk3),
1568 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1569 ])
1570 .into_executor(schema.clone(), PkIndices::new());
1571
1572 let order_types = vec![OrderType::ascending()];
1573 let column_descs = vec![
1574 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1575 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1576 ];
1577
1578 let table = BatchTable::for_test(
1579 memory_state_store.clone(),
1580 table_id,
1581 column_descs,
1582 order_types,
1583 vec![0],
1584 vec![0, 1],
1585 );
1586
1587 let mut materialize_executor = MaterializeExecutor::for_test(
1588 source,
1589 memory_state_store,
1590 table_id,
1591 vec![ColumnOrder::new(0, OrderType::ascending())],
1592 column_ids,
1593 Arc::new(AtomicU64::new(0)),
1594 ConflictBehavior::IgnoreConflict,
1595 )
1596 .await
1597 .boxed()
1598 .execute();
1599 materialize_executor.next().await.transpose().unwrap();
1600
1601 materialize_executor.next().await.transpose().unwrap();
1602
1603 match materialize_executor.next().await.transpose().unwrap() {
1605 Some(Message::Barrier(_)) => {
1606 let row = table
1608 .get_row(
1609 &OwnedRow::new(vec![Some(8_i32.into())]),
1610 HummockReadEpoch::NoWait(u64::MAX),
1611 )
1612 .await
1613 .unwrap();
1614 assert_eq!(
1615 row,
1616 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1617 );
1618 }
1619 _ => unreachable!(),
1620 }
1621 materialize_executor.next().await.transpose().unwrap();
1622
1623 match materialize_executor.next().await.transpose().unwrap() {
1624 Some(Message::Barrier(_)) => {
1625 let row = table
1626 .get_row(
1627 &OwnedRow::new(vec![Some(7_i32.into())]),
1628 HummockReadEpoch::NoWait(u64::MAX),
1629 )
1630 .await
1631 .unwrap();
1632 assert_eq!(
1633 row,
1634 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1635 );
1636
1637 let row = table
1639 .get_row(
1640 &OwnedRow::new(vec![Some(3_i32.into())]),
1641 HummockReadEpoch::NoWait(u64::MAX),
1642 )
1643 .await
1644 .unwrap();
1645 assert_eq!(row, None);
1646
1647 let row = table
1649 .get_row(
1650 &OwnedRow::new(vec![Some(5_i32.into())]),
1651 HummockReadEpoch::NoWait(u64::MAX),
1652 )
1653 .await
1654 .unwrap();
1655 assert_eq!(row, None);
1656 }
1657 _ => unreachable!(),
1658 }
1659
1660 materialize_executor.next().await.transpose().unwrap();
1661 match materialize_executor.next().await.transpose().unwrap() {
1664 Some(Message::Barrier(_)) => {
1665 let row = table
1666 .get_row(
1667 &OwnedRow::new(vec![Some(1_i32.into())]),
1668 HummockReadEpoch::NoWait(u64::MAX),
1669 )
1670 .await
1671 .unwrap();
1672 assert_eq!(
1673 row,
1674 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
1675 );
1676
1677 let row = table
1679 .get_row(
1680 &OwnedRow::new(vec![Some(2_i32.into())]),
1681 HummockReadEpoch::NoWait(u64::MAX),
1682 )
1683 .await
1684 .unwrap();
1685 assert_eq!(
1686 row,
1687 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
1688 );
1689
1690 let row = table
1692 .get_row(
1693 &OwnedRow::new(vec![Some(9_i32.into())]),
1694 HummockReadEpoch::NoWait(u64::MAX),
1695 )
1696 .await
1697 .unwrap();
1698 assert_eq!(
1699 row,
1700 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1701 );
1702 }
1703 _ => unreachable!(),
1704 }
1705 }
1706
1707 #[tokio::test]
1708 async fn test_do_update_if_not_null_conflict() {
1709 let memory_state_store = MemoryStateStore::new();
1711 let table_id = TableId::new(1);
1712 let schema = Schema::new(vec![
1714 Field::unnamed(DataType::Int32),
1715 Field::unnamed(DataType::Int32),
1716 ]);
1717 let column_ids = vec![0.into(), 1.into()];
1718
1719 let chunk1 = StreamChunk::from_pretty(
1721 " i i
1722 + 1 4
1723 + 2 .
1724 + 3 6
1725 U- 8 .
1726 U+ 8 2
1727 + 8 .",
1728 );
1729
1730 let chunk2 = StreamChunk::from_pretty(
1732 " i i
1733 + 7 8
1734 - 3 4
1735 - 5 0",
1736 );
1737
1738 let chunk3 = StreamChunk::from_pretty(
1740 " i i
1741 + 1 5
1742 + 7 .
1743 U- 2 4
1744 U+ 2 .
1745 U- 9 0
1746 U+ 9 1",
1747 );
1748
1749 let source = MockSource::with_messages(vec![
1751 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1752 Message::Chunk(chunk1),
1753 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1754 Message::Chunk(chunk2),
1755 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1756 Message::Chunk(chunk3),
1757 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1758 ])
1759 .into_executor(schema.clone(), PkIndices::new());
1760
1761 let order_types = vec![OrderType::ascending()];
1762 let column_descs = vec![
1763 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1764 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1765 ];
1766
1767 let table = BatchTable::for_test(
1768 memory_state_store.clone(),
1769 table_id,
1770 column_descs,
1771 order_types,
1772 vec![0],
1773 vec![0, 1],
1774 );
1775
1776 let mut materialize_executor = MaterializeExecutor::for_test(
1777 source,
1778 memory_state_store,
1779 table_id,
1780 vec![ColumnOrder::new(0, OrderType::ascending())],
1781 column_ids,
1782 Arc::new(AtomicU64::new(0)),
1783 ConflictBehavior::DoUpdateIfNotNull,
1784 )
1785 .await
1786 .boxed()
1787 .execute();
1788 materialize_executor.next().await.transpose().unwrap();
1789
1790 materialize_executor.next().await.transpose().unwrap();
1791
1792 match materialize_executor.next().await.transpose().unwrap() {
1794 Some(Message::Barrier(_)) => {
1795 let row = table
1796 .get_row(
1797 &OwnedRow::new(vec![Some(8_i32.into())]),
1798 HummockReadEpoch::NoWait(u64::MAX),
1799 )
1800 .await
1801 .unwrap();
1802 assert_eq!(
1803 row,
1804 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1805 );
1806
1807 let row = table
1808 .get_row(
1809 &OwnedRow::new(vec![Some(2_i32.into())]),
1810 HummockReadEpoch::NoWait(u64::MAX),
1811 )
1812 .await
1813 .unwrap();
1814 assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
1815 }
1816 _ => unreachable!(),
1817 }
1818 materialize_executor.next().await.transpose().unwrap();
1819
1820 match materialize_executor.next().await.transpose().unwrap() {
1821 Some(Message::Barrier(_)) => {
1822 let row = table
1823 .get_row(
1824 &OwnedRow::new(vec![Some(7_i32.into())]),
1825 HummockReadEpoch::NoWait(u64::MAX),
1826 )
1827 .await
1828 .unwrap();
1829 assert_eq!(
1830 row,
1831 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1832 );
1833
1834 let row = table
1836 .get_row(
1837 &OwnedRow::new(vec![Some(3_i32.into())]),
1838 HummockReadEpoch::NoWait(u64::MAX),
1839 )
1840 .await
1841 .unwrap();
1842 assert_eq!(row, None);
1843
1844 let row = table
1846 .get_row(
1847 &OwnedRow::new(vec![Some(5_i32.into())]),
1848 HummockReadEpoch::NoWait(u64::MAX),
1849 )
1850 .await
1851 .unwrap();
1852 assert_eq!(row, None);
1853 }
1854 _ => unreachable!(),
1855 }
1856
1857 materialize_executor.next().await.transpose().unwrap();
1858 match materialize_executor.next().await.transpose().unwrap() {
1861 Some(Message::Barrier(_)) => {
1862 let row = table
1863 .get_row(
1864 &OwnedRow::new(vec![Some(7_i32.into())]),
1865 HummockReadEpoch::NoWait(u64::MAX),
1866 )
1867 .await
1868 .unwrap();
1869 assert_eq!(
1870 row,
1871 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1872 );
1873
1874 let row = table
1876 .get_row(
1877 &OwnedRow::new(vec![Some(2_i32.into())]),
1878 HummockReadEpoch::NoWait(u64::MAX),
1879 )
1880 .await
1881 .unwrap();
1882 assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
1883
1884 let row = table
1886 .get_row(
1887 &OwnedRow::new(vec![Some(9_i32.into())]),
1888 HummockReadEpoch::NoWait(u64::MAX),
1889 )
1890 .await
1891 .unwrap();
1892 assert_eq!(
1893 row,
1894 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1895 );
1896 }
1897 _ => unreachable!(),
1898 }
1899 }
1900
1901 fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
1902 const KN: u32 = 4;
1903 const SEED: u64 = 998244353;
1904 let mut ret = vec![];
1905 let mut builder =
1906 StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
1907 let mut rng = SmallRng::seed_from_u64(SEED);
1908
1909 let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
1910 let len = c.data_chunk().capacity();
1911 let mut c = StreamChunkMut::from(c);
1912 for i in 0..len {
1913 c.set_vis(i, rng.random_bool(0.5));
1914 }
1915 c.into()
1916 };
1917 for _ in 0..row_number {
1918 let k = (rng.next_u32() % KN) as i32;
1919 let v = rng.next_u32() as i32;
1920 let op = if rng.random_bool(0.5) {
1921 Op::Insert
1922 } else {
1923 Op::Delete
1924 };
1925 if let Some(c) =
1926 builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
1927 {
1928 ret.push(random_vis(c, &mut rng));
1929 }
1930 }
1931 if let Some(c) = builder.take() {
1932 ret.push(random_vis(c, &mut rng));
1933 }
1934 ret
1935 }
1936
1937 async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
1938 const N: usize = 100000;
1939
1940 let memory_state_store = MemoryStateStore::new();
1942 let table_id = TableId::new(1);
1943 let schema = Schema::new(vec![
1945 Field::unnamed(DataType::Int32),
1946 Field::unnamed(DataType::Int32),
1947 ]);
1948 let column_ids = vec![0.into(), 1.into()];
1949
1950 let chunks = gen_fuzz_data(N, 128);
1951 let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
1952 .chain(chunks.into_iter().map(Message::Chunk))
1953 .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
1954 test_epoch(2),
1955 ))))
1956 .collect();
1957 let source =
1959 MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());
1960
1961 let mut materialize_executor = MaterializeExecutor::for_test(
1962 source,
1963 memory_state_store.clone(),
1964 table_id,
1965 vec![ColumnOrder::new(0, OrderType::ascending())],
1966 column_ids,
1967 Arc::new(AtomicU64::new(0)),
1968 conflict_behavior,
1969 )
1970 .await
1971 .boxed()
1972 .execute();
1973 materialize_executor.expect_barrier().await;
1974
1975 let order_types = vec![OrderType::ascending()];
1976 let column_descs = vec![
1977 ColumnDesc::unnamed(0.into(), DataType::Int32),
1978 ColumnDesc::unnamed(1.into(), DataType::Int32),
1979 ];
1980 let pk_indices = vec![0];
1981
1982 let mut table = StateTable::from_table_catalog(
1983 &gen_pbtable(
1984 TableId::from(1002),
1985 column_descs.clone(),
1986 order_types,
1987 pk_indices,
1988 0,
1989 ),
1990 memory_state_store.clone(),
1991 None,
1992 )
1993 .await;
1994
1995 while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
1996 table.write_chunk(c);
1998 }
1999 }
2000
2001 #[tokio::test]
2002 async fn fuzz_test_stream_consistent_upsert() {
2003 fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
2004 }
2005
2006 #[tokio::test]
2007 async fn fuzz_test_stream_consistent_ignore() {
2008 fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
2009 }
2010}