use std::assert_matches::assert_matches;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::ops::{Deref, Index};

use bytes::Bytes;
use futures::stream;
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
};
use risingwave_common::row::{CompactedRow, OwnedRow};
use risingwave_common::types::{DEBEZIUM_UNAVAILABLE_VALUE, DataType, ScalarImpl};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
use risingwave_pb::catalog::Table;
use risingwave_pb::catalog::table::Engine;
use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};

use crate::cache::ManagedLruCache;
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::{
    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
};
use crate::executor::monitor::MaterializeMetrics;
use crate::executor::prelude::*;

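/// `MaterializeExecutor` materializes the input change stream into a table (a materialized
/// view) on the state store, resolving primary-key conflicts according to the configured
/// `ConflictBehavior` when conflict handling is required.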
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    input: Executor,

    schema: Schema,

    state_table: StateTableInner<S, SD>,

    /// Column indices of the arrangement (primary) key.
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    materialize_cache: MaterializeCache<SD>,

    conflict_behavior: ConflictBehavior,

    /// Indices of version columns used to decide whether a conflicting row may overwrite
    /// the existing one.
    version_column_indices: Vec<u32>,

    /// Whether any downstream fragment consumes the output of this table.
    may_have_downstream: bool,

    /// IDs of the subscriptions that depend on this materialized view.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    /// When true (append-only table with the iceberg engine), nothing is written to the
    /// state table and chunks are passed through directly.
    is_dummy_table: bool,

    /// Indices of TOAST-able columns; only set for Postgres CDC tables.
    toastable_column_indices: Option<Vec<usize>>,
}

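/// Decides the op consistency level of the state table: log-store-enabled when any
/// subscription depends on this table, inconsistent when conflicts are simply overwritten
/// and nothing is downstream, and consistent-old-value otherwise.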
fn get_op_consistency_level(
    conflict_behavior: ConflictBehavior,
    may_have_downstream: bool,
    depended_subscriptions: &HashSet<u32>,
) -> StateTableOpConsistencyLevel {
    if !depended_subscriptions.is_empty() {
        StateTableOpConsistencyLevel::LogStoreEnabled
    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
        // With `Overwrite` and no downstream dependency, conflict handling is skipped, so
        // the old-value sanity check can be skipped as well.
        StateTableOpConsistencyLevel::Inconsistent
    } else {
        StateTableOpConsistencyLevel::ConsistentOldValue
    }
}
impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Creates a new `MaterializeExecutor` from the table catalog.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<u32>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // For Postgres CDC tables, collect the indices of TOAST-able columns so that
        // Debezium's "unavailable value" placeholders can later be patched with old values.
        let toastable_column_indices = if table_catalog.cdc_table_type()
            == risingwave_pb::catalog::table::CdcTableType::Postgres
        {
            let toastable_indices: Vec<usize> = table_columns
                .iter()
                .enumerate()
                .filter_map(|(index, column)| match &column.data_type {
                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
                        Some(index)
                    }
                    _ => None,
                })
                .collect();

            if toastable_indices.is_empty() {
                None
            } else {
                Some(toastable_indices)
            }
        } else {
            None
        };

        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(op_consistency_level)
            .enable_preload_all_rows_by_config(&actor_context.streaming_config)
            .build()
            .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_indices.clone(),
            ),
            conflict_behavior,
            version_column_indices,
            is_dummy_table,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
            toastable_column_indices,
        }
    }

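    /// The main execution loop: propagates the first barrier, initializes the state table,
    /// then materializes each chunk (resolving conflicts when required) and commits the
    /// state table on every barrier.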
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = TableId::new(self.state_table.table_id());
        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated before the state table is initialized.
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            self.materialize_cache.evict();

            let msg = match msg {
                Message::Watermark(w) => Message::Watermark(w),
                Message::Chunk(chunk) if self.is_dummy_table => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);
                    Message::Chunk(chunk)
                }
                Message::Chunk(chunk) => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);

                    // When the state table runs in inconsistent mode (overwrite behavior,
                    // no version columns, no downstream dependency), conflict handling can
                    // be skipped entirely.
                    let do_not_handle_conflict = !self.state_table.is_consistent_op()
                        && self.version_column_indices.is_empty()
                        && self.conflict_behavior == ConflictBehavior::Overwrite;

                    match self.conflict_behavior {
                        checked_conflict_behaviors!() if !do_not_handle_conflict => {
                            if chunk.cardinality() == 0 {
                                // Empty chunk, nothing to materialize.
                                continue;
                            }
                            let (data_chunk, ops) = chunk.into_parts();

                            if self.state_table.value_indices().is_some() {
                                panic!(
                                    "materialize executor with data check can not handle only materialize partial columns"
                                )
                            };
                            let values = data_chunk.serialize();

                            let key_chunk = data_chunk.project(self.state_table.pk_indices());

                            let pks = {
                                let mut pks = vec![vec![]; data_chunk.capacity()];
                                key_chunk
                                    .rows_with_holes()
                                    .zip_eq_fast(pks.iter_mut())
                                    .for_each(|(r, vnode_and_pk)| {
                                        if let Some(r) = r {
                                            self.state_table.pk_serde().serialize(r, vnode_and_pk);
                                        }
                                    });
                                pks
                            };
                            let (_, vis) = key_chunk.into_parts();
                            let row_ops = ops
                                .iter()
                                .zip_eq_debug(pks.into_iter())
                                .zip_eq_debug(values.into_iter())
                                .zip_eq_debug(vis.iter())
                                .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
                                .collect_vec();

                            let change_buffer = self
                                .materialize_cache
                                .handle(
                                    row_ops,
                                    &self.state_table,
                                    self.conflict_behavior,
                                    &self.metrics,
                                    self.toastable_column_indices.as_deref(),
                                )
                                .await?;

                            match generate_output(change_buffer, data_types.clone())? {
                                Some(output_chunk) => {
                                    self.state_table.write_chunk(output_chunk.clone());
                                    self.state_table.try_flush().await?;
                                    Message::Chunk(output_chunk)
                                }
                                None => continue,
                            }
                        }
                        ConflictBehavior::IgnoreConflict => unreachable!(),
                        ConflictBehavior::NoCheck
                        | ConflictBehavior::Overwrite
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            self.state_table.write_chunk(chunk.clone());
                            self.state_table.try_flush().await?;
                            Message::Chunk(chunk)
                        }
                    }
                }
                Message::Barrier(b) => {
                    // A newly attached downstream fragment (e.g. a new MV on this table)
                    // requires consistent old values from now on.
                    if !self.may_have_downstream
                        && b.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.depended_subscription_ids,
                        &b,
                        mv_table_id,
                    );
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.depended_subscription_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(b.epoch, op_consistency_level)
                        .await?;
                    if !post_commit.inner().is_consistent_op() {
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = b.as_update_vnode_bitmap(self.actor_context.id);
                    let b_epoch = b.epoch;
                    yield Message::Barrier(b);

                    // Apply the new vnode bitmap if any; clear the cache when the served
                    // vnodes may have changed.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                        && cache_may_stale
                    {
                        self.materialize_cache.lru_cache.clear();
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    continue;
                }
            };
            yield msg;
        }
    }

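    /// Maintains the set of subscriptions that depend on this materialized view, according
    /// to the subscribers added on it and the `DropSubscriptions` barrier mutation.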
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non-existing subscriber_id"
                    );
                }
            }
        }
    }
}

impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Creates a `MaterializeExecutor` for tests, without distribution info.
    #[cfg(any(test, feature = "test"))]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                vec![],
            ),
            conflict_behavior,
            version_column_indices: vec![],
            is_dummy_table: false,
            toastable_column_indices: None,
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
        }
    }
}

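/// Returns whether `s` equals Debezium's placeholder for an unchanged TOAST column
/// (`DEBEZIUM_UNAVAILABLE_VALUE`).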
fn is_unavailable_value_str(s: &str) -> bool {
    // Check the length first to avoid a full string comparison in the common case.
    s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
}

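/// Returns whether a datum is Debezium's "unavailable value" placeholder, covering all
/// TOAST-able types: varchar, jsonb, bytea, and single-element lists of such values.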
fn is_debezium_unavailable_value(
    datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
) -> bool {
    match datum {
        Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => is_unavailable_value_str(val),
        Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
            // A TOASTed jsonb value arrives as a JSON string containing the placeholder.
            jsonb_ref
                .as_str()
                .map(is_unavailable_value_str)
                .unwrap_or(false)
        }
        Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
            // Bytea placeholders are compared via their UTF-8 representation.
            if let Ok(bytea_str) = std::str::from_utf8(bytea) {
                is_unavailable_value_str(bytea_str)
            } else {
                false
            }
        }
        Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
            // Debezium encodes an unavailable array as a single-element array containing
            // the placeholder value.
            if list_ref.len() == 1 {
                if let Some(Some(element)) = list_ref.get(0) {
                    is_debezium_unavailable_value(&Some(element))
                } else {
                    false
                }
            } else {
                false
            }
        }
        _ => false,
    }
}

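/// Builds the row to materialize for a Postgres CDC table: for every TOAST-able column whose
/// new value is the Debezium placeholder, fall back to the value from `old_row`.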
fn handle_toast_columns_for_postgres_cdc(
    old_row: &OwnedRow,
    new_row: &OwnedRow,
    toastable_indices: &[usize],
) -> OwnedRow {
    let mut fixed_row_data = new_row.as_inner().to_vec();

    for &toast_idx in toastable_indices {
        let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
        if is_unavailable {
            // The new value is a TOAST placeholder: keep the old value instead.
            if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
                fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
            }
        }
    }

    OwnedRow::new(fixed_row_data)
}

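/// Converts a `ChangeBuffer` into a `StreamChunk` to be written to the state table and
/// emitted downstream; returns `None` when there is no net change.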
fn generate_output(
    change_buffer: ChangeBuffer,
    data_types: Vec<DataType>,
) -> StreamExecutorResult<Option<StreamChunk>> {
    let mut new_ops: Vec<Op> = vec![];
    let mut new_rows: Vec<OwnedRow> = vec![];
    for (_, row_op) in change_buffer.into_parts() {
        match row_op {
            ChangeBufferKeyOp::Insert(value) => {
                new_ops.push(Op::Insert);
                new_rows.push(value);
            }
            ChangeBufferKeyOp::Delete(old_value) => {
                new_ops.push(Op::Delete);
                new_rows.push(old_value);
            }
            ChangeBufferKeyOp::Update((old_value, new_value)) => {
                // An update that does not change the row is a no-op and is dropped.
                if old_value != new_value {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                    new_rows.push(old_value);
                    new_rows.push(new_value);
                }
            }
        }
    }
    let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);

    // The capacity is larger than the number of rows, so appending never yields an
    // intermediate chunk.
    for row in new_rows {
        let res = data_chunk_builder.append_one_row(row);
        debug_assert!(res.is_none());
    }

    if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
        let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
        Ok(Some(new_stream_chunk))
    } else {
        Ok(None)
    }
}

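/// Buffers and merges the changes of one chunk per (encoded) primary key, so that each key
/// yields at most one net operation: insert, delete, or update.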
struct ChangeBuffer {
    buffer: HashMap<Vec<u8>, ChangeBufferKeyOp>,
}

enum ChangeBufferKeyOp {
    Insert(OwnedRow),
    Delete(OwnedRow),
    /// `(old_value, new_value)`
    Update((OwnedRow, OwnedRow)),
}

impl ChangeBuffer {
    fn new() -> Self {
        Self {
            buffer: HashMap::new(),
        }
    }

    fn insert(&mut self, pk: Vec<u8>, value: OwnedRow) {
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                e.insert(ChangeBufferKeyOp::Insert(value));
            }
            Entry::Occupied(mut e) => {
                // An insert can only follow a delete on the same key; merge them into an update.
                if let ChangeBufferKeyOp::Delete(old_value) = e.get_mut() {
                    let old_val = std::mem::take(old_value);
                    e.insert(ChangeBufferKeyOp::Update((old_val, value)));
                } else {
                    unreachable!();
                }
            }
        }
    }

    fn delete(&mut self, pk: Vec<u8>, old_value: OwnedRow) {
        let entry: Entry<'_, Vec<u8>, ChangeBufferKeyOp> = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                e.insert(ChangeBufferKeyOp::Delete(old_value));
            }
            Entry::Occupied(mut e) => match e.get_mut() {
                ChangeBufferKeyOp::Insert(_) => {
                    // Insert then delete: the two operations cancel out.
                    e.remove();
                }
                ChangeBufferKeyOp::Update((prev, _curr)) => {
                    // Update then delete: only the deletion of the original value remains.
                    let prev = std::mem::take(prev);
                    e.insert(ChangeBufferKeyOp::Delete(prev));
                }
                ChangeBufferKeyOp::Delete(_) => {
                    unreachable!();
                }
            },
        }
    }

    fn update(&mut self, pk: Vec<u8>, old_value: OwnedRow, new_value: OwnedRow) {
        let entry = self.buffer.entry(pk);
        match entry {
            Entry::Vacant(e) => {
                e.insert(ChangeBufferKeyOp::Update((old_value, new_value)));
            }
            Entry::Occupied(mut e) => match e.get_mut() {
                ChangeBufferKeyOp::Insert(_) => {
                    // Insert then update: fold into a single insert of the latest value.
                    e.insert(ChangeBufferKeyOp::Insert(new_value));
                }
                ChangeBufferKeyOp::Update((_prev, curr)) => {
                    *curr = new_value;
                }
                ChangeBufferKeyOp::Delete(_) => {
                    unreachable!()
                }
            },
        }
    }

    fn into_parts(self) -> HashMap<Vec<u8>, ChangeBufferKeyOp> {
        self.buffer
    }
}

impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}

impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}

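/// A cache from encoded primary key to the latest materialized row. A `None` value means
/// the key is known to be absent from the table.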
struct MaterializeCache<SD> {
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    row_serde: BasicSerde,
    version_column_indices: Vec<u32>,
    _serde: PhantomData<SD>,
}

type CacheValue = Option<CompactedRow>;

impl<SD: ValueRowSerde> MaterializeCache<SD> {
    fn new(
        watermark_sequence: AtomicU64Ref,
        metrics_info: MetricsInfo,
        row_serde: BasicSerde,
        version_column_indices: Vec<u32>,
    ) -> Self {
        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
        Self {
            lru_cache,
            row_serde,
            version_column_indices,
            _serde: PhantomData,
        }
    }

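    /// Applies a batch of row operations on top of the cache and the state table, resolving
    /// primary-key conflicts according to `conflict_behavior`, and returns the net changes
    /// as a `ChangeBuffer`.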
    async fn handle<S: StateStore>(
        &mut self,
        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: ConflictBehavior,
        metrics: &MaterializeMetrics,
        toastable_column_indices: Option<&[usize]>,
    ) -> StreamExecutorResult<ChangeBuffer> {
        assert_matches!(conflict_behavior, checked_conflict_behaviors!());

        let key_set: HashSet<Box<[u8]>> = row_ops
            .iter()
            .map(|(_, k, _)| k.as_slice().into())
            .collect();

        // Make sure the cache holds the current value of every key touched by this batch.
        self.fetch_keys(
            key_set.iter().map(|v| v.deref()),
            table,
            conflict_behavior,
            metrics,
        )
        .await?;

        let mut change_buffer = ChangeBuffer::new();
        let row_serde = self.row_serde.clone();
        let version_column_indices = self.version_column_indices.clone();
        for (op, key, row) in row_ops {
            match op {
                Op::Insert | Op::UpdateInsert => {
                    let Some(old_row) = self.get_expected(&key) else {
                        // No existing row for this key: a plain insert.
                        let new_row_deserialized =
                            row_serde.deserializer.deserialize(row.clone())?;
                        change_buffer.insert(key.clone(), new_row_deserialized);
                        self.lru_cache.put(key, Some(CompactedRow { row }));
                        continue;
                    };

                    // A row with the same primary key already exists: resolve the conflict.
                    match conflict_behavior {
                        ConflictBehavior::Overwrite => {
                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;

                            let need_overwrite = if !version_column_indices.is_empty() {
                                versions_are_newer_or_equal(
                                    &old_row_deserialized,
                                    &new_row_deserialized,
                                    &version_column_indices,
                                )
                            } else {
                                // Without version columns, always overwrite.
                                true
                            };

                            if need_overwrite {
                                if let Some(toastable_indices) = toastable_column_indices {
                                    // Patch Debezium TOAST placeholders with the old values.
                                    let final_row = handle_toast_columns_for_postgres_cdc(
                                        &old_row_deserialized,
                                        &new_row_deserialized,
                                        toastable_indices,
                                    );

                                    change_buffer.update(
                                        key.clone(),
                                        old_row_deserialized,
                                        final_row.clone(),
                                    );
                                    let final_row_bytes =
                                        Bytes::from(row_serde.serializer.serialize(final_row));
                                    self.lru_cache.put(
                                        key.clone(),
                                        Some(CompactedRow {
                                            row: final_row_bytes,
                                        }),
                                    );
                                } else {
                                    change_buffer.update(
                                        key.clone(),
                                        old_row_deserialized,
                                        new_row_deserialized,
                                    );
                                    self.lru_cache
                                        .put(key.clone(), Some(CompactedRow { row: row.clone() }));
                                }
                            };
                        }
                        ConflictBehavior::IgnoreConflict => {
                            // Keep the existing row and ignore the new one.
                        }
                        ConflictBehavior::DoUpdateIfNotNull => {
                            // Overwrite only the columns that are not NULL in the new row.
                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;
                            let need_overwrite = if !version_column_indices.is_empty() {
                                versions_are_newer_or_equal(
                                    &old_row_deserialized,
                                    &new_row_deserialized,
                                    &version_column_indices,
                                )
                            } else {
                                true
                            };

                            if need_overwrite {
                                let mut row_deserialized_vec =
                                    old_row_deserialized.clone().into_inner().into_vec();
                                replace_if_not_null(
                                    &mut row_deserialized_vec,
                                    new_row_deserialized.clone(),
                                );
                                let mut updated_row = OwnedRow::new(row_deserialized_vec);

                                if let Some(toastable_indices) = toastable_column_indices {
                                    // Re-deserialize the old row and patch any Debezium TOAST
                                    // placeholders left in the merged row.
                                    let old_row_deserialized_again =
                                        row_serde.deserializer.deserialize(old_row.row.clone())?;
                                    updated_row = handle_toast_columns_for_postgres_cdc(
                                        &old_row_deserialized_again,
                                        &updated_row,
                                        toastable_indices,
                                    );
                                }

                                change_buffer.update(
                                    key.clone(),
                                    old_row_deserialized,
                                    updated_row.clone(),
                                );
                                let updated_row_bytes =
                                    Bytes::from(row_serde.serializer.serialize(updated_row));
                                self.lru_cache.put(
                                    key.clone(),
                                    Some(CompactedRow {
                                        row: updated_row_bytes,
                                    }),
                                );
                            }
                        }
                        _ => unreachable!(),
                    };
                }

                Op::Delete | Op::UpdateDelete => {
                    match conflict_behavior {
                        checked_conflict_behaviors!() => {
                            if let Some(old_row) = self.get_expected(&key) {
                                let old_row_deserialized =
                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
                                change_buffer.delete(key.clone(), old_row_deserialized);
                                // Mark the key as deleted in the cache.
                                self.lru_cache.put(key, None);
                            } else {
                                // Deleting a non-existent row is a no-op under these behaviors.
                            };
                        }
                        _ => unreachable!(),
                    };
                }
            }
        }
        Ok(change_buffer)
    }

    async fn fetch_keys<'a, S: StateStore>(
        &mut self,
        keys: impl Iterator<Item = &'a [u8]>,
        table: &StateTableInner<S, SD>,
        conflict_behavior: ConflictBehavior,
        metrics: &MaterializeMetrics,
    ) -> StreamExecutorResult<()> {
        let mut futures = vec![];
        for key in keys {
            metrics.materialize_cache_total_count.inc();

            if self.lru_cache.contains(key) {
                if self.lru_cache.get(key).unwrap().is_some() {
                    metrics.materialize_data_exist_count.inc();
                }
                metrics.materialize_cache_hit_count.inc();
                continue;
            }
            // Cache miss: look the key up in the state table.
            futures.push(async {
                let key_row = table.pk_serde().deserialize(key).unwrap();
                let row = table.get_row(key_row).await?.map(CompactedRow::from);
                StreamExecutorResult::Ok((key.to_vec(), row))
            });
        }

        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
        while let Some(result) = buffered.next().await {
            let (key, row) = result?;
            if row.is_some() {
                metrics.materialize_data_exist_count.inc();
            }
            match conflict_behavior {
                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
                _ => unreachable!(),
            };
        }

        Ok(())
    }

    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
        self.lru_cache.get(key).unwrap_or_else(|| {
            panic!(
                "the key {:?} has not been fetched in the materialize executor's cache",
                key
            )
        })
    }

    fn evict(&mut self) {
        self.lru_cache.evict()
    }
}

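/// Replaces each column of `row` with the corresponding column of `replacement` unless the
/// replacement value is NULL; used by `ConflictBehavior::DoUpdateIfNotNull`.
///
/// For example, merging an old row `(1, 2, 3)` with a replacement `(NULL, 5, NULL)` yields
/// `(1, 5, 3)`.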
fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
        if let Some(new_value) = new_col {
            *old_col = Some(new_value);
        }
    }
}

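/// Compares the version columns of two rows lexicographically, in the given order. Returns
/// `true` iff the new row's version is newer than or equal to the old row's, i.e. the new
/// row should overwrite the existing one.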
fn versions_are_newer_or_equal(
    old_row: &OwnedRow,
    new_row: &OwnedRow,
    version_column_indices: &[u32],
) -> bool {
    if version_column_indices.is_empty() {
        // No version columns configured: always treat the new row as newer.
        return true;
    }

    for &idx in version_column_indices {
        let old_value = old_row.index(idx as usize);
        let new_value = new_row.index(idx as usize);

        match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
            std::cmp::Ordering::Less => return true,
            std::cmp::Ordering::Greater => return false,
            std::cmp::Ordering::Equal => continue,
        }
    }

    // All version columns are equal.
    true
}

#[cfg(test)]
mod tests {

    use std::iter;
    use std::sync::atomic::AtomicU64;

    use rand::rngs::SmallRng;
    use rand::{Rng, RngCore, SeedableRng};
    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
    use risingwave_common::catalog::Field;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_hummock_sdk::HummockReadEpoch;
    use risingwave_storage::memory::MemoryStateStore;
    use risingwave_storage::table::batch_table::BatchTable;

    use super::*;
    use crate::executor::test_utils::*;

    #[tokio::test]
    async fn test_materialize_executor() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_upsert_stream() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_check_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_ignore_delete_then_insert() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        let _msg1 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();
        let _msg2 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_chunk()
            .unwrap();
        let _msg3 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();

        let row = table
            .get_row(
                &OwnedRow::new(vec![Some(1_i32.into())]),
                HummockReadEpoch::NoWait(u64::MAX),
            )
            .await
            .unwrap();
        assert_eq!(
            row,
            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
        );
    }

    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }

    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
        const KN: u32 = 4;
        const SEED: u64 = 998244353;
        let mut ret = vec![];
        let mut builder =
            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
        let mut rng = SmallRng::seed_from_u64(SEED);

        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
            let len = c.data_chunk().capacity();
            let mut c = StreamChunkMut::from(c);
            for i in 0..len {
                c.set_vis(i, rng.random_bool(0.5));
            }
            c.into()
        };
        for _ in 0..row_number {
            let k = (rng.next_u32() % KN) as i32;
            let v = rng.next_u32() as i32;
            let op = if rng.random_bool(0.5) {
                Op::Insert
            } else {
                Op::Delete
            };
            if let Some(c) =
                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
            {
                ret.push(random_vis(c, &mut rng));
            }
        }
        if let Some(c) = builder.take() {
            ret.push(random_vis(c, &mut rng));
        }
        ret
    }

    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            table.write_chunk(c);
        }
    }

    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }

    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
}