risingwave_stream/executor/mview/
materialize.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Deref, Index};
20
21use bytes::Bytes;
22use futures::stream;
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{
27    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
28};
29use risingwave_common::row::{CompactedRow, OwnedRow};
30use risingwave_common::types::{DEBEZIUM_UNAVAILABLE_VALUE, DataType, ScalarImpl};
31use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
32use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
33use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
34use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
35use risingwave_pb::catalog::Table;
36use risingwave_pb::catalog::table::Engine;
37use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
38
39use crate::cache::ManagedLruCache;
40use crate::common::metrics::MetricsInfo;
41use crate::common::table::state_table::{
42    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
43};
44use crate::executor::monitor::MaterializeMetrics;
45use crate::executor::prelude::*;
46
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the change stream to be materialized.
    input: Executor,

    /// Output schema of this executor (the schema of the materialized rows).
    schema: Schema,

    /// The state table backing the materialized view on storage.
    state_table: StateTableInner<S, SD>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// LRU cache of current row values by serialized pk, used for conflict handling.
    materialize_cache: MaterializeCache<SD>,

    /// How to resolve two rows arriving with the same primary key.
    conflict_behavior: ConflictBehavior,

    /// Indices of version columns used to decide whether an overwrite should win.
    /// Empty when no version column is specified.
    version_column_indices: Vec<u32>,

    /// Whether any downstream fragment may depend on this table. When `false` (and the
    /// conflict behavior is `Overwrite`), conflict checks can be skipped entirely —
    /// see `get_op_consistency_level`.
    may_have_downstream: bool,

    /// Ids of subscriptions currently depending on this table; non-empty set forces
    /// log-store-enabled op consistency.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,

    /// Indices of TOAST-able columns for PostgreSQL CDC tables. None means either non-CDC table or CDC table without TOAST-able columns.
    toastable_column_indices: Option<Vec<usize>>,
}
79
80fn get_op_consistency_level(
81    conflict_behavior: ConflictBehavior,
82    may_have_downstream: bool,
83    depended_subscriptions: &HashSet<u32>,
84) -> StateTableOpConsistencyLevel {
85    if !depended_subscriptions.is_empty() {
86        StateTableOpConsistencyLevel::LogStoreEnabled
87    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
88        // Table with overwrite conflict behavior could disable conflict check
89        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
90        StateTableOpConsistencyLevel::Inconsistent
91    } else {
92        StateTableOpConsistencyLevel::ConsistentOldValue
93    }
94}
95
impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
    /// should be `None`.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<u32>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Extract TOAST-able column indices from table columns.
        // Only for PostgreSQL CDC tables.
        let toastable_column_indices = if table_catalog.cdc_table_type()
            == risingwave_pb::catalog::table::CdcTableType::Postgres
        {
            let toastable_indices: Vec<usize> = table_columns
                .iter()
                .enumerate()
                .filter_map(|(index, column)| match &column.data_type {
                    // Currently supports TOAST updates for:
                    // - jsonb (DataType::Jsonb)
                    // - varchar (DataType::Varchar)
                    // - bytea (DataType::Bytea)
                    // - One-dimensional arrays of the above types (DataType::List)
                    //   Note: Some array types may not be fully supported yet, see issue  https://github.com/risingwavelabs/risingwave/issues/22916 for details.

                    // For details on how TOAST values are handled, see comments in `is_debezium_unavailable_value`.
                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
                        Some(index)
                    }
                    _ => None,
                })
                .collect();

            if toastable_indices.is_empty() {
                None
            } else {
                Some(toastable_indices)
            }
        } else {
            None
        };

        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // If this actor dispatches to anyone at creation time, a downstream MV may depend on it.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(op_consistency_level)
            .enable_preload_all_rows_by_config(&actor_context.streaming_config)
            .build()
            .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only iceberg tables bypass hummock: this node becomes a pure pass-through.
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_indices.clone(),
            ),
            conflict_behavior,
            version_column_indices,
            is_dummy_table,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
            toastable_column_indices,
        }
    }

    /// Main execution loop: forwards messages from the input, materializing chunks into
    /// the state table (with conflict handling when required) and committing state on
    /// barriers. Watermarks are passed through unchanged.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = TableId::new(self.state_table.table_id());
        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            self.materialize_cache.evict();

            let msg = match msg {
                Message::Watermark(w) => Message::Watermark(w),
                // Dummy table: count the rows but write nothing to the state table.
                Message::Chunk(chunk) if self.is_dummy_table => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);
                    Message::Chunk(chunk)
                }
                Message::Chunk(chunk) => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);

                    // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
                    // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
                    // and the conflict behavior is overwrite.
                    let do_not_handle_conflict = !self.state_table.is_consistent_op()
                        && self.version_column_indices.is_empty()
                        && self.conflict_behavior == ConflictBehavior::Overwrite;

                    match self.conflict_behavior {
                        checked_conflict_behaviors!() if !do_not_handle_conflict => {
                            if chunk.cardinality() == 0 {
                                // empty chunk
                                continue;
                            }
                            let (data_chunk, ops) = chunk.into_parts();

                            if self.state_table.value_indices().is_some() {
                                // TODO(st1page): when materialize partial columns(), we should
                                // construct some columns in the pk
                                panic!(
                                    "materialize executor with data check can not handle only materialize partial columns"
                                )
                            };
                            let values = data_chunk.serialize();

                            let key_chunk = data_chunk.project(self.state_table.pk_indices());

                            // Serialize the pk of every slot (including invisible rows, which
                            // keep empty keys) so positions line up with `ops` and `values`.
                            let pks = {
                                let mut pks = vec![vec![]; data_chunk.capacity()];
                                key_chunk
                                    .rows_with_holes()
                                    .zip_eq_fast(pks.iter_mut())
                                    .for_each(|(r, vnode_and_pk)| {
                                        if let Some(r) = r {
                                            self.state_table.pk_serde().serialize(r, vnode_and_pk);
                                        }
                                    });
                                pks
                            };
                            let (_, vis) = key_chunk.into_parts();
                            // Keep only visible rows as (op, serialized pk, serialized value).
                            let row_ops = ops
                                .iter()
                                .zip_eq_debug(pks.into_iter())
                                .zip_eq_debug(values.into_iter())
                                .zip_eq_debug(vis.iter())
                                .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
                                .collect_vec();

                            let change_buffer = self
                                .materialize_cache
                                .handle(
                                    row_ops,
                                    &self.state_table,
                                    self.conflict_behavior,
                                    &self.metrics,
                                    self.toastable_column_indices.as_deref(),
                                )
                                .await?;

                            match generate_output(change_buffer, data_types.clone())? {
                                Some(output_chunk) => {
                                    self.state_table.write_chunk(output_chunk.clone());
                                    self.state_table.try_flush().await?;
                                    Message::Chunk(output_chunk)
                                }
                                // All changes cancelled out: nothing to emit.
                                None => continue,
                            }
                        }
                        ConflictBehavior::IgnoreConflict => unreachable!(),
                        ConflictBehavior::NoCheck
                        | ConflictBehavior::Overwrite
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            self.state_table.write_chunk(chunk.clone());
                            self.state_table.try_flush().await?;
                            Message::Chunk(chunk)
                        } // ConflictBehavior::DoUpdateIfNotNull => unimplemented!(),
                    }
                }
                Message::Barrier(b) => {
                    // If a downstream mv depends on the current table, we need to do conflict check again.
                    if !self.may_have_downstream
                        && b.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.depended_subscription_ids,
                        &b,
                        mv_table_id,
                    );
                    // Re-evaluate the consistency level: downstream/subscription changes
                    // above may flip it across this barrier.
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.depended_subscription_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(b.epoch, op_consistency_level)
                        .await?;
                    if !post_commit.inner().is_consistent_op() {
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = b.as_update_vnode_bitmap(self.actor_context.id);
                    let b_epoch = b.epoch;
                    yield Message::Barrier(b);

                    // Update the vnode bitmap for the state table if asked.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                        && cache_may_stale
                    {
                        // Cached rows may belong to vnodes this actor no longer owns.
                        self.materialize_cache.lru_cache.clear();
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    continue;
                }
            };
            yield msg;
        }
    }

    /// Apply subscription changes carried by `barrier` to `depended_subscriptions`:
    /// register subscribers newly added on this MV table and drop removed ones.
    /// Duplicate additions / removals of unknown subscribers are logged as warnings.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
}
402
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
    #[cfg(any(test, feature = "test"))]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        // Build unnamed column descriptors from the input schema.
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // All columns are value columns (indices 0..len) in the test table.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                vec![],
            ),
            conflict_behavior,
            version_column_indices: vec![],
            is_dummy_table: false,
            toastable_column_indices: None,
            // Pretend a downstream exists so conflict handling stays active in tests.
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
        }
    }
}
465
466/// Fast string comparison to check if a string equals `DEBEZIUM_UNAVAILABLE_VALUE`.
467/// Optimized by checking length first to avoid expensive string comparison.
468fn is_unavailable_value_str(s: &str) -> bool {
469    s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
470}
471
472/// Check if a datum represents Debezium's unavailable value placeholder.
473/// This function handles both scalar types and one-dimensional arrays.
474fn is_debezium_unavailable_value(
475    datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
476) -> bool {
477    match datum {
478        Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => is_unavailable_value_str(val),
479        Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
480            // For jsonb type, check if it's a string containing the unavailable value
481            jsonb_ref
482                .as_str()
483                .map(is_unavailable_value_str)
484                .unwrap_or(false)
485        }
486        Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
487            // For bytea type, we need to check if it contains the string bytes of DEBEZIUM_UNAVAILABLE_VALUE
488            // This is because when processing bytea from Debezium, we convert the base64-encoded string
489            // to `DEBEZIUM_UNAVAILABLE_VALUE` in the json.rs parser to maintain consistency
490            if let Ok(bytea_str) = std::str::from_utf8(bytea) {
491                is_unavailable_value_str(bytea_str)
492            } else {
493                false
494            }
495        }
496        Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
497            // For list type, check if it contains exactly one element with the unavailable value
498            // This is because when any element in an array triggers TOAST, Debezium treats the entire
499            // array as unchanged and sends a placeholder array with only one element
500            if list_ref.len() == 1 {
501                if let Some(Some(element)) = list_ref.get(0) {
502                    // Recursively check the array element
503                    is_debezium_unavailable_value(&Some(element))
504                } else {
505                    false
506                }
507            } else {
508                false
509            }
510        }
511        _ => false,
512    }
513}
514
515/// Fix TOAST columns by replacing unavailable values with old row values.
516fn handle_toast_columns_for_postgres_cdc(
517    old_row: &OwnedRow,
518    new_row: &OwnedRow,
519    toastable_indices: &[usize],
520) -> OwnedRow {
521    let mut fixed_row_data = new_row.as_inner().to_vec();
522
523    for &toast_idx in toastable_indices {
524        // Check if the new value is Debezium's unavailable value placeholder
525        let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
526        if is_unavailable {
527            // Replace with old row value if available
528            if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
529                fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
530            }
531        }
532    }
533
534    OwnedRow::new(fixed_row_data)
535}
536
537/// Construct output `StreamChunk` from given buffer.
538fn generate_output(
539    change_buffer: ChangeBuffer,
540    data_types: Vec<DataType>,
541) -> StreamExecutorResult<Option<StreamChunk>> {
542    // construct output chunk
543    // TODO(st1page): when materialize partial columns(), we should construct some columns in the pk
544    let mut new_ops: Vec<Op> = vec![];
545    let mut new_rows: Vec<OwnedRow> = vec![];
546    for (_, row_op) in change_buffer.into_parts() {
547        match row_op {
548            ChangeBufferKeyOp::Insert(value) => {
549                new_ops.push(Op::Insert);
550                new_rows.push(value);
551            }
552            ChangeBufferKeyOp::Delete(old_value) => {
553                new_ops.push(Op::Delete);
554                new_rows.push(old_value);
555            }
556            ChangeBufferKeyOp::Update((old_value, new_value)) => {
557                // if old_value == new_value, we don't need to emit updates to downstream.
558                if old_value != new_value {
559                    new_ops.push(Op::UpdateDelete);
560                    new_ops.push(Op::UpdateInsert);
561                    new_rows.push(old_value);
562                    new_rows.push(new_value);
563                }
564            }
565        }
566    }
567    let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
568
569    for row in new_rows {
570        let res = data_chunk_builder.append_one_row(row);
571        debug_assert!(res.is_none());
572    }
573
574    if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
575        let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
576        Ok(Some(new_stream_chunk))
577    } else {
578        Ok(None)
579    }
580}
581
/// `ChangeBuffer` is a buffer to handle chunk into `KeyOp`.
/// TODO(rc): merge with `TopNStaging`.
struct ChangeBuffer {
    // Maps serialized primary key -> pending change for that key.
    buffer: HashMap<Vec<u8>, ChangeBufferKeyOp>,
}
587
/// `KeyOp` variant for `ChangeBuffer` that stores `OwnedRow` instead of Bytes
enum ChangeBufferKeyOp {
    /// A new row was inserted for the key.
    Insert(OwnedRow),
    /// The row for the key was deleted; holds the deleted (old) row.
    Delete(OwnedRow),
    /// (`old_value`, `new_value`)
    Update((OwnedRow, OwnedRow)),
}
595
596impl ChangeBuffer {
597    fn new() -> Self {
598        Self {
599            buffer: HashMap::new(),
600        }
601    }
602
603    fn insert(&mut self, pk: Vec<u8>, value: OwnedRow) {
604        let entry = self.buffer.entry(pk);
605        match entry {
606            Entry::Vacant(e) => {
607                e.insert(ChangeBufferKeyOp::Insert(value));
608            }
609            Entry::Occupied(mut e) => {
610                if let ChangeBufferKeyOp::Delete(old_value) = e.get_mut() {
611                    let old_val = std::mem::take(old_value);
612                    e.insert(ChangeBufferKeyOp::Update((old_val, value)));
613                } else {
614                    unreachable!();
615                }
616            }
617        }
618    }
619
620    fn delete(&mut self, pk: Vec<u8>, old_value: OwnedRow) {
621        let entry: Entry<'_, Vec<u8>, ChangeBufferKeyOp> = self.buffer.entry(pk);
622        match entry {
623            Entry::Vacant(e) => {
624                e.insert(ChangeBufferKeyOp::Delete(old_value));
625            }
626            Entry::Occupied(mut e) => match e.get_mut() {
627                ChangeBufferKeyOp::Insert(_) => {
628                    e.remove();
629                }
630                ChangeBufferKeyOp::Update((prev, _curr)) => {
631                    let prev = std::mem::take(prev);
632                    e.insert(ChangeBufferKeyOp::Delete(prev));
633                }
634                ChangeBufferKeyOp::Delete(_) => {
635                    unreachable!();
636                }
637            },
638        }
639    }
640
641    fn update(&mut self, pk: Vec<u8>, old_value: OwnedRow, new_value: OwnedRow) {
642        let entry = self.buffer.entry(pk);
643        match entry {
644            Entry::Vacant(e) => {
645                e.insert(ChangeBufferKeyOp::Update((old_value, new_value)));
646            }
647            Entry::Occupied(mut e) => match e.get_mut() {
648                ChangeBufferKeyOp::Insert(_) => {
649                    e.insert(ChangeBufferKeyOp::Insert(new_value));
650                }
651                ChangeBufferKeyOp::Update((_prev, curr)) => {
652                    *curr = new_value;
653                }
654                ChangeBufferKeyOp::Delete(_) => {
655                    unreachable!()
656                }
657            },
658        }
659    }
660
661    fn into_parts(self) -> HashMap<Vec<u8>, ChangeBufferKeyOp> {
662        self.buffer
663    }
664}
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Delegate to the stream-generating inner implementation.
        self.execute_inner().boxed()
    }
}
670
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the arrange key indices are shown; the remaining fields carry
        // no useful (or no `Debug`) representation for diagnostics.
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
678
/// A cache for materialize executors.
struct MaterializeCache<SD> {
    /// Maps serialized pk -> cached value; `None` caches the fact that the key
    /// is absent from storage (see `MaterializeCache::handle`).
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    /// Serde used to (de)serialize full row values.
    row_serde: BasicSerde,
    /// Indices of version columns, forwarded from the executor; empty if unspecified.
    version_column_indices: Vec<u32>,
    _serde: PhantomData<SD>,
}

