1use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Deref, Index};
20
21use bytes::Bytes;
22use futures::stream;
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{
27 ColumnDesc, ColumnId, ConflictBehavior, TableId, checked_conflict_behaviors,
28};
29use risingwave_common::row::{CompactedRow, RowDeserializer};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
32use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
33use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_storage::mem_table::KeyOp;
37use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
38
39use crate::cache::ManagedLruCache;
40use crate::common::metrics::MetricsInfo;
41use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel};
42use crate::common::table::test_utils::gen_pbtable;
43use crate::executor::monitor::MaterializeMetrics;
44use crate::executor::prelude::*;
45
/// Materializes a stream of change messages into a state table, optionally
/// resolving primary-key conflicts according to `conflict_behavior`.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    input: Executor,

    /// Output schema of the materialized view.
    schema: Schema,

    /// The underlying state table that rows are written to.
    state_table: StateTableInner<S, SD>,

    /// Column indices forming the arrangement (primary) key.
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// LRU cache of rows keyed by serialized pk, used for conflict handling.
    materialize_cache: MaterializeCache<SD>,

    /// How rows sharing the same primary key are resolved.
    conflict_behavior: ConflictBehavior,

    /// Optional column used to decide whether an incoming row is newer than
    /// the stored one during conflict handling.
    version_column_index: Option<u32>,

    /// Whether any downstream fragment may consume this MV's output; affects
    /// the op-consistency level chosen for the state table.
    may_have_downstream: bool,

    /// Subscription ids depending on this table; non-empty forces
    /// log-store-enabled consistency.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    /// True for append-only iceberg tables: chunks are forwarded without being
    /// written to the state table (see `execute_inner`).
    is_dummy_table: bool,
}
75
76fn get_op_consistency_level(
77 conflict_behavior: ConflictBehavior,
78 may_have_downstream: bool,
79 depended_subscriptions: &HashSet<u32>,
80) -> StateTableOpConsistencyLevel {
81 if !depended_subscriptions.is_empty() {
82 StateTableOpConsistencyLevel::LogStoreEnabled
83 } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
84 StateTableOpConsistencyLevel::Inconsistent
87 } else {
88 StateTableOpConsistencyLevel::ConsistentOldValue
89 }
90}
91
92impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Builds a `MaterializeExecutor` from the table catalog.
    ///
    /// Derives the row serde from the catalog's value indices and columns,
    /// decides the initial op-consistency level from the conflict behavior,
    /// initial dispatchers and subscriptions, and opens the backing state
    /// table with that level.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<u32>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // An actor with no initial dispatchers has no downstream yet.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        let state_table = StateTableInner::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            op_consistency_level,
        )
        .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only iceberg tables are not actually materialized: chunks
        // pass through `execute_inner` without touching the state table.
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_index,
            ),
            conflict_behavior,
            version_column_index,
            is_dummy_table,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
        }
    }
