1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Deref, Index};
20
21use bytes::Bytes;
22use futures::stream;
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{
27 ColumnDesc, ColumnId, ConflictBehavior, TableId, checked_conflict_behaviors,
28};
29use risingwave_common::row::{CompactedRow, RowDeserializer};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
32use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
33use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_storage::mem_table::KeyOp;
37use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
38
39use crate::cache::ManagedLruCache;
40use crate::common::metrics::MetricsInfo;
41use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel};
42use crate::common::table::test_utils::gen_pbtable;
43use crate::executor::monitor::MaterializeMetrics;
44use crate::executor::prelude::*;
45
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage,
/// writing each incoming chunk into `state_table` and optionally resolving primary-key
/// conflicts via `materialize_cache` before emitting the (possibly rewritten) chunk downstream.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    input: Executor,

    // Output schema of the materialized view (used to deserialize rows for output chunks).
    schema: Schema,

    // The backing storage table; all materialized rows are written here.
    state_table: StateTableInner<S, SD>,

    // Column indices of the arrangement key (i.e. the MV's primary key columns).
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    // LRU cache of pk -> latest row, used to detect and resolve write conflicts.
    materialize_cache: MaterializeCache<SD>,

    // How to resolve rows with duplicate primary keys (overwrite / ignore / do-update-if-not-null / no check).
    conflict_behavior: ConflictBehavior,

    // When set, conflict resolution only overwrites if the new row's version column
    // is newer than or equal to the old row's.
    version_column_index: Option<u32>,

    // Whether any downstream fragment may consume this MV; affects the state table's
    // op consistency level (see `get_op_consistency_level`).
    may_have_downstream: bool,

    // Ids of subscriptions depending on this MV; non-empty forces log-store-enabled consistency.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    // Iceberg-engine append-only tables are "dummy": chunks pass through without materialization.
    is_dummy_table: bool,
}
75
76fn get_op_consistency_level(
77 conflict_behavior: ConflictBehavior,
78 may_have_downstream: bool,
79 depended_subscriptions: &HashSet<u32>,
80) -> StateTableOpConsistencyLevel {
81 if !depended_subscriptions.is_empty() {
82 StateTableOpConsistencyLevel::LogStoreEnabled
83 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
84 StateTableOpConsistencyLevel::Inconsistent
87 } else {
88 StateTableOpConsistencyLevel::ConsistentOldValue
89 }
90}
91
impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Create a new `MaterializeExecutor` from the table catalog.
    ///
    /// The op consistency level of the backing state table is derived from the
    /// conflict behavior, whether a downstream fragment exists at creation time,
    /// and the set of subscriptions depending on this table.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<u32>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Serde over the catalog's value columns; used by the materialize cache
        // to (de)serialize rows during conflict resolution.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // Whether any downstream fragment consumes this MV at creation time.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        let state_table = StateTableInner::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            op_consistency_level,
        )
        .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only Iceberg-engine tables skip materialization entirely.
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_index,
            ),
            conflict_behavior,
            version_column_index,
            is_dummy_table,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
        }
    }

    /// Main executor loop: materialize each chunk into the state table (resolving
    /// pk conflicts when required) and forward messages downstream. Barriers commit
    /// the state table and may switch its op consistency level on the fly.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = TableId::new(self.state_table.table_id());

        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        // The first barrier must be yielded before initializing the epoch.
        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            // Evict stale cache entries on every message, bounded by the watermark epoch.
            self.materialize_cache.evict();

            let msg = match msg {
                Message::Watermark(w) => Message::Watermark(w),
                // Dummy (Iceberg append-only) tables: count rows but do not materialize.
                Message::Chunk(chunk) if self.is_dummy_table => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);
                    Message::Chunk(chunk)
                }
                Message::Chunk(chunk) => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);

                    // When the state table runs inconsistently (no old values kept) and
                    // no version column constrains overwrites, conflict handling is a no-op
                    // for Overwrite behavior and can be skipped.
                    let do_not_handle_conflict = !self.state_table.is_consistent_op()
                        && self.version_column_index.is_none()
                        && self.conflict_behavior == ConflictBehavior::Overwrite;
                    match self.conflict_behavior {
                        checked_conflict_behaviors!() if !do_not_handle_conflict => {
                            if chunk.cardinality() == 0 {
                                // Empty chunk: nothing to materialize or emit.
                                continue;
                            }
                            let (data_chunk, ops) = chunk.into_parts();

                            if self.state_table.value_indices().is_some() {
                                // Conflict checking requires full rows; partial
                                // materialization is unsupported here.
                                panic!(
                                    "materialize executor with data check can not handle only materialize partial columns"
                                )
                            };
                            // Serialize all row values up front.
                            let values = data_chunk.serialize();

                            let key_chunk = data_chunk.project(self.state_table.pk_indices());

                            // Serialize each visible row's pk (positions of invisible
                            // rows keep an empty buffer).
                            let pks = {
                                let mut pks = vec![vec![]; data_chunk.capacity()];
                                key_chunk
                                    .rows_with_holes()
                                    .zip_eq_fast(pks.iter_mut())
                                    .for_each(|(r, vnode_and_pk)| {
                                        if let Some(r) = r {
                                            self.state_table.pk_serde().serialize(r, vnode_and_pk);
                                        }
                                    });
                                pks
                            };
                            let (_, vis) = key_chunk.into_parts();
                            // Keep only visible rows as (op, serialized pk, serialized value).
                            let row_ops = ops
                                .iter()
                                .zip_eq_debug(pks.into_iter())
                                .zip_eq_debug(values.into_iter())
                                .zip_eq_debug(vis.iter())
                                .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
                                .collect_vec();

                            // Resolve conflicts against the cache / state table,
                            // producing the net effect per pk.
                            let change_buffer = self
                                .materialize_cache
                                .handle(
                                    row_ops,
                                    &self.state_table,
                                    self.conflict_behavior,
                                    &self.metrics,
                                )
                                .await?;

                            match generate_output(change_buffer, data_types.clone())? {
                                Some(output_chunk) => {
                                    self.state_table.write_chunk(output_chunk.clone());
                                    self.state_table.try_flush().await?;
                                    Message::Chunk(output_chunk)
                                }
                                // All changes cancelled out: emit nothing.
                                None => continue,
                            }
                        }
                        // IgnoreConflict is covered by checked_conflict_behaviors! above.
                        ConflictBehavior::IgnoreConflict => unreachable!(),
                        // No conflict handling needed: write the chunk through as-is.
                        ConflictBehavior::NoCheck
                        | ConflictBehavior::Overwrite
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            self.state_table.write_chunk(chunk.clone());
                            self.state_table.try_flush().await?;
                            Message::Chunk(chunk)
                        }
                    }
                }
                Message::Barrier(b) => {
                    // Once any downstream fragment appears, old values must stay consistent.
                    if !self.may_have_downstream
                        && b.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.depended_subscription_ids,
                        &b,
                        mv_table_id,
                    );
                    // Recompute the consistency level; the commit below may switch it.
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.depended_subscription_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(b.epoch, op_consistency_level)
                        .await?;
                    if !post_commit.inner().is_consistent_op() {
                        // Only Overwrite behavior may run inconsistently.
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = b.as_update_vnode_bitmap(self.actor_context.id);
                    let b_epoch = b.epoch;
                    // The barrier must be yielded before post-commit work.
                    yield Message::Barrier(b);

                    // Apply vnode bitmap updates (scaling); a changed bitmap may
                    // invalidate cached entries.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        if cache_may_stale {
                            self.materialize_cache.lru_cache.clear();
                        }
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    continue;
                }
            };
            yield msg;
        }
    }

    /// Apply subscription add/drop mutations carried by `barrier` to the tracked
    /// set of subscriptions depending on this MV. Duplicated adds and drops of
    /// unknown ids are logged but tolerated.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
}
366
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Create a materialize executor for tests: builds a table catalog from
    /// `column_ids`/`keys`, uses unused metrics, and assumes a downstream exists
    /// (`may_have_downstream: true`) so the state table stays consistent.
    #[allow(clippy::too_many_arguments)]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // All columns are value columns in the test catalog.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                None,
            ),
            conflict_behavior,
            version_column_index: None,
            is_dummy_table: false,
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
        }
    }
}
428
429fn generate_output(
431 change_buffer: ChangeBuffer,
432 data_types: Vec<DataType>,
433) -> StreamExecutorResult<Option<StreamChunk>> {
434 let mut new_ops: Vec<Op> = vec![];
437 let mut new_rows: Vec<Bytes> = vec![];
438 let row_deserializer = RowDeserializer::new(data_types.clone());
439 for (_, row_op) in change_buffer.into_parts() {
440 match row_op {
441 KeyOp::Insert(value) => {
442 new_ops.push(Op::Insert);
443 new_rows.push(value);
444 }
445 KeyOp::Delete(old_value) => {
446 new_ops.push(Op::Delete);
447 new_rows.push(old_value);
448 }
449 KeyOp::Update((old_value, new_value)) => {
450 if old_value != new_value {
452 new_ops.push(Op::UpdateDelete);
453 new_ops.push(Op::UpdateInsert);
454 new_rows.push(old_value);
455 new_rows.push(new_value);
456 }
457 }
458 }
459 }
460 let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
461
462 for row_bytes in new_rows {
463 let res =
464 data_chunk_builder.append_one_row(row_deserializer.deserialize(row_bytes.as_ref())?);
465 debug_assert!(res.is_none());
466 }
467
468 if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
469 let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
470 Ok(Some(new_stream_chunk))
471 } else {
472 Ok(None)
473 }
474}
475
/// Accumulates the *net* effect per serialized primary key of a batch of row ops,
/// folding together intermediate states (e.g. delete then insert becomes an update).
struct ChangeBuffer {
    // serialized pk -> net change for that key
    buffer: HashMap<Vec<u8>, KeyOp>,
}
481
482impl ChangeBuffer {
483 fn new() -> Self {
484 Self {
485 buffer: HashMap::new(),
486 }
487 }
488
489 fn insert(&mut self, pk: Vec<u8>, value: Bytes) {
490 let entry = self.buffer.entry(pk);
491 match entry {
492 Entry::Vacant(e) => {
493 e.insert(KeyOp::Insert(value));
494 }
495 Entry::Occupied(mut e) => {
496 if let KeyOp::Delete(old_value) = e.get_mut() {
497 let old_val = std::mem::take(old_value);
498 e.insert(KeyOp::Update((old_val, value)));
499 } else {
500 unreachable!();
501 }
502 }
503 }
504 }
505
506 fn delete(&mut self, pk: Vec<u8>, old_value: Bytes) {
507 let entry: Entry<'_, Vec<u8>, KeyOp> = self.buffer.entry(pk);
508 match entry {
509 Entry::Vacant(e) => {
510 e.insert(KeyOp::Delete(old_value));
511 }
512 Entry::Occupied(mut e) => match e.get_mut() {
513 KeyOp::Insert(_) => {
514 e.remove();
515 }
516 KeyOp::Update((prev, _curr)) => {
517 let prev = std::mem::take(prev);
518 e.insert(KeyOp::Delete(prev));
519 }
520 KeyOp::Delete(_) => {
521 unreachable!();
522 }
523 },
524 }
525 }
526
527 fn update(&mut self, pk: Vec<u8>, old_value: Bytes, new_value: Bytes) {
528 let entry = self.buffer.entry(pk);
529 match entry {
530 Entry::Vacant(e) => {
531 e.insert(KeyOp::Update((old_value, new_value)));
532 }
533 Entry::Occupied(mut e) => match e.get_mut() {
534 KeyOp::Insert(_) => {
535 e.insert(KeyOp::Insert(new_value));
536 }
537 KeyOp::Update((_prev, curr)) => {
538 *curr = new_value;
539 }
540 KeyOp::Delete(_) => {
541 unreachable!()
542 }
543 },
544 }
545 }
546
547 fn into_parts(self) -> HashMap<Vec<u8>, KeyOp> {
548 self.buffer
549 }
550}
// Entry point wiring: `execute` drives the executor's inner try-stream.
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
556
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Intentionally terse: only the arrange key is printed, as the remaining
    // fields (state table, cache, ...) are not meaningfully debuggable.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