/// Cached value for a pk: `Some` holds the compacted row; `None` marks a known miss.
type CacheValue = Option<CompactedRow>;
688
689impl<SD: ValueRowSerde> MaterializeCache<SD> {
690    fn new(
691        watermark_sequence: AtomicU64Ref,
692        metrics_info: MetricsInfo,
693        row_serde: BasicSerde,
694        version_column_indices: Vec<u32>,
695    ) -> Self {
696        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
697            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
698        Self {
699            lru_cache,
700            row_serde,
701            version_column_indices,
702            _serde: PhantomData,
703        }
704    }
705
706    async fn handle<S: StateStore>(
707        &mut self,
708        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
709        table: &StateTableInner<S, SD>,
710        conflict_behavior: ConflictBehavior,
711        metrics: &MaterializeMetrics,
712        toastable_column_indices: Option<&[usize]>,
713    ) -> StreamExecutorResult<ChangeBuffer> {
714        assert_matches!(conflict_behavior, checked_conflict_behaviors!());
715
716        let key_set: HashSet<Box<[u8]>> = row_ops
717            .iter()
718            .map(|(_, k, _)| k.as_slice().into())
719            .collect();
720
721        // Populate the LRU cache with the keys in input chunk.
722        // For new keys, row values are set to None.
723        self.fetch_keys(
724            key_set.iter().map(|v| v.deref()),
725            table,
726            conflict_behavior,
727            metrics,
728        )
729        .await?;
730
731        let mut change_buffer = ChangeBuffer::new();
732        let row_serde = self.row_serde.clone();
733        let version_column_indices = self.version_column_indices.clone();
734        for (op, key, row) in row_ops {
735            match op {
736                Op::Insert | Op::UpdateInsert => {
737                    let Some(old_row) = self.get_expected(&key) else {
738                        // not exists before, meaning no conflict, simply insert
739                        let new_row_deserialized =
740                            row_serde.deserializer.deserialize(row.clone())?;
741                        change_buffer.insert(key.clone(), new_row_deserialized);
742                        self.lru_cache.put(key, Some(CompactedRow { row }));
743                        continue;
744                    };
745
746                    // now conflict happens, handle it according to the specified behavior
747                    match conflict_behavior {
748                        ConflictBehavior::Overwrite => {
749                            let old_row_deserialized =
750                                row_serde.deserializer.deserialize(old_row.row.clone())?;
751                            let new_row_deserialized =
752                                row_serde.deserializer.deserialize(row.clone())?;
753
754                            let need_overwrite = if !version_column_indices.is_empty() {
755                                versions_are_newer_or_equal(
756                                    &old_row_deserialized,
757                                    &new_row_deserialized,
758                                    &version_column_indices,
759                                )
760                            } else {
761                                // no version column specified, just overwrite
762                                true
763                            };
764
765                            if need_overwrite {
766                                if let Some(toastable_indices) = toastable_column_indices {
767                                    // For TOAST-able columns, replace Debezium's unavailable value placeholder with old row values.
768                                    let final_row = handle_toast_columns_for_postgres_cdc(
769                                        &old_row_deserialized,
770                                        &new_row_deserialized,
771                                        toastable_indices,
772                                    );
773
774                                    change_buffer.update(
775                                        key.clone(),
776                                        old_row_deserialized,
777                                        final_row.clone(),
778                                    );
779                                    let final_row_bytes =
780                                        Bytes::from(row_serde.serializer.serialize(final_row));
781                                    self.lru_cache.put(
782                                        key.clone(),
783                                        Some(CompactedRow {
784                                            row: final_row_bytes,
785                                        }),
786                                    );
787                                } else {
788                                    // No TOAST columns, use the original row bytes directly to avoid unnecessary serialization
789                                    change_buffer.update(
790                                        key.clone(),
791                                        old_row_deserialized,
792                                        new_row_deserialized,
793                                    );
794                                    self.lru_cache
795                                        .put(key.clone(), Some(CompactedRow { row: row.clone() }));
796                                }
797                            };
798                        }
799                        ConflictBehavior::IgnoreConflict => {
800                            // ignore conflict, do nothing
801                        }
802                        ConflictBehavior::DoUpdateIfNotNull => {
803                            // In this section, we compare the new row and old row column by column and perform `DoUpdateIfNotNull` replacement.
804
805                            let old_row_deserialized =
806                                row_serde.deserializer.deserialize(old_row.row.clone())?;
807                            let new_row_deserialized =
808                                row_serde.deserializer.deserialize(row.clone())?;
809                            let need_overwrite = if !version_column_indices.is_empty() {
810                                versions_are_newer_or_equal(
811                                    &old_row_deserialized,
812                                    &new_row_deserialized,
813                                    &version_column_indices,
814                                )
815                            } else {
816                                true
817                            };
818
819                            if need_overwrite {
820                                let mut row_deserialized_vec =
821                                    old_row_deserialized.clone().into_inner().into_vec();
822                                replace_if_not_null(
823                                    &mut row_deserialized_vec,
824                                    new_row_deserialized.clone(),
825                                );
826                                let mut updated_row = OwnedRow::new(row_deserialized_vec);
827
828                                // Apply TOAST column fix for CDC tables with TOAST columns
829                                if let Some(toastable_indices) = toastable_column_indices {
830                                    // Note: we need to use old_row_deserialized again, but it was moved above
831                                    // So we re-deserialize the old row
832                                    let old_row_deserialized_again =
833                                        row_serde.deserializer.deserialize(old_row.row.clone())?;
834                                    updated_row = handle_toast_columns_for_postgres_cdc(
835                                        &old_row_deserialized_again,
836                                        &updated_row,
837                                        toastable_indices,
838                                    );
839                                }
840
841                                change_buffer.update(
842                                    key.clone(),
843                                    old_row_deserialized,
844                                    updated_row.clone(),
845                                );
846                                let updated_row_bytes =
847                                    Bytes::from(row_serde.serializer.serialize(updated_row));
848                                self.lru_cache.put(
849                                    key.clone(),
850                                    Some(CompactedRow {
851                                        row: updated_row_bytes,
852                                    }),
853                                );
854                            }
855                        }
856                        _ => unreachable!(),
857                    };
858                }
859
860                Op::Delete | Op::UpdateDelete => {
861                    match conflict_behavior {
862                        checked_conflict_behaviors!() => {
863                            if let Some(old_row) = self.get_expected(&key) {
864                                let old_row_deserialized =
865                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
866                                change_buffer.delete(key.clone(), old_row_deserialized);
867                                // put a None into the cache to represent deletion
868                                self.lru_cache.put(key, None);
869                            } else {
870                                // delete a non-existent value
871                                // this is allowed in the case of mview conflict, so ignore
872                            };
873                        }
874                        _ => unreachable!(),
875                    };
876                }
877            }
878        }
879        Ok(change_buffer)
880    }
881
882    async fn fetch_keys<'a, S: StateStore>(
883        &mut self,
884        keys: impl Iterator<Item = &'a [u8]>,
885        table: &StateTableInner<S, SD>,
886        conflict_behavior: ConflictBehavior,
887        metrics: &MaterializeMetrics,
888    ) -> StreamExecutorResult<()> {
889        let mut futures = vec![];
890        for key in keys {
891            metrics.materialize_cache_total_count.inc();
892
893            if self.lru_cache.contains(key) {
894                if self.lru_cache.get(key).unwrap().is_some() {
895                    metrics.materialize_data_exist_count.inc();
896                }
897                metrics.materialize_cache_hit_count.inc();
898                continue;
899            }
900            futures.push(async {
901                let key_row = table.pk_serde().deserialize(key).unwrap();
902                let row = table.get_row(key_row).await?.map(CompactedRow::from);
903                StreamExecutorResult::Ok((key.to_vec(), row))
904            });
905        }
906
907        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
908        while let Some(result) = buffered.next().await {
909            let (key, row) = result?;
910            if row.is_some() {
911                metrics.materialize_data_exist_count.inc();
912            }
913            // for keys that are not in the table, `value` is None
914            match conflict_behavior {
915                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
916                _ => unreachable!(),
917            };
918        }
919
920        Ok(())
921    }
922
923    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
924        self.lru_cache.get(key).unwrap_or_else(|| {
925            panic!(
926                "the key {:?} has not been fetched in the materialize executor's cache ",
927                key
928            )
929        })
930    }
931
932    fn evict(&mut self) {
933        self.lru_cache.evict()
934    }
935}
936
937/// Replace columns in an existing row with the corresponding columns in a replacement row, if the
938/// column value in the replacement row is not null.
939///
940/// # Example
941///
942/// ```ignore
943/// let mut row = vec![Some(1), None, Some(3)];
944/// let replacement = vec![Some(10), Some(20), None];
945/// replace_if_not_null(&mut row, replacement);
946/// ```
947///
948/// After the call, `row` will be `[Some(10), Some(20), Some(3)]`.
949fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
950    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
951        if let Some(new_value) = new_col {
952            *old_col = Some(new_value);
953        }
954    }
955}
956
957/// Compare multiple version columns lexicographically.
958/// Returns true if `new_row` has a newer or equal version compared to `old_row`.
959fn versions_are_newer_or_equal(
960    old_row: &OwnedRow,
961    new_row: &OwnedRow,
962    version_column_indices: &[u32],
963) -> bool {
964    if version_column_indices.is_empty() {
965        // No version columns specified, always consider new version as newer
966        return true;
967    }
968
969    for &idx in version_column_indices {
970        let old_value = old_row.index(idx as usize);
971        let new_value = new_row.index(idx as usize);
972
973        match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
974            std::cmp::Ordering::Less => return true,     // new is newer
975            std::cmp::Ordering::Greater => return false, // old is newer
976            std::cmp::Ordering::Equal => continue,       // equal, check next column
977        }
978    }
979
980    // All version columns are equal, consider new version as equal (should overwrite)
981    true
982}
983
984#[cfg(test)]
985mod tests {
986
987    use std::iter;
988    use std::sync::atomic::AtomicU64;
989
990    use rand::rngs::SmallRng;
991    use rand::{Rng, RngCore, SeedableRng};
992    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
993    use risingwave_common::catalog::Field;
994    use risingwave_common::util::epoch::test_epoch;
995    use risingwave_common::util::sort_util::OrderType;
996    use risingwave_hummock_sdk::HummockReadEpoch;
997    use risingwave_storage::memory::MemoryStateStore;
998    use risingwave_storage::table::batch_table::BatchTable;
999
1000    use super::*;
1001    use crate::executor::test_utils::*;
1002
    /// Smoke test for `MaterializeExecutor` with `ConflictBehavior::NoCheck`:
    /// materializes two chunks separated by barriers and verifies the stored rows
    /// via batch-table point gets after each barrier.
    #[tokio::test]
    async fn test_materialize_executor() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Prepare source chunks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side reader over the same state store, used to verify what got materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1105
    // https://github.com/risingwavelabs/risingwave/issues/13346
    /// Regression test for the linked issue: with `ConflictBehavior::Overwrite`,
    /// an insert-then-delete of the same pk within one chunk must leave the table
    /// without that row (the delete must not be dropped as a "conflict").
    #[tokio::test]
    async fn test_upsert_stream() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Overwrite pk 1, then delete it within the same chunk.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side reader used to verify the final materialized state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier 1.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1, barrier 2, and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // After the final barrier, pk 1 must be gone.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1188
    /// Tests `ConflictBehavior::Overwrite` on conflicting inserts: a duplicate pk
    /// within one chunk and across chunks must end up with the latest value.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicting inserts across chunks: pk 1 and pk 2 are overwritten again.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // test delete wrong value, delete inexistent pk
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side reader used to verify the materialized state after barriers.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier 1.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1 was inserted three times; the latest value (1,3) must win.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2 was overwritten by chunk2's (2,6).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1307
    /// Tests `ConflictBehavior::Overwrite` across delete and update ops:
    /// insert-after-update, delete with a stale value, delete of a non-existent pk,
    /// update with a stale old value, and an update whose pk does not exist yet
    /// (which must degrade to an insert).
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // test delete wrong value, delete inexistent pk
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side reader used to verify the materialized state after each barrier.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier 1.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1494
    /// `ConflictBehavior::IgnoreConflict` on inserts: the first row written
    /// for a primary key wins, and later conflicting inserts on the same key
    /// are silently dropped.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Double insert on pk 1: the latter (1, 4) needs to be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Inserts conflicting with chunk1 (pks 1 and 2): both must be ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // A conflicting insert scheduled after the second barrier.
        // NOTE(review): the test stops polling at the second barrier below, so
        // this chunk is never actually processed or asserted on.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same state store, used to read back the
        // materialized rows for assertions.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier, then the two conflict-resolved chunks.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // At the second barrier, verify the surviving rows of chunk1/chunk2.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (3, 6) was inserted without conflict.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1 keeps its first value (1, 3); (1, 4) and (1, 5) were ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2 keeps its first value (2, 5); (2, 6) was ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1613
    /// `IgnoreConflict` must not suppress an insert that follows a delete of
    /// the same pk: delete + re-insert within one chunk is not a conflict.
    #[tokio::test]
    async fn test_ignore_delete_then_insert() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test insert after delete one pk, the latter insert should succeed.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same state store, used to read back the
        // materialized row for the assertion.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain the full message sequence, asserting the expected message
        // kinds: barrier, chunk, barrier.
        let _msg1 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();
        let _msg2 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_chunk()
            .unwrap();
        let _msg3 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();

        // The re-insert after delete must be materialized: pk 1 -> (1, 6).
        let row = table
            .get_row(
                &OwnedRow::new(vec![Some(1_i32.into())]),
                HummockReadEpoch::NoWait(u64::MAX),
            )
            .await
            .unwrap();
        assert_eq!(
            row,
            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
        );
    }