174
    /// Main executor loop: consumes the input stream, resolves conflicts when
    /// required, writes changes to the state table and yields the (possibly
    /// rewritten) messages downstream.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = TableId::new(self.state_table.table_id());

        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        // The first barrier is forwarded before initializing the state table
        // on its epoch.
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            self.materialize_cache.evict();

            let msg = match msg {
                Message::Watermark(w) => Message::Watermark(w),
                // Dummy (append-only iceberg) table: count the rows but pass
                // the chunk through without materializing it.
                Message::Chunk(chunk) if self.is_dummy_table => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);
                    Message::Chunk(chunk)
                }
                Message::Chunk(chunk) => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);

                    // Conflict handling can be skipped entirely when the table
                    // runs in inconsistent-op mode with Overwrite behavior and
                    // no version column.
                    let do_not_handle_conflict = !self.state_table.is_consistent_op()
                        && self.version_column_index.is_none()
                        && self.conflict_behavior == ConflictBehavior::Overwrite;
                    match self.conflict_behavior {
                        checked_conflict_behaviors!() if !do_not_handle_conflict => {
                            if chunk.cardinality() == 0 {
                                // Empty chunk: nothing to materialize or emit.
                                continue;
                            }
                            let (data_chunk, ops) = chunk.into_parts();

                            if self.state_table.value_indices().is_some() {
                                panic!(
                                    "materialize executor with data check can not handle only materialize partial columns"
                                )
                            };
                            let values = data_chunk.serialize();

                            let key_chunk = data_chunk.project(self.state_table.pk_indices());

                            // Serialize the pk of every visible row; holes keep
                            // an empty placeholder so positions line up.
                            let pks = {
                                let mut pks = vec![vec![]; data_chunk.capacity()];
                                key_chunk
                                    .rows_with_holes()
                                    .zip_eq_fast(pks.iter_mut())
                                    .for_each(|(r, vnode_and_pk)| {
                                        if let Some(r) = r {
                                            self.state_table.pk_serde().serialize(r, vnode_and_pk);
                                        }
                                    });
                                pks
                            };
                            let (_, vis) = key_chunk.into_parts();
                            // Keep only visible rows as (op, pk, value) triples.
                            let row_ops = ops
                                .iter()
                                .zip_eq_debug(pks.into_iter())
                                .zip_eq_debug(values.into_iter())
                                .zip_eq_debug(vis.iter())
                                .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
                                .collect_vec();

                            // Resolve conflicts against cache / state table.
                            let change_buffer = self
                                .materialize_cache
                                .handle(
                                    row_ops,
                                    &self.state_table,
                                    self.conflict_behavior,
                                    &self.metrics,
                                )
                                .await?;

                            match generate_output(change_buffer, data_types.clone())? {
                                Some(output_chunk) => {
                                    self.state_table.write_chunk(output_chunk.clone());
                                    self.state_table.try_flush().await?;
                                    Message::Chunk(output_chunk)
                                }
                                None => continue,
                            }
                        }
                        ConflictBehavior::IgnoreConflict => unreachable!(),
                        ConflictBehavior::NoCheck
                        | ConflictBehavior::Overwrite
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            // No conflict handling needed: write through as-is.
                            self.state_table.write_chunk(chunk.clone());
                            self.state_table.try_flush().await?;
                            Message::Chunk(chunk)
                        }
                    }
                }
                Message::Barrier(b) => {
                    // Once any downstream fragment appears this stays true.
                    if !self.may_have_downstream
                        && b.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.depended_subscription_ids,
                        &b,
                        mv_table_id,
                    );
                    // Re-evaluate the consistency level: the barrier may have
                    // added/dropped subscriptions or downstream fragments.
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.depended_subscription_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(b.epoch, op_consistency_level)
                        .await?;
                    if !post_commit.inner().is_consistent_op() {
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = b.as_update_vnode_bitmap(self.actor_context.id);
                    let b_epoch = b.epoch;
                    yield Message::Barrier(b);

                    // A vnode bitmap change may invalidate cached entries.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                        && cache_may_stale
                    {
                        self.materialize_cache.lru_cache.clear();
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    continue;
                }
            };
            yield msg;
        }
    }