564
/// A cache for materialize executors: maps serialized pk to the latest known row
/// (or to `None` for keys known to be absent), backing conflict resolution.
struct MaterializeCache<SD> {
    // serialized pk -> latest row (None = key confirmed absent in the state table)
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    row_serde: BasicSerde,
    // See `MaterializeExecutor::version_column_index`.
    version_column_index: Option<u32>,
    // Ties the cache to the executor's row serde type without storing one.
    _serde: PhantomData<SD>,
}
572
/// Cached lookup result: `None` records that the key is absent from the state
/// table, so negative lookups are also cache hits.
type CacheValue = Option<CompactedRow>;
574
impl<SD: ValueRowSerde> MaterializeCache<SD> {
    /// Build an unbounded LRU cache whose eviction is driven by `watermark_sequence`.
    fn new(
        watermark_sequence: AtomicU64Ref,
        metrics_info: MetricsInfo,
        row_serde: BasicSerde,
        version_column_index: Option<u32>,
    ) -> Self {
        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
        Self {
            lru_cache,
            row_serde,
            version_column_index,
            _serde: PhantomData,
        }
    }

    /// Resolve conflicts for a batch of `(op, serialized pk, serialized row)` ops
    /// against the cache (backed by `table`), returning the net changes to apply.
    ///
    /// Must only be called for checked conflict behaviors. All referenced keys are
    /// pre-fetched into the cache first, and the cache is kept in sync with every
    /// change recorded in the returned `ChangeBuffer`.
    async fn handle<S: StateStore>(
        &mut self,
        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: ConflictBehavior,
        metrics: &MaterializeMetrics,
    ) -> StreamExecutorResult<ChangeBuffer> {
        assert_matches!(conflict_behavior, checked_conflict_behaviors!());

        // Deduplicate keys before fetching.
        let key_set: HashSet<Box<[u8]>> = row_ops
            .iter()
            .map(|(_, k, _)| k.as_slice().into())
            .collect();

        // Ensure every key has a cache entry (possibly `None`) before processing,
        // so `get_expected` below never misses.
        self.fetch_keys(
            key_set.iter().map(|v| v.deref()),
            table,
            conflict_behavior,
            metrics,
        )
        .await?;

        let mut change_buffer = ChangeBuffer::new();
        let row_serde = self.row_serde.clone();
        let version_column_index = self.version_column_index;
        for (op, key, row) in row_ops {
            match op {
                Op::Insert | Op::UpdateInsert => {
                    let Some(old_row) = self.get_expected(&key) else {
                        // No existing row: plain insert, no conflict to resolve.
                        change_buffer.insert(key.clone(), row.clone());
                        self.lru_cache.put(key, Some(CompactedRow { row }));
                        continue;
                    };

                    match conflict_behavior {
                        ConflictBehavior::Overwrite => {
                            // With a version column, only overwrite when the new
                            // version is newer than or equal to the old one.
                            let need_overwrite = if let Some(idx) = version_column_index {
                                let old_row_deserialized =
                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
                                let new_row_deserialized =
                                    row_serde.deserializer.deserialize(row.clone())?;

                                version_is_newer_or_equal(
                                    old_row_deserialized.index(idx as usize),
                                    new_row_deserialized.index(idx as usize),
                                )
                            } else {
                                true
                            };
                            if need_overwrite {
                                change_buffer.update(key.clone(), old_row.row.clone(), row.clone());
                                self.lru_cache.put(key.clone(), Some(CompactedRow { row }));
                            };
                        }
                        ConflictBehavior::IgnoreConflict => {
                            // Keep the existing row; the conflicting insert is dropped.
                        }
                        ConflictBehavior::DoUpdateIfNotNull => {
                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;
                            let need_overwrite = if let Some(idx) = version_column_index {
                                version_is_newer_or_equal(
                                    old_row_deserialized.index(idx as usize),
                                    new_row_deserialized.index(idx as usize),
                                )
                            } else {
                                true
                            };

                            if need_overwrite {
                                // Merge: non-NULL columns of the new row replace
                                // the corresponding old columns.
                                let mut row_deserialized_vec =
                                    old_row_deserialized.into_inner().into_vec();
                                replace_if_not_null(
                                    &mut row_deserialized_vec,
                                    new_row_deserialized,
                                );
                                let updated_row = OwnedRow::new(row_deserialized_vec);
                                let updated_row_bytes =
                                    Bytes::from(row_serde.serializer.serialize(updated_row));

                                change_buffer.update(
                                    key.clone(),
                                    old_row.row.clone(),
                                    updated_row_bytes.clone(),
                                );
                                self.lru_cache.put(
                                    key.clone(),
                                    Some(CompactedRow {
                                        row: updated_row_bytes,
                                    }),
                                );
                            }
                        }
                        _ => unreachable!(),
                    };
                }

                Op::Delete | Op::UpdateDelete => {
                    match conflict_behavior {
                        checked_conflict_behaviors!() => {
                            if let Some(old_row) = self.get_expected(&key) {
                                change_buffer.delete(key.clone(), old_row.row.clone());
                                // Remember the key is now absent.
                                self.lru_cache.put(key, None);
                            } else {
                                // Deleting a non-existent key: ignored under
                                // checked conflict behaviors.
                            };
                        }
                        _ => unreachable!(),
                    };
                }
            }
        }
        Ok(change_buffer)
    }