1706
    /// `IgnoreConflict` across deletes and updates:
    /// - an update (`U-`/`U+`) on a missing pk degenerates into an insert;
    /// - an insert conflicting with an existing pk is ignored;
    /// - deletes match on pk only (a wrong old value still deletes), and
    ///   deleting a missing pk is a no-op.
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Update on missing pk 8 becomes insert (8, 2); the following
        // conflicting insert (8, 3) must be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value (pk 3 stores 6, not 4), delete inexistent pk 5
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Conflicting insert (pk 1, ignored), update with wrong old value
        // (pk 2, still applied by pk), and update on missing pk 9 (becomes
        // an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same state store, used to read back the
        // materialized rows for assertions.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // At the second barrier, verify the effects of chunk1.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 2): the update-turned-insert won, and the
                // later conflicting insert (8, 3) was ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2; verify its effects at the third barrier.
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (7, 8) was inserted without conflict.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value: the delete matches by pk, so pk 3
                // is gone even though the delete carried the stale value 4.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk: deleting a missing pk is a no-op.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3; verify its effects at the fourth barrier.
        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // pk 1 keeps (1, 4): the conflicting insert (1, 5) was ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                // check update wrong value: the update applies by pk, so pk 2
                // becomes (2, 8) despite the stale old value in `U-`.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert: (9, 1).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1894
    /// `ConflictBehavior::DoUpdateIfNotNull`: on a pk conflict, only the
    /// non-null columns of the new row overwrite the stored row; null
    /// columns keep the previously stored value.
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // should get (8, 2): the update on missing pk 8 becomes an insert, and
        // the later null-valued insert `+ 8 .` must not clobber the non-null 2.
        // Also inserts (2, NULL) cleanly.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // should not get (3, x), should not get (5, 0): deletes match by pk,
        // and deleting a missing pk is a no-op.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // should get (2, None), (7, 8): null-valued conflicting writes keep
        // the stored value; the update on missing pk 9 becomes insert (9, 1).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same state store, used to read back the
        // materialized rows for assertions.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // At the second barrier, verify the effects of chunk1.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (8, 2): the later null insert did not overwrite the value.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                // (2, NULL) was inserted as-is (no prior row to preserve).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume chunk2; verify its effects at the third barrier.
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (7, 8) was inserted without conflict.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value: pk 3 is deleted despite the stale
                // value 4 in the delete record.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk: deleting a missing pk is a no-op.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3; verify its effects at the fourth barrier.
        materialize_executor.next().await.transpose().unwrap();
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (7, 8) kept its value: the conflicting `+ 7 .` was null.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong value: `U+ 2 .` carries null, so pk 2
                // keeps its stored NULL rather than being overwritten.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // check update wrong pk, should become insert: (9, 1).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2088