328
    /// Applies the subscription changes carried by `barrier` to the tracked
    /// set: subscribers newly added on this MV table are inserted, and dropped
    /// subscriptions targeting this table are removed. Inconsistencies are
    /// logged rather than panicking.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                // Only drops targeting this table are relevant.
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
364}
365
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Test-only constructor that builds the executor from explicit columns
    /// and keys instead of a table catalog.
    #[allow(clippy::too_many_arguments)]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // All columns are treated as value columns in tests.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                None,
            ),
            conflict_behavior,
            version_column_index: None,
            is_dummy_table: false,
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
        }
    }
}
427
428fn generate_output(
430 change_buffer: ChangeBuffer,
431 data_types: Vec<DataType>,
432) -> StreamExecutorResult<Option<StreamChunk>> {
433 let mut new_ops: Vec<Op> = vec![];
436 let mut new_rows: Vec<Bytes> = vec![];
437 let row_deserializer = RowDeserializer::new(data_types.clone());
438 for (_, row_op) in change_buffer.into_parts() {
439 match row_op {
440 KeyOp::Insert(value) => {
441 new_ops.push(Op::Insert);
442 new_rows.push(value);
443 }
444 KeyOp::Delete(old_value) => {
445 new_ops.push(Op::Delete);
446 new_rows.push(old_value);
447 }
448 KeyOp::Update((old_value, new_value)) => {
449 if old_value != new_value {
451 new_ops.push(Op::UpdateDelete);
452 new_ops.push(Op::UpdateInsert);
453 new_rows.push(old_value);
454 new_rows.push(new_value);
455 }
456 }
457 }
458 }
459 let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
460
461 for row_bytes in new_rows {
462 let res =
463 data_chunk_builder.append_one_row(row_deserializer.deserialize(row_bytes.as_ref())?);
464 debug_assert!(res.is_none());
465 }
466
467 if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
468 let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
469 Ok(Some(new_stream_chunk))
470 } else {
471 Ok(None)
472 }
473}
474
/// Accumulates the net effect of a batch of row operations per primary key,
/// so that e.g. an insert followed by a delete of the same key cancels out.
struct ChangeBuffer {
    // Serialized primary key -> net operation on that key.
    buffer: HashMap<Vec<u8>, KeyOp>,
}
480
481impl ChangeBuffer {
482 fn new() -> Self {
483 Self {
484 buffer: HashMap::new(),
485 }
486 }
487
488 fn insert(&mut self, pk: Vec<u8>, value: Bytes) {
489 let entry = self.buffer.entry(pk);
490 match entry {
491 Entry::Vacant(e) => {
492 e.insert(KeyOp::Insert(value));
493 }
494 Entry::Occupied(mut e) => {
495 if let KeyOp::Delete(old_value) = e.get_mut() {
496 let old_val = std::mem::take(old_value);
497 e.insert(KeyOp::Update((old_val, value)));
498 } else {
499 unreachable!();
500 }
501 }
502 }
503 }
504
505 fn delete(&mut self, pk: Vec<u8>, old_value: Bytes) {
506 let entry: Entry<'_, Vec<u8>, KeyOp> = self.buffer.entry(pk);
507 match entry {
508 Entry::Vacant(e) => {
509 e.insert(KeyOp::Delete(old_value));
510 }
511 Entry::Occupied(mut e) => match e.get_mut() {
512 KeyOp::Insert(_) => {
513 e.remove();
514 }
515 KeyOp::Update((prev, _curr)) => {
516 let prev = std::mem::take(prev);
517 e.insert(KeyOp::Delete(prev));
518 }
519 KeyOp::Delete(_) => {
520 unreachable!();
521 }
522 },
523 }
524 }
525
526 fn update(&mut self, pk: Vec<u8>, old_value: Bytes, new_value: Bytes) {
527 let entry = self.buffer.entry(pk);
528 match entry {
529 Entry::Vacant(e) => {
530 e.insert(KeyOp::Update((old_value, new_value)));
531 }
532 Entry::Occupied(mut e) => match e.get_mut() {
533 KeyOp::Insert(_) => {
534 e.insert(KeyOp::Insert(new_value));
535 }
536 KeyOp::Update((_prev, curr)) => {
537 *curr = new_value;
538 }
539 KeyOp::Delete(_) => {
540 unreachable!()
541 }
542 },
543 }
544 }
545
546 fn into_parts(self) -> HashMap<Vec<u8>, KeyOp> {
547 self.buffer
548 }
549}
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    /// Boxes the generator-based inner stream as the executor's message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
555
556impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
557 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
558 f.debug_struct("MaterializeExecutor")
559 .field("arrange_key_indices", &self.arrange_key_indices)
560 .finish()
561 }
562}
563
/// A cache mirroring part of the state table, keyed by the serialized
/// primary key, used to detect and resolve write conflicts.
struct MaterializeCache<SD> {
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    row_serde: BasicSerde,
    /// Optional column used to compare row versions during conflict handling.
    version_column_index: Option<u32>,
    _serde: PhantomData<SD>,
}