    /// Point-get every cache-missed key from the state table (up to 10 concurrent
    /// lookups) and record the results — including misses — in the cache.
    async fn fetch_keys<'a, S: StateStore>(
        &mut self,
        keys: impl Iterator<Item = &'a [u8]>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: ConflictBehavior,
        metrics: &MaterializeMetrics,
    ) -> StreamExecutorResult<()> {
        let mut futures = vec![];
        for key in keys {
            metrics.materialize_cache_total_count.inc();

            if self.lru_cache.contains(key) {
                metrics.materialize_cache_hit_count.inc();
                continue;
            }
            futures.push(async {
                let key_row = table.pk_serde().deserialize(key).unwrap();
                let row = table.get_row(key_row).await?.map(CompactedRow::from);
                StreamExecutorResult::Ok((key.to_vec(), row))
            });
        }

        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
        while let Some(result) = buffered.next().await {
            let (key, row) = result?;
            match conflict_behavior {
                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
                _ => unreachable!(),
            };
        }

        Ok(())
    }

    /// Look up a key that must already have been fetched; panics otherwise
    /// (a miss here indicates a bug in `handle`'s pre-fetch step).
    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
        self.lru_cache.get(key).unwrap_or_else(|| {
            panic!(
                "the key {:?} has not been fetched in the materialize executor's cache ",
                key
            )
        })
    }

    /// Evict entries older than the watermark epoch.
    fn evict(&mut self) {
        self.lru_cache.evict()
    }
}
767
768fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
781 for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
782 if let Some(new_value) = new_col {
783 *old_col = Some(new_value);
784 }
785 }
786}
787
/// Returns `true` when `new_version` is newer than or equal to `old_version`,
/// i.e. `old_version <= new_version` under ascending, NULLs-first ordering.
/// Note the NULLs-first choice means a non-NULL new version always beats a NULL
/// old version, while a NULL new version only ties with a NULL old version.
fn version_is_newer_or_equal(
    old_version: &Option<ScalarImpl>,
    new_version: &Option<ScalarImpl>,
) -> bool {
    cmp_datum(old_version, new_version, OrderType::ascending_nulls_first()).is_le()
}
796
797#[cfg(test)]
798mod tests {
799
800 use std::iter;
801 use std::sync::atomic::AtomicU64;
802
803 use rand::rngs::SmallRng;
804 use rand::{Rng, RngCore, SeedableRng};
805 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
806 use risingwave_common::catalog::Field;
807 use risingwave_common::util::epoch::test_epoch;
808 use risingwave_common::util::sort_util::OrderType;
809 use risingwave_hummock_sdk::HummockReadEpoch;
810 use risingwave_storage::memory::MemoryStateStore;
811 use risingwave_storage::table::batch_table::BatchTable;
812
813 use super::*;
814 use crate::executor::test_utils::*;
815
    // Basic materialization with `NoCheck`: rows from two chunks land in the
    // state table and are readable after each barrier commits.
    #[tokio::test]
    async fn test_materialize_executor() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns; column 0 is the pk.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same storage, used to verify writes.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Drain first barrier and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // After the second barrier, chunk1's rows are committed.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();
        // After the third barrier, chunk2's insert is visible (and row 3 deleted).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
918
    // Upsert semantics with `Overwrite`: an insert on an existing key followed by
    // a delete of that key must leave the key absent after commit.
    #[tokio::test]
    async fn test_upsert_stream() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Overwrite (1,1) with (1,2), then delete key 1.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // After the final barrier, key 1 was overwritten then deleted: absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1001
    // Insert conflicts with `Overwrite`: later inserts on the same pk replace
    // earlier ones, both within a chunk and across chunks in the same epoch.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 1 appears twice: (1,4) overwrites (1,3) within the chunk.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicts across chunks: (1,3) and (2,6) overwrite chunk1's values.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // After the second barrier, the last write for each key wins.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1120
    // Delete/update conflicts with `Overwrite`: deletes of missing keys are
    // ignored, update pairs (U-/U+) resolve against cached state, and an update
    // on a missing key materializes as a plain insert.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // U-/U+ on key 8 (not yet present) followed by an insert: last write wins.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Deletes: (3,4) deletes existing key 3; (5,0) deletes a missing key.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Updates: key 2 exists; key 9 does not (U+ acts as insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // After epoch 2: key 8's final value is 3 (insert overwrote the update).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        // After epoch 3: key 7 inserted; keys 3 and 5 absent (delete of missing
        // key 5 was ignored without error).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        // After epoch 4: key 1 overwritten, key 2 updated, key 9 upserted.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1307
1308 #[tokio::test]
1309 async fn test_ignore_insert_conflict() {
1310 let memory_state_store = MemoryStateStore::new();
1312 let table_id = TableId::new(1);
1313 let schema = Schema::new(vec![
1315 Field::unnamed(DataType::Int32),
1316 Field::unnamed(DataType::Int32),
1317 ]);
1318 let column_ids = vec![0.into(), 1.into()];
1319
1320 let chunk1 = StreamChunk::from_pretty(
1322 " i i
1323 + 1 3
1324 + 1 4
1325 + 2 5
1326 + 3 6",
1327 );
1328
1329 let chunk2 = StreamChunk::from_pretty(
1330 " i i
1331 + 1 5
1332 + 2 6",
1333 );
1334
1335 let chunk3 = StreamChunk::from_pretty(
1337 " i i
1338 + 1 6",
1339 );
1340
1341 let source = MockSource::with_messages(vec![
1343 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1344 Message::Chunk(chunk1),
1345 Message::Chunk(chunk2),
1346 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1347 Message::Chunk(chunk3),
1348 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1349 ])
1350 .into_executor(schema.clone(), PkIndices::new());
1351
1352 let order_types = vec![OrderType::ascending()];
1353 let column_descs = vec![
1354 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1355 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1356 ];
1357
1358 let table = BatchTable::for_test(
1359 memory_state_store.clone(),
1360 table_id,
1361 column_descs,
1362 order_types,
1363 vec![0],
1364 vec![0, 1],
1365 );
1366
1367 let mut materialize_executor = MaterializeExecutor::for_test(
1368 source,
1369 memory_state_store,
1370 table_id,
1371 vec![ColumnOrder::new(0, OrderType::ascending())],
1372 column_ids,
1373 Arc::new(AtomicU64::new(0)),
1374 ConflictBehavior::IgnoreConflict,
1375 )
1376 .await
1377 .boxed()
1378 .execute();
1379 materialize_executor.next().await.transpose().unwrap();
1380
1381 materialize_executor.next().await.transpose().unwrap();
1382 materialize_executor.next().await.transpose().unwrap();
1383
1384 match materialize_executor.next().await.transpose().unwrap() {
1386 Some(Message::Barrier(_)) => {
1387 let row = table
1388 .get_row(
1389 &OwnedRow::new(vec![Some(3_i32.into())]),
1390 HummockReadEpoch::NoWait(u64::MAX),
1391 )
1392 .await
1393 .unwrap();
1394 assert_eq!(
1395 row,
1396 Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1397 );
1398
1399 let row = table
1400 .get_row(
1401 &OwnedRow::new(vec![Some(1_i32.into())]),
1402 HummockReadEpoch::NoWait(u64::MAX),
1403 )
1404 .await
1405 .unwrap();
1406 assert_eq!(
1407 row,
1408 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
1409 );
1410
1411 let row = table
1412 .get_row(
1413 &OwnedRow::new(vec![Some(2_i32.into())]),
1414 HummockReadEpoch::NoWait(u64::MAX),
1415 )
1416 .await
1417 .unwrap();
1418 assert_eq!(
1419 row,
1420 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
1421 );
1422 }
1423 _ => unreachable!(),
1424 }
1425 }
1426
1427 #[tokio::test]
1428 async fn test_ignore_delete_then_insert() {
1429 let memory_state_store = MemoryStateStore::new();
1431 let table_id = TableId::new(1);
1432 let schema = Schema::new(vec![
1434 Field::unnamed(DataType::Int32),
1435 Field::unnamed(DataType::Int32),
1436 ]);
1437 let column_ids = vec![0.into(), 1.into()];
1438
1439 let chunk1 = StreamChunk::from_pretty(
1441 " i i
1442 + 1 3
1443 - 1 3
1444 + 1 6",
1445 );
1446
1447 let source = MockSource::with_messages(vec![
1449 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1450 Message::Chunk(chunk1),
1451 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1452 ])
1453 .into_executor(schema.clone(), PkIndices::new());
1454
1455 let order_types = vec![OrderType::ascending()];
1456 let column_descs = vec![
1457 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1458 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1459 ];
1460
1461 let table = BatchTable::for_test(
1462 memory_state_store.clone(),
1463 table_id,
1464 column_descs,
1465 order_types,
1466 vec![0],
1467 vec![0, 1],
1468 );
1469
1470 let mut materialize_executor = MaterializeExecutor::for_test(
1471 source,
1472 memory_state_store,
1473 table_id,
1474 vec![ColumnOrder::new(0, OrderType::ascending())],
1475 column_ids,
1476 Arc::new(AtomicU64::new(0)),
1477 ConflictBehavior::IgnoreConflict,
1478 )
1479 .await
1480 .boxed()
1481 .execute();
1482 let _msg1 = materialize_executor
1483 .next()
1484 .await
1485 .transpose()
1486 .unwrap()
1487 .unwrap()
1488 .as_barrier()
1489 .unwrap();
1490 let _msg2 = materialize_executor
1491 .next()
1492 .await
1493 .transpose()
1494 .unwrap()
1495 .unwrap()
1496 .as_chunk()
1497 .unwrap();
1498 let _msg3 = materialize_executor
1499 .next()
1500 .await
1501 .transpose()
1502 .unwrap()
1503 .unwrap()
1504 .as_barrier()
1505 .unwrap();
1506
1507 let row = table
1508 .get_row(
1509 &OwnedRow::new(vec![Some(1_i32.into())]),
1510 HummockReadEpoch::NoWait(u64::MAX),
1511 )
1512 .await
1513 .unwrap();
1514 assert_eq!(
1515 row,
1516 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
1517 );
1518 }
1519
1520 #[tokio::test]
1521 async fn test_ignore_delete_and_update_conflict() {
1522 let memory_state_store = MemoryStateStore::new();
1524 let table_id = TableId::new(1);
1525 let schema = Schema::new(vec![
1527 Field::unnamed(DataType::Int32),
1528 Field::unnamed(DataType::Int32),
1529 ]);
1530 let column_ids = vec![0.into(), 1.into()];
1531
1532 let chunk1 = StreamChunk::from_pretty(
1534 " i i
1535 + 1 4
1536 + 2 5
1537 + 3 6
1538 U- 8 1
1539 U+ 8 2
1540 + 8 3",
1541 );
1542
1543 let chunk2 = StreamChunk::from_pretty(
1545 " i i
1546 + 7 8
1547 - 3 4
1548 - 5 0",
1549 );
1550
1551 let chunk3 = StreamChunk::from_pretty(
1553 " i i
1554 + 1 5
1555 U- 2 4
1556 U+ 2 8
1557 U- 9 0
1558 U+ 9 1",
1559 );
1560
1561 let source = MockSource::with_messages(vec![
1563 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1564 Message::Chunk(chunk1),
1565 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1566 Message::Chunk(chunk2),
1567 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1568 Message::Chunk(chunk3),
1569 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1570 ])
1571 .into_executor(schema.clone(), PkIndices::new());
1572
1573 let order_types = vec![OrderType::ascending()];
1574 let column_descs = vec![
1575 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1576 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1577 ];
1578
1579 let table = BatchTable::for_test(
1580 memory_state_store.clone(),
1581 table_id,
1582 column_descs,
1583 order_types,
1584 vec![0],
1585 vec![0, 1],
1586 );
1587
1588 let mut materialize_executor = MaterializeExecutor::for_test(
1589 source,
1590 memory_state_store,
1591 table_id,
1592 vec![ColumnOrder::new(0, OrderType::ascending())],
1593 column_ids,
1594 Arc::new(AtomicU64::new(0)),
1595 ConflictBehavior::IgnoreConflict,
1596 )
1597 .await
1598 .boxed()
1599 .execute();
1600 materialize_executor.next().await.transpose().unwrap();
1601
1602 materialize_executor.next().await.transpose().unwrap();
1603
1604 match materialize_executor.next().await.transpose().unwrap() {
1606 Some(Message::Barrier(_)) => {
1607 let row = table
1609 .get_row(
1610 &OwnedRow::new(vec![Some(8_i32.into())]),
1611 HummockReadEpoch::NoWait(u64::MAX),
1612 )
1613 .await
1614 .unwrap();
1615 assert_eq!(
1616 row,
1617 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1618 );
1619 }
1620 _ => unreachable!(),
1621 }
1622 materialize_executor.next().await.transpose().unwrap();
1623
1624 match materialize_executor.next().await.transpose().unwrap() {
1625 Some(Message::Barrier(_)) => {
1626 let row = table
1627 .get_row(
1628 &OwnedRow::new(vec![Some(7_i32.into())]),
1629 HummockReadEpoch::NoWait(u64::MAX),
1630 )
1631 .await
1632 .unwrap();
1633 assert_eq!(
1634 row,
1635 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1636 );
1637
1638 let row = table
1640 .get_row(
1641 &OwnedRow::new(vec![Some(3_i32.into())]),
1642 HummockReadEpoch::NoWait(u64::MAX),
1643 )
1644 .await
1645 .unwrap();
1646 assert_eq!(row, None);
1647
1648 let row = table
1650 .get_row(
1651 &OwnedRow::new(vec![Some(5_i32.into())]),
1652 HummockReadEpoch::NoWait(u64::MAX),
1653 )
1654 .await
1655 .unwrap();
1656 assert_eq!(row, None);
1657 }
1658 _ => unreachable!(),
1659 }
1660
1661 materialize_executor.next().await.transpose().unwrap();
1662 match materialize_executor.next().await.transpose().unwrap() {
1665 Some(Message::Barrier(_)) => {
1666 let row = table
1667 .get_row(
1668 &OwnedRow::new(vec![Some(1_i32.into())]),
1669 HummockReadEpoch::NoWait(u64::MAX),
1670 )
1671 .await
1672 .unwrap();
1673 assert_eq!(
1674 row,
1675 Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
1676 );
1677
1678 let row = table
1680 .get_row(
1681 &OwnedRow::new(vec![Some(2_i32.into())]),
1682 HummockReadEpoch::NoWait(u64::MAX),
1683 )
1684 .await
1685 .unwrap();
1686 assert_eq!(
1687 row,
1688 Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
1689 );
1690
1691 let row = table
1693 .get_row(
1694 &OwnedRow::new(vec![Some(9_i32.into())]),
1695 HummockReadEpoch::NoWait(u64::MAX),
1696 )
1697 .await
1698 .unwrap();
1699 assert_eq!(
1700 row,
1701 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1702 );
1703 }
1704 _ => unreachable!(),
1705 }
1706 }
1707
1708 #[tokio::test]
1709 async fn test_do_update_if_not_null_conflict() {
1710 let memory_state_store = MemoryStateStore::new();
1712 let table_id = TableId::new(1);
1713 let schema = Schema::new(vec![
1715 Field::unnamed(DataType::Int32),
1716 Field::unnamed(DataType::Int32),
1717 ]);
1718 let column_ids = vec![0.into(), 1.into()];
1719
1720 let chunk1 = StreamChunk::from_pretty(
1722 " i i
1723 + 1 4
1724 + 2 .
1725 + 3 6
1726 U- 8 .
1727 U+ 8 2
1728 + 8 .",
1729 );
1730
1731 let chunk2 = StreamChunk::from_pretty(
1733 " i i
1734 + 7 8
1735 - 3 4
1736 - 5 0",
1737 );
1738
1739 let chunk3 = StreamChunk::from_pretty(
1741 " i i
1742 + 1 5
1743 + 7 .
1744 U- 2 4
1745 U+ 2 .
1746 U- 9 0
1747 U+ 9 1",
1748 );
1749
1750 let source = MockSource::with_messages(vec![
1752 Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1753 Message::Chunk(chunk1),
1754 Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1755 Message::Chunk(chunk2),
1756 Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1757 Message::Chunk(chunk3),
1758 Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1759 ])
1760 .into_executor(schema.clone(), PkIndices::new());
1761
1762 let order_types = vec![OrderType::ascending()];
1763 let column_descs = vec![
1764 ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1765 ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1766 ];
1767
1768 let table = BatchTable::for_test(
1769 memory_state_store.clone(),
1770 table_id,
1771 column_descs,
1772 order_types,
1773 vec![0],
1774 vec![0, 1],
1775 );
1776
1777 let mut materialize_executor = MaterializeExecutor::for_test(
1778 source,
1779 memory_state_store,
1780 table_id,
1781 vec![ColumnOrder::new(0, OrderType::ascending())],
1782 column_ids,
1783 Arc::new(AtomicU64::new(0)),
1784 ConflictBehavior::DoUpdateIfNotNull,
1785 )
1786 .await
1787 .boxed()
1788 .execute();
1789 materialize_executor.next().await.transpose().unwrap();
1790
1791 materialize_executor.next().await.transpose().unwrap();
1792
1793 match materialize_executor.next().await.transpose().unwrap() {
1795 Some(Message::Barrier(_)) => {
1796 let row = table
1797 .get_row(
1798 &OwnedRow::new(vec![Some(8_i32.into())]),
1799 HummockReadEpoch::NoWait(u64::MAX),
1800 )
1801 .await
1802 .unwrap();
1803 assert_eq!(
1804 row,
1805 Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1806 );
1807
1808 let row = table
1809 .get_row(
1810 &OwnedRow::new(vec![Some(2_i32.into())]),
1811 HummockReadEpoch::NoWait(u64::MAX),
1812 )
1813 .await
1814 .unwrap();
1815 assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
1816 }
1817 _ => unreachable!(),
1818 }
1819 materialize_executor.next().await.transpose().unwrap();
1820
1821 match materialize_executor.next().await.transpose().unwrap() {
1822 Some(Message::Barrier(_)) => {
1823 let row = table
1824 .get_row(
1825 &OwnedRow::new(vec![Some(7_i32.into())]),
1826 HummockReadEpoch::NoWait(u64::MAX),
1827 )
1828 .await
1829 .unwrap();
1830 assert_eq!(
1831 row,
1832 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1833 );
1834
1835 let row = table
1837 .get_row(
1838 &OwnedRow::new(vec![Some(3_i32.into())]),
1839 HummockReadEpoch::NoWait(u64::MAX),
1840 )
1841 .await
1842 .unwrap();
1843 assert_eq!(row, None);
1844
1845 let row = table
1847 .get_row(
1848 &OwnedRow::new(vec![Some(5_i32.into())]),
1849 HummockReadEpoch::NoWait(u64::MAX),
1850 )
1851 .await
1852 .unwrap();
1853 assert_eq!(row, None);
1854 }
1855 _ => unreachable!(),
1856 }
1857
1858 materialize_executor.next().await.transpose().unwrap();
1859 match materialize_executor.next().await.transpose().unwrap() {
1862 Some(Message::Barrier(_)) => {
1863 let row = table
1864 .get_row(
1865 &OwnedRow::new(vec![Some(7_i32.into())]),
1866 HummockReadEpoch::NoWait(u64::MAX),
1867 )
1868 .await
1869 .unwrap();
1870 assert_eq!(
1871 row,
1872 Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1873 );
1874
1875 let row = table
1877 .get_row(
1878 &OwnedRow::new(vec![Some(2_i32.into())]),
1879 HummockReadEpoch::NoWait(u64::MAX),
1880 )
1881 .await
1882 .unwrap();
1883 assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
1884
1885 let row = table
1887 .get_row(
1888 &OwnedRow::new(vec![Some(9_i32.into())]),
1889 HummockReadEpoch::NoWait(u64::MAX),
1890 )
1891 .await
1892 .unwrap();
1893 assert_eq!(
1894 row,
1895 Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1896 );
1897 }
1898 _ => unreachable!(),
1899 }
1900 }
1901
1902 fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
1903 const KN: u32 = 4;
1904 const SEED: u64 = 998244353;
1905 let mut ret = vec![];
1906 let mut builder =
1907 StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
1908 let mut rng = SmallRng::seed_from_u64(SEED);
1909
1910 let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
1911 let len = c.data_chunk().capacity();
1912 let mut c = StreamChunkMut::from(c);
1913 for i in 0..len {
1914 c.set_vis(i, rng.random_bool(0.5));
1915 }
1916 c.into()
1917 };
1918 for _ in 0..row_number {
1919 let k = (rng.next_u32() % KN) as i32;
1920 let v = rng.next_u32() as i32;
1921 let op = if rng.random_bool(0.5) {
1922 Op::Insert
1923 } else {
1924 Op::Delete
1925 };
1926 if let Some(c) =
1927 builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
1928 {
1929 ret.push(random_vis(c, &mut rng));
1930 }
1931 }
1932 if let Some(c) = builder.take() {
1933 ret.push(random_vis(c, &mut rng));
1934 }
1935 ret
1936 }
1937
1938 async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
1939 const N: usize = 100000;
1940
1941 let memory_state_store = MemoryStateStore::new();
1943 let table_id = TableId::new(1);
1944 let schema = Schema::new(vec![
1946 Field::unnamed(DataType::Int32),
1947 Field::unnamed(DataType::Int32),
1948 ]);
1949 let column_ids = vec![0.into(), 1.into()];
1950
1951 let chunks = gen_fuzz_data(N, 128);
1952 let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
1953 .chain(chunks.into_iter().map(Message::Chunk))
1954 .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
1955 test_epoch(2),
1956 ))))
1957 .collect();
1958 let source =
1960 MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());
1961
1962 let mut materialize_executor = MaterializeExecutor::for_test(
1963 source,
1964 memory_state_store.clone(),
1965 table_id,
1966 vec![ColumnOrder::new(0, OrderType::ascending())],
1967 column_ids,
1968 Arc::new(AtomicU64::new(0)),
1969 conflict_behavior,
1970 )
1971 .await
1972 .boxed()
1973 .execute();
1974 materialize_executor.expect_barrier().await;
1975
1976 let order_types = vec![OrderType::ascending()];
1977 let column_descs = vec![
1978 ColumnDesc::unnamed(0.into(), DataType::Int32),
1979 ColumnDesc::unnamed(1.into(), DataType::Int32),
1980 ];
1981 let pk_indices = vec![0];
1982
1983 let mut table = StateTable::from_table_catalog(
1984 &gen_pbtable(
1985 TableId::from(1002),
1986 column_descs.clone(),
1987 order_types,
1988 pk_indices,
1989 0,
1990 ),
1991 memory_state_store.clone(),
1992 None,
1993 )
1994 .await;
1995
1996 while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
1997 table.write_chunk(c);
1999 }
2000 }
2001
2002 #[tokio::test]
2003 async fn fuzz_test_stream_consistent_upsert() {
2004 fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
2005 }
2006
2007 #[tokio::test]
2008 async fn fuzz_test_stream_consistent_ignore() {
2009 fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
2010 }
2011}