2089    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2090        const KN: u32 = 4;
2091        const SEED: u64 = 998244353;
2092        let mut ret = vec![];
2093        let mut builder =
2094            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2095        let mut rng = SmallRng::seed_from_u64(SEED);
2096
2097        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2098            let len = c.data_chunk().capacity();
2099            let mut c = StreamChunkMut::from(c);
2100            for i in 0..len {
2101                c.set_vis(i, rng.random_bool(0.5));
2102            }
2103            c.into()
2104        };
2105        for _ in 0..row_number {
2106            let k = (rng.next_u32() % KN) as i32;
2107            let v = rng.next_u32() as i32;
2108            let op = if rng.random_bool(0.5) {
2109                Op::Insert
2110            } else {
2111                Op::Delete
2112            };
2113            if let Some(c) =
2114                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2115            {
2116                ret.push(random_vis(c, &mut rng));
2117            }
2118        }
2119        if let Some(c) = builder.take() {
2120            ret.push(random_vis(c, &mut rng));
2121        }
2122        ret
2123    }
2124
    /// Feed `N` random insert/delete rows through `MaterializeExecutor` with
    /// the given conflict behavior, then replay every output chunk into a
    /// `StateTable`. NOTE(review): the assertion is implicit — the state
    /// table's memtable is expected to reject op-inconsistent writes (e.g. a
    /// double insert of the same key), so completing without panic passes.
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Wrap the fuzz chunks between an opening and a closing barrier.
        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        // Prepare stream executors.
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        // A separate checker table: distinct table id (1002) so its state does
        // not collide with the materialized table's state in the shared store.
        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        // Replay every output chunk until the closing barrier ends the loop.
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            // check with state table's memtable
            table.write_chunk(c);
        }
    }
2188
    /// Fuzz the materialize executor's output consistency under
    /// `ConflictBehavior::Overwrite`.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2193
    /// Fuzz the materialize executor's output consistency under
    /// `ConflictBehavior::IgnoreConflict`.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2198}