/// `None` means the key is known to be absent from the table.
type CacheValue = Option<CompactedRow>;
573
574impl<SD: ValueRowSerde> MaterializeCache<SD> {
575 fn new(
576 watermark_sequence: AtomicU64Ref,
577 metrics_info: MetricsInfo,
578 row_serde: BasicSerde,
579 version_column_index: Option<u32>,
580 ) -> Self {
581 let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
582 ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
583 Self {
584 lru_cache,
585 row_serde,
586 version_column_index,
587 _serde: PhantomData,
588 }
589 }
590
    /// Applies a batch of row operations on top of the cached table state,
    /// resolving primary-key conflicts per `conflict_behavior`, and returns
    /// the net changes to write to the state table.
    ///
    /// All touched keys are first brought into the cache via `fetch_keys`.
    async fn handle<S: StateStore>(
        &mut self,
        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: ConflictBehavior,
        metrics: &MaterializeMetrics,
    ) -> StreamExecutorResult<ChangeBuffer> {
        assert_matches!(conflict_behavior, checked_conflict_behaviors!());

        // Deduplicate keys before fetching from the state table.
        let key_set: HashSet<Box<[u8]>> = row_ops
            .iter()
            .map(|(_, k, _)| k.as_slice().into())
            .collect();

        self.fetch_keys(
            key_set.iter().map(|v| v.deref()),
            table,
            conflict_behavior,
            metrics,
        )
        .await?;

        let mut change_buffer = ChangeBuffer::new();
        let row_serde = self.row_serde.clone();
        let version_column_index = self.version_column_index;
        for (op, key, row) in row_ops {
            match op {
                Op::Insert | Op::UpdateInsert => {
                    let Some(old_row) = self.get_expected(&key) else {
                        // No existing row: plain insert, no conflict.
                        change_buffer.insert(key.clone(), row.clone());
                        self.lru_cache.put(key, Some(CompactedRow { row }));
                        continue;
                    };

                    match conflict_behavior {
                        ConflictBehavior::Overwrite => {
                            // With a version column, overwrite only when the
                            // new version is newer or equal; otherwise always.
                            let need_overwrite = if let Some(idx) = version_column_index {
                                let old_row_deserialized =
                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
                                let new_row_deserialized =
                                    row_serde.deserializer.deserialize(row.clone())?;

                                version_is_newer_or_equal(
                                    old_row_deserialized.index(idx as usize),
                                    new_row_deserialized.index(idx as usize),
                                )
                            } else {
                                true
                            };
                            if need_overwrite {
                                change_buffer.update(key.clone(), old_row.row.clone(), row.clone());
                                self.lru_cache.put(key.clone(), Some(CompactedRow { row }));
                            };
                        }
                        ConflictBehavior::IgnoreConflict => {
                            // Keep the existing row; drop the conflicting insert.
                        }
                        ConflictBehavior::DoUpdateIfNotNull => {
                            // Merge: non-NULL columns of the new row replace
                            // the old ones, subject to the version check.
                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;
                            let need_overwrite = if let Some(idx) = version_column_index {
                                version_is_newer_or_equal(
                                    old_row_deserialized.index(idx as usize),
                                    new_row_deserialized.index(idx as usize),
                                )
                            } else {
                                true
                            };

                            if need_overwrite {
                                let mut row_deserialized_vec =
                                    old_row_deserialized.into_inner().into_vec();
                                replace_if_not_null(
                                    &mut row_deserialized_vec,
                                    new_row_deserialized,
                                );
                                let updated_row = OwnedRow::new(row_deserialized_vec);
                                let updated_row_bytes =
                                    Bytes::from(row_serde.serializer.serialize(updated_row));

                                change_buffer.update(
                                    key.clone(),
                                    old_row.row.clone(),
                                    updated_row_bytes.clone(),
                                );
                                self.lru_cache.put(
                                    key.clone(),
                                    Some(CompactedRow {
                                        row: updated_row_bytes,
                                    }),
                                );
                            }
                        }
                        _ => unreachable!(),
                    };
                }

                Op::Delete | Op::UpdateDelete => {
                    match conflict_behavior {
                        checked_conflict_behaviors!() => {
                            if let Some(old_row) = self.get_expected(&key) {
                                change_buffer.delete(key.clone(), old_row.row.clone());
                                self.lru_cache.put(key, None);
                            } else {
                                // Deleting a non-existent row is tolerated
                                // under conflict handling: nothing to do.
                            };
                        }
                        _ => unreachable!(),
                    };
                }
            }
        }
        Ok(change_buffer)
    }
717
718 async fn fetch_keys<'a, S: StateStore>(
719 &mut self,
720 keys: impl Iterator<Item = &'a [u8]>,
721 table: &StateTableInner<S, SD>,
722 conflict_behavior: ConflictBehavior,
723 metrics: &MaterializeMetrics,
724 ) -> StreamExecutorResult<()> {
725 let mut futures = vec![];
726 for key in keys {
727 metrics.materialize_cache_total_count.inc();
728
729 if self.lru_cache.contains(key) {
730 if self.lru_cache.get(key).unwrap().is_some() {
731 metrics.materialize_data_exist_count.inc();
732 }
733 metrics.materialize_cache_hit_count.inc();
734 continue;
735 }
736 futures.push(async {
737 let key_row = table.pk_serde().deserialize(key).unwrap();
738 let row = table.get_row(key_row).await?.map(CompactedRow::from);
739 StreamExecutorResult::Ok((key.to_vec(), row))
740 });
741 }
742
743 let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
744 while let Some(result) = buffered.next().await {
745 let (key, row) = result?;
746 if row.is_some() {
747 metrics.materialize_data_exist_count.inc();
748 }
749 match conflict_behavior {
751 checked_conflict_behaviors!() => self.lru_cache.put(key, row),
752 _ => unreachable!(),
753 };
754 }
755
756 Ok(())
757 }
758
759 fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
760 self.lru_cache.get(key).unwrap_or_else(|| {
761 panic!(
762 "the key {:?} has not been fetched in the materialize executor's cache ",
763 key
764 )
765 })
766 }
767
    /// Evicts stale entries from the LRU cache per its watermark policy.
    fn evict(&mut self) {
        self.lru_cache.evict()
    }
771}
772
773fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
786 for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
787 if let Some(new_value) = new_col {
788 *old_col = Some(new_value);
789 }
790 }
791}
792
/// Returns true when `new_version` should replace `old_version`, i.e. the new
/// version compares greater than or equal to the old one.
///
/// Comparison uses `ascending_nulls_first`, so a NULL version is the smallest
/// possible value: any non-NULL new version replaces a NULL old one.
fn version_is_newer_or_equal(
    old_version: &Option<ScalarImpl>,
    new_version: &Option<ScalarImpl>,
) -> bool {
    cmp_datum(old_version, new_version, OrderType::ascending_nulls_first()).is_le()
}
801
802#[cfg(test)]
803mod tests {
804
805 use std::iter;
806 use std::sync::atomic::AtomicU64;
807
808 use rand::rngs::SmallRng;
809 use rand::{Rng, RngCore, SeedableRng};
810 use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
811 use risingwave_common::catalog::Field;
812 use risingwave_common::util::epoch::test_epoch;
813 use risingwave_common::util::sort_util::OrderType;
814 use risingwave_hummock_sdk::HummockReadEpoch;
815 use risingwave_storage::memory::MemoryStateStore;
816 use risingwave_storage::table::batch_table::BatchTable;
817
818 use super::*;
819 use crate::executor::test_utils::*;
820
    /// Basic materialization with `ConflictBehavior::NoCheck`: rows written in
    /// one epoch are readable after the barrier, and deletes take effect.
    #[tokio::test]
    async fn test_materialize_executor() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same store, used to verify written rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // After the second barrier, chunk1's rows must be visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
923
    /// Upsert followed by delete of the same key must leave no row behind
    /// under `ConflictBehavior::Overwrite`.
    #[tokio::test]
    async fn test_upsert_stream() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // The +1/-1 pair in chunk2 must net out to no row for pk 1.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1006
    /// With `ConflictBehavior::Overwrite`, a later insert on the same pk
    /// replaces the earlier row, both within and across chunks.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // pk 1 conflicts within this single chunk.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // The last write on pk 1 (chunk2's "+ 1 3") wins.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1125
    /// `ConflictBehavior::Overwrite` with deletes and updates, including
    /// deletes/updates of rows that never existed, which must be tolerated.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Update on a non-existent pk 8, followed by an insert on 8.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Delete an existing row (3) and a non-existent one (5).
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Insert-conflict on 1, update on 2, update on non-existent 9.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // The final insert "+ 8 3" wins over the update pair.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // Row 3 was deleted.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // Deleting non-existent row 5 leaves nothing behind.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // An update on non-existent pk 9 materializes the new value.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1312
    /// With `ConflictBehavior::IgnoreConflict`, an insert whose primary key
    /// already exists in the table is silently dropped: the *first* write for
    /// a key wins and later inserts on the same key have no effect.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two int32 columns: pk column (index 0) + one value column.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Key 1 is inserted twice inside the same chunk; the second insert
        // (+ 1 4) conflicts and should be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Both rows conflict with rows from chunk1 and should be ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // Conflicts again after the epoch-2 commit; still ignored.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read handle over the same state store, used to verify what the
        // materialize executor actually persisted.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain barrier(1) and the two data chunks.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // barrier(2) commits the state; read back and check that only the
        // first insert for each key survived.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // (1, 3) won; (1, 4) and (1, 5) were ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // (2, 5) won; (2, 6) was ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1431
    /// With `ConflictBehavior::IgnoreConflict`, deletes still apply, so an
    /// insert → delete → insert sequence on the same key within one chunk must
    /// leave the *second* insert in the table: the first row is already gone
    /// when the re-insert arrives, so it is not treated as a conflict.
    #[tokio::test]
    async fn test_ignore_delete_then_insert() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Insert (1,3), delete it, then insert (1,6) for the same key.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read handle for verifying the persisted state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Expect the exact output sequence: barrier, chunk, barrier.
        let _msg1 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();
        let _msg2 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_chunk()
            .unwrap();
        let _msg3 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();

        // The re-insert after the delete must be visible.
        let row = table
            .get_row(
                &OwnedRow::new(vec![Some(1_i32.into())]),
                HummockReadEpoch::NoWait(u64::MAX),
            )
            .await
            .unwrap();
        assert_eq!(
            row,
            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
        );
    }
1524
    /// `IgnoreConflict` behavior across a mix of operations:
    /// - an insert on an existing key is dropped,
    /// - deletes apply (deleting a missing key is a no-op),
    /// - `U-`/`U+` update pairs replace the stored row.
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // The update pair targets key 8, which does not exist yet; the
        // trailing insert (+ 8 3) then conflicts with the updated row.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Deletes: key 3 exists, key 5 does not.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Insert conflict on key 1; updates on key 2 (existing) and key 9
        // (missing).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read handle over the same state store, used to verify what the
        // materialize executor actually persisted.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain barrier(1) and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): key 8 holds the updated value; the conflicting insert
        // (+ 8 3) was ignored.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Drain chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(3): key 7 inserted, key 3 deleted, key 5 still absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Drain chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // barrier(4): the insert on key 1 was ignored (value 4 kept), while
        // the updates landed on keys 2 and 9.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1712
    /// `DoUpdateIfNotNull` behavior: on a pk conflict, only the non-NULL
    /// columns of the incoming row overwrite the stored row — a NULL in the
    /// new row leaves the existing column value untouched.
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // `.` denotes NULL. The update pair creates (8, 2); the conflicting
        // insert (+ 8 .) carries a NULL value, so column 1 must keep 2.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // Deletes: key 3 exists, key 5 does not.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Conflicting writes with NULLs on keys 7 and 2; update on missing
        // key 9.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read handle over the same state store, used to verify what the
        // materialize executor actually persisted.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Drain barrier(1) and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // barrier(2): NULL insert did not clobber (8, 2); (2, NULL) stored.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Drain chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // barrier(3): key 7 inserted, key 3 deleted, key 5 still absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Drain chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // barrier(4): NULLs in conflicting rows left (7, 8) and (2, NULL)
        // unchanged; the update on missing key 9 produced (9, 1).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1906
1907 fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
1908 const KN: u32 = 4;
1909 const SEED: u64 = 998244353;
1910 let mut ret = vec![];
1911 let mut builder =
1912 StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
1913 let mut rng = SmallRng::seed_from_u64(SEED);
1914
1915 let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
1916 let len = c.data_chunk().capacity();
1917 let mut c = StreamChunkMut::from(c);
1918 for i in 0..len {
1919 c.set_vis(i, rng.random_bool(0.5));
1920 }
1921 c.into()
1922 };
1923 for _ in 0..row_number {
1924 let k = (rng.next_u32() % KN) as i32;
1925 let v = rng.next_u32() as i32;
1926 let op = if rng.random_bool(0.5) {
1927 Op::Insert
1928 } else {
1929 Op::Delete
1930 };
1931 if let Some(c) =
1932 builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
1933 {
1934 ret.push(random_vis(c, &mut rng));
1935 }
1936 }
1937 if let Some(c) = builder.take() {
1938 ret.push(random_vis(c, &mut rng));
1939 }
1940 ret
1941 }
1942
    /// Shared driver for the fuzz tests: feeds `N` random insert/delete rows
    /// through a `MaterializeExecutor` configured with the given
    /// `conflict_behavior`, then replays every output chunk into a fresh
    /// `StateTable`. The expectation is that the executor's output stream is
    /// internally consistent (no double-insert / delete-of-missing-row), so
    /// replaying it into the state table should not trip its write checks.
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // All fuzz chunks sit between two barriers, committing in one epoch.
        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        // Scratch table with a distinct table id (1002), used purely as a
        // consistency checker for the executor's output stream.
        let mut table = StateTable::from_table_catalog(
            &gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        // Re-apply every output chunk; the loop ends at the closing barrier.
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            table.write_chunk(c);
        }
    }
2006
    /// Fuzz: the output stream must stay consistent under `Overwrite`
    /// (upsert) conflict handling.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2011
    /// Fuzz: the output stream must stay consistent under `IgnoreConflict`
    /// handling.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2016}