risingwave_stream/executor/mview/materialize.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::ops::{Bound, Deref, Index};
19
20use bytes::Bytes;
21use futures::future::Either;
22use futures::stream::{self, select_with_strategy};
23use futures_async_stream::try_stream;
24use itertools::Itertools;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{
28    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
29};
30use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
31use risingwave_common::row::{CompactedRow, OwnedRow, RowExt};
32use risingwave_common::types::{DEBEZIUM_UNAVAILABLE_VALUE, DataType, ScalarImpl};
33use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
34use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
35use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_pb::catalog::Table;
38use risingwave_pb::catalog::table::Engine;
39use risingwave_pb::id::SourceId;
40use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
41use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
42use risingwave_storage::table::KeyedRow;
43
44use crate::cache::ManagedLruCache;
45use crate::common::change_buffer::output_kind as cb_kind;
46use crate::common::metrics::MetricsInfo;
47use crate::common::table::state_table::{
48    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
49};
50use crate::executor::error::ErrorKind;
51use crate::executor::monitor::MaterializeMetrics;
52use crate::executor::mview::RefreshProgressTable;
53use crate::executor::prelude::*;
54use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
55use crate::task::LocalBarrierManager;
56
/// Execution-stage state machine for the materialize executor's stream loop.
///
/// Non-refreshable tables stay in `NormalIngestion`. Refreshable materialized
/// views additionally move through the refresh stages, with
/// `CommitAndYieldBarrier` carrying the pending barrier and the next stage
/// across each commit point.
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Regular streaming ingestion of upstream changes into the table.
    NormalIngestion,
    /// Refresh stage: merge-sort the main and staging tables, collecting rows
    /// that exist only in the main table so they can be deleted.
    MergingData,
    /// Refresh stage entered once merging has fully completed
    /// (cleanup of refresh-time state such as the staging table).
    CleanUp,
    /// Commit state for `barrier`, yield the barrier downstream, then
    /// transition to `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        // Boxed to keep the enum's size small despite the recursive type.
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Terminal stage of a refresh cycle; `on_complete_epoch` is the epoch
    /// pair recorded when the refresh completes.
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}
70
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the change stream to be materialized.
    input: Executor,

    /// Output schema; also used to build the data types of emitted chunks.
    schema: Schema,

    /// State table storing the materialized view's rows.
    state_table: StateTableInner<S, SD>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    /// Actor context (actor/fragment ids, config, initial dispatch and
    /// subscriber information).
    actor_context: ActorContextRef,

    /// LRU cache over materialized rows, consulted when resolving
    /// primary-key conflicts.
    materialize_cache: MaterializeCache<SD>,

    /// Strategy for resolving primary-key conflicts between incoming rows
    /// and already-materialized rows.
    conflict_behavior: ConflictBehavior,

    /// Indices of version columns used by conflict handling; empty when no
    /// version column is specified.
    version_column_indices: Vec<u32>,

    /// Whether any downstream job may depend on this table (derived from the
    /// initial dispatcher count at creation).
    may_have_downstream: bool,

    /// IDs of subscribers on this table; a non-empty set forces the
    /// log-store-enabled op consistency level.
    subscriber_ids: HashSet<u32>,

    /// Materialize-specific metrics (e.g. input row count).
    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,

    /// Indices of TOAST-able columns for PostgreSQL CDC tables. None means either non-CDC table or CDC table without TOAST-able columns.
    toastable_column_indices: Option<Vec<usize>>,

    /// Optional refresh arguments and state for refreshable materialized views
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    /// Local barrier manager for reporting barrier events
    local_barrier_manager: LocalBarrierManager,
}
109
/// Arguments and state for refreshable materialized views.
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Table catalog for the main (destination) table.
    pub table_catalog: Table,

    /// Table catalog for the staging table.
    pub staging_table_catalog: Table,

    /// Flag indicating if this table is currently being refreshed.
    /// Also set to `true` on recovery when incomplete refresh progress is
    /// found in the progress table.
    pub is_refreshing: bool,

    /// During data refresh (between `RefreshStart` and `LoadFinish`),
    /// data will be written to both the main table and the staging table.
    ///
    /// The staging table is PK-only.
    ///
    /// After `LoadFinish`, we will do a `DELETE FROM main_table WHERE pk NOT IN (SELECT pk FROM staging_table)`, and then purge the staging table.
    pub staging_table: StateTableInner<S, SD>,

    /// Progress table for tracking refresh state per `VNode` for fault tolerance
    pub progress_table: RefreshProgressTable<S>,

    /// Table ID for this refreshable materialized view
    pub table_id: TableId,
}
135
136impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
137    /// Create new `RefreshableMaterializeArgs`
138    pub async fn new(
139        store: S,
140        table_catalog: &Table,
141        staging_table_catalog: &Table,
142        progress_state_table: &Table,
143        vnodes: Option<Arc<Bitmap>>,
144    ) -> Self {
145        let table_id = table_catalog.id;
146
147        // staging table is pk-only, and we don't need to check value consistency
148        let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
149            staging_table_catalog,
150            store.clone(),
151            vnodes.clone(),
152        )
153        .await;
154
155        let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
156            progress_state_table,
157            store,
158            vnodes,
159        )
160        .await;
161
162        // Get primary key length from main table catalog
163        let pk_len = table_catalog.pk.len();
164        let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
165
166        debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
167
168        Self {
169            table_catalog: table_catalog.clone(),
170            staging_table_catalog: staging_table_catalog.clone(),
171            is_refreshing: false,
172            staging_table,
173            progress_table,
174            table_id,
175        }
176    }
177}
178
179fn get_op_consistency_level(
180    conflict_behavior: ConflictBehavior,
181    may_have_downstream: bool,
182    subscriber_ids: &HashSet<u32>,
183) -> StateTableOpConsistencyLevel {
184    if !subscriber_ids.is_empty() {
185        StateTableOpConsistencyLevel::LogStoreEnabled
186    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
187        // Table with overwrite conflict behavior could disable conflict check
188        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
189        StateTableOpConsistencyLevel::Inconsistent
190    } else {
191        StateTableOpConsistencyLevel::ConsistentOldValue
192    }
193}
194
195impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
    /// should be `None`.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<u32>,
        metrics: Arc<StreamingMetrics>,
        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
        local_barrier_manager: LocalBarrierManager,
    ) -> Self {
        // Column descriptors of the materialized table, converted from the
        // protobuf catalog.
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Extract TOAST-able column indices from table columns.
        // Only for PostgreSQL CDC tables.
        let toastable_column_indices = if table_catalog.cdc_table_type()
            == risingwave_pb::catalog::table::CdcTableType::Postgres
        {
            let toastable_indices: Vec<usize> = table_columns
                .iter()
                .enumerate()
                .filter_map(|(index, column)| match &column.data_type {
                    // Currently supports TOAST updates for:
                    // - jsonb (DataType::Jsonb)
                    // - varchar (DataType::Varchar)
                    // - bytea (DataType::Bytea)
                    // - One-dimensional arrays of the above types (DataType::List)
                    //   Note: Some array types may not be fully supported yet, see issue https://github.com/risingwavelabs/risingwave/issues/22916 for details.

                    // For details on how TOAST values are handled, see comments in `is_debezium_unavailable_value`.
                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
                        Some(index)
                    }
                    _ => None,
                })
                .collect();

            // Normalize "no TOAST-able columns" to `None` so downstream code
            // can treat it the same as a non-CDC table.
            if toastable_indices.is_empty() {
                None
            } else {
                Some(toastable_indices)
            }
        } else {
            None
        };

        // Row serde over the table's value columns; handed to the
        // materialize cache for (de)serializing stored rows.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // A table created with at least one dispatcher may have downstream
        // jobs depending on it.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let subscriber_ids = actor_context.initial_subscriber_ids.clone();
        let op_consistency_level =
            get_op_consistency_level(conflict_behavior, may_have_downstream, &subscriber_ids);
        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(op_consistency_level)
            .enable_preload_all_rows_by_config(&actor_context.config)
            .build()
            .await;

        let mv_metrics = metrics.new_materialize_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only tables with the iceberg engine bypass hummock entirely;
        // this executor then acts as a pass-through dummy node.
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_indices.clone(),
            ),
            conflict_behavior,
            version_column_indices,
            is_dummy_table,
            may_have_downstream,
            subscriber_ids,
            metrics: mv_metrics,
            toastable_column_indices,
            refresh_args,
            local_barrier_manager,
        }
    }
306
307    #[try_stream(ok = Message, error = StreamExecutorError)]
308    async fn execute_inner(mut self) {
309        let mv_table_id = self.state_table.table_id();
310        let data_types = self.schema.data_types();
311        let mut input = self.input.execute();
312
313        let barrier = expect_first_barrier(&mut input).await?;
314        let first_epoch = barrier.epoch;
315        let _barrier_epoch = barrier.epoch; // Save epoch for later use (unused in normal execution)
316        // The first barrier message should be propagated.
317        yield Message::Barrier(barrier);
318        self.state_table.init_epoch(first_epoch).await?;
319
320        // default to normal ingestion
321        let mut inner_state =
322            Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
323        // Initialize staging table for refreshable materialized views
324        if let Some(ref mut refresh_args) = self.refresh_args {
325            refresh_args.staging_table.init_epoch(first_epoch).await?;
326
327            // Initialize progress table and load existing progress for recovery
328            refresh_args.progress_table.recover(first_epoch).await?;
329
330            // Check if refresh is already in progress (recovery scenario)
331            let progress_stats = refresh_args.progress_table.get_progress_stats();
332            if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
333                refresh_args.is_refreshing = true;
334                tracing::info!(
335                    total_vnodes = progress_stats.total_vnodes,
336                    completed_vnodes = progress_stats.completed_vnodes,
337                    "Recovered refresh in progress, resuming refresh operation"
338                );
339
340                // Since stage info is no longer stored in progress table,
341                // we need to determine recovery state differently.
342                // For now, assume all incomplete VNodes need to continue merging
343                let incomplete_vnodes: Vec<_> = refresh_args
344                    .progress_table
345                    .get_all_progress()
346                    .iter()
347                    .filter(|(_, entry)| !entry.is_completed)
348                    .map(|(&vnode, _)| vnode)
349                    .collect();
350
351                if !incomplete_vnodes.is_empty() {
352                    // Some VNodes are incomplete, need to resume refresh operation
353                    tracing::info!(
354                        incomplete_vnodes = incomplete_vnodes.len(),
355                        "Recovery detected incomplete VNodes, resuming refresh operation"
356                    );
357                    // Since stage tracking is now in memory, we'll determine the appropriate
358                    // stage based on the executor's internal state machine
359                } else {
360                    // This should not happen if is_complete() returned false, but handle it gracefully
361                    tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
362                }
363            }
364        }
365
366        // Determine initial execution stage (for recovery scenarios)
367        if let Some(ref refresh_args) = self.refresh_args
368            && refresh_args.is_refreshing
369        {
370            // Recovery logic: Check if there are incomplete vnodes from previous run
371            let incomplete_vnodes: Vec<_> = refresh_args
372                .progress_table
373                .get_all_progress()
374                .iter()
375                .filter(|(_, entry)| !entry.is_completed)
376                .map(|(&vnode, _)| vnode)
377                .collect();
378            if !incomplete_vnodes.is_empty() {
379                // Resume from merge stage since some VNodes were left incomplete
380                inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
381                tracing::info!(
382                    incomplete_vnodes = incomplete_vnodes.len(),
383                    "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
384                );
385            }
386        }
387
388        // Main execution loop: cycles through Stage 1 -> Stage 2 -> Stage 3 -> Stage 1...
389        'main_loop: loop {
390            match *inner_state {
391                MaterializeStreamState::NormalIngestion => {
392                    #[for_await]
393                    '_normal_ingest: for msg in input.by_ref() {
394                        let msg = msg?;
395                        self.materialize_cache.evict();
396
397                        match msg {
398                            Message::Watermark(w) => {
399                                yield Message::Watermark(w);
400                            }
401                            Message::Chunk(chunk) if self.is_dummy_table => {
402                                self.metrics
403                                    .materialize_input_row_count
404                                    .inc_by(chunk.cardinality() as u64);
405                                yield Message::Chunk(chunk);
406                            }
407                            Message::Chunk(chunk) => {
408                                self.metrics
409                                    .materialize_input_row_count
410                                    .inc_by(chunk.cardinality() as u64);
411
412                                // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
413                                // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
414                                // and the conflict behavior is overwrite. We can rely on the state table to overwrite the conflicting rows in the storage,
415                                // while outputting inconsistent changes to downstream which no one will subscribe to.
416                                let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
417                                    self.conflict_behavior
418                                    && !self.state_table.is_consistent_op()
419                                    && self.version_column_indices.is_empty()
420                                {
421                                    ConflictBehavior::NoCheck
422                                } else {
423                                    self.conflict_behavior
424                                };
425
426                                match optimized_conflict_behavior {
427                                    checked_conflict_behaviors!() => {
428                                        if chunk.cardinality() == 0 {
429                                            // empty chunk
430                                            continue;
431                                        }
432                                        let (data_chunk, ops) = chunk.clone().into_parts();
433
434                                        if self.state_table.value_indices().is_some() {
435                                            // TODO(st1page): when materialize partial columns(), we should
436                                            // construct some columns in the pk
437                                            panic!(
438                                                "materialize executor with data check can not handle only materialize partial columns"
439                                            )
440                                        };
441                                        let values = data_chunk.serialize();
442
443                                        let key_chunk =
444                                            data_chunk.project(self.state_table.pk_indices());
445
446                                        // For refreshable materialized views, write to staging table during refresh
447                                        // Do not use generate_output here.
448                                        if let Some(ref mut refresh_args) = self.refresh_args
449                                            && refresh_args.is_refreshing
450                                        {
451                                            let key_chunk = chunk
452                                                .clone()
453                                                .project(self.state_table.pk_indices());
454                                            tracing::trace!(
455                                                staging_chunk = %key_chunk.to_pretty(),
456                                                input_chunk = %chunk.to_pretty(),
457                                                "writing to staging table"
458                                            );
459                                            if cfg!(debug_assertions) {
460                                                // refreshable source should be append-only
461                                                assert!(
462                                                    key_chunk
463                                                        .ops()
464                                                        .iter()
465                                                        .all(|op| op == &Op::Insert)
466                                                );
467                                            }
468                                            refresh_args
469                                                .staging_table
470                                                .write_chunk(key_chunk.clone());
471                                            refresh_args.staging_table.try_flush().await?;
472                                        }
473
474                                        let pks = {
475                                            let mut pks = vec![vec![]; data_chunk.capacity()];
476                                            key_chunk
477                                                .rows_with_holes()
478                                                .zip_eq_fast(pks.iter_mut())
479                                                .for_each(|(r, vnode_and_pk)| {
480                                                    if let Some(r) = r {
481                                                        self.state_table
482                                                            .pk_serde()
483                                                            .serialize(r, vnode_and_pk);
484                                                    }
485                                                });
486                                            pks
487                                        };
488                                        let (_, vis) = key_chunk.into_parts();
489                                        let row_ops = ops
490                                            .iter()
491                                            .zip_eq_debug(pks.into_iter())
492                                            .zip_eq_debug(values.into_iter())
493                                            .zip_eq_debug(vis.iter())
494                                            .filter_map(|(((op, k), v), vis)| {
495                                                vis.then_some((*op, k, v))
496                                            })
497                                            .collect_vec();
498
499                                        let change_buffer = self
500                                            .materialize_cache
501                                            .handle(
502                                                row_ops,
503                                                &self.state_table,
504                                                self.conflict_behavior,
505                                                &self.metrics,
506                                                self.toastable_column_indices.as_deref(),
507                                            )
508                                            .await?;
509
510                                        match change_buffer
511                                            .into_chunk::<{ cb_kind::RETRACT }>(data_types.clone())
512                                        {
513                                            Some(output_chunk) => {
514                                                self.state_table.write_chunk(output_chunk.clone());
515                                                self.state_table.try_flush().await?;
516                                                yield Message::Chunk(output_chunk);
517                                            }
518                                            None => continue,
519                                        }
520                                    }
521                                    ConflictBehavior::NoCheck => {
522                                        self.state_table.write_chunk(chunk.clone());
523                                        self.state_table.try_flush().await?;
524
525                                        // For refreshable materialized views, also write to staging table during refresh
526                                        if let Some(ref mut refresh_args) = self.refresh_args
527                                            && refresh_args.is_refreshing
528                                        {
529                                            let key_chunk = chunk
530                                                .clone()
531                                                .project(self.state_table.pk_indices());
532                                            tracing::trace!(
533                                                staging_chunk = %key_chunk.to_pretty(),
534                                                input_chunk = %chunk.to_pretty(),
535                                                "writing to staging table"
536                                            );
537                                            if cfg!(debug_assertions) {
538                                                // refreshable source should be append-only
539                                                assert!(
540                                                    key_chunk
541                                                        .ops()
542                                                        .iter()
543                                                        .all(|op| op == &Op::Insert)
544                                                );
545                                            }
546                                            refresh_args
547                                                .staging_table
548                                                .write_chunk(key_chunk.clone());
549                                            refresh_args.staging_table.try_flush().await?;
550                                        }
551
552                                        yield Message::Chunk(chunk);
553                                    }
554                                }
555                            }
556                            Message::Barrier(barrier) => {
557                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
558                                    barrier,
559                                    expect_next_state: Box::new(
560                                        MaterializeStreamState::NormalIngestion,
561                                    ),
562                                };
563                                continue 'main_loop;
564                            }
565                        }
566                    }
567
568                    return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
569                        anyhow::anyhow!(
570                            "Input stream terminated unexpectedly during normal ingestion"
571                        ),
572                    )));
573                }
574                MaterializeStreamState::MergingData => {
575                    let Some(refresh_args) = self.refresh_args.as_mut() else {
576                        panic!(
577                            "MaterializeExecutor entered CleanUp state without refresh_args configured"
578                        );
579                    };
580                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
581
582                    debug_assert_eq!(
583                        self.state_table.vnodes(),
584                        refresh_args.staging_table.vnodes()
585                    );
586                    debug_assert_eq!(
587                        refresh_args.staging_table.vnodes(),
588                        refresh_args.progress_table.vnodes()
589                    );
590
591                    let mut rows_to_delete = vec![];
592                    let mut merge_complete = false;
593                    let mut pending_barrier: Option<Barrier> = None;
594
595                    // Scope to limit immutable borrows to state tables
596                    {
597                        let left_input = input.by_ref().map(Either::Left);
598                        let right_merge_sort = pin!(
599                            Self::make_mergesort_stream(
600                                &self.state_table,
601                                &refresh_args.staging_table,
602                                &mut refresh_args.progress_table
603                            )
604                            .map(Either::Right)
605                        );
606
607                        // Prefer to select input stream to handle barriers promptly
608                        // Rebuild the merge stream each time processing a barrier
609                        let mut merge_stream =
610                            select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
611                                stream::PollNext::Left
612                            });
613
614                        #[for_await]
615                        'merge_stream: for either in &mut merge_stream {
616                            match either {
617                                Either::Left(msg) => {
618                                    let msg = msg?;
619                                    match msg {
620                                        Message::Watermark(w) => yield Message::Watermark(w),
621                                        Message::Chunk(chunk) => {
622                                            tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
623                                        }
624                                        Message::Barrier(b) => {
625                                            pending_barrier = Some(b);
626                                            break 'merge_stream;
627                                        }
628                                    }
629                                }
630                                Either::Right(result) => {
631                                    match result? {
632                                        Some((_vnode, row)) => {
633                                            rows_to_delete.push(row);
634                                        }
635                                        None => {
636                                            // Merge stream finished
637                                            merge_complete = true;
638
639                                            // If the merge stream finished, we need to wait for the next barrier to commit states
640                                        }
641                                    }
642                                }
643                            }
644                        }
645                    }
646
647                    // Process collected rows for deletion
648                    for row in &rows_to_delete {
649                        self.state_table.delete(row);
650                    }
651                    if !rows_to_delete.is_empty() {
652                        let to_delete_chunk = StreamChunk::from_rows(
653                            &rows_to_delete
654                                .iter()
655                                .map(|row| (Op::Delete, row))
656                                .collect_vec(),
657                            &self.schema.data_types(),
658                        );
659
660                        yield Message::Chunk(to_delete_chunk);
661                    }
662
663                    // should wait for at least one barrier
664                    assert!(pending_barrier.is_some(), "pending barrier is not set");
665
666                    *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
667                        barrier: pending_barrier.unwrap(),
668                        expect_next_state: if merge_complete {
669                            Box::new(MaterializeStreamState::CleanUp)
670                        } else {
671                            Box::new(MaterializeStreamState::MergingData)
672                        },
673                    };
674                    continue 'main_loop;
675                }
676                MaterializeStreamState::CleanUp => {
677                    let Some(refresh_args) = self.refresh_args.as_mut() else {
678                        panic!(
679                            "MaterializeExecutor entered MergingData state without refresh_args configured"
680                        );
681                    };
682                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
683
684                    #[for_await]
685                    for msg in input.by_ref() {
686                        let msg = msg?;
687                        match msg {
688                            Message::Watermark(w) => yield Message::Watermark(w),
689                            Message::Chunk(chunk) => {
690                                tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
691                            }
692                            Message::Barrier(barrier) if !barrier.is_checkpoint() => {
693                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
694                                    barrier,
695                                    expect_next_state: Box::new(MaterializeStreamState::CleanUp),
696                                };
697                                continue 'main_loop;
698                            }
699                            Message::Barrier(barrier) => {
700                                let staging_table_id = refresh_args.staging_table.table_id();
701                                let epoch = barrier.epoch;
702                                self.local_barrier_manager.report_refresh_finished(
703                                    epoch,
704                                    self.actor_context.id,
705                                    refresh_args.table_id,
706                                    staging_table_id,
707                                );
708                                tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
709
710                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
711                                    barrier,
712                                    expect_next_state: Box::new(
713                                        MaterializeStreamState::RefreshEnd {
714                                            on_complete_epoch: epoch,
715                                        },
716                                    ),
717                                };
718                                continue 'main_loop;
719                            }
720                        }
721                    }
722                }
723                MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
724                    let Some(refresh_args) = self.refresh_args.as_mut() else {
725                        panic!(
726                            "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
727                        );
728                    };
729                    let staging_table_id = refresh_args.staging_table.table_id();
730
731                    // Wait for staging table truncation to complete
732                    let staging_store = refresh_args.staging_table.state_store().clone();
733                    staging_store
734                        .try_wait_epoch(
735                            HummockReadEpoch::Committed(on_complete_epoch.prev),
736                            TryWaitEpochOptions {
737                                table_id: staging_table_id,
738                            },
739                        )
740                        .await?;
741
742                    tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
743
744                    if let Some(ref mut refresh_args) = self.refresh_args {
745                        refresh_args.is_refreshing = false;
746                    }
747                    *inner_state = MaterializeStreamState::NormalIngestion;
748                    continue 'main_loop;
749                }
750                MaterializeStreamState::CommitAndYieldBarrier {
751                    barrier,
752                    mut expect_next_state,
753                } => {
754                    if let Some(ref mut refresh_args) = self.refresh_args {
755                        match barrier.mutation.as_deref() {
756                            Some(Mutation::RefreshStart {
757                                table_id: refresh_table_id,
758                                associated_source_id: _,
759                            }) if *refresh_table_id == refresh_args.table_id => {
760                                debug_assert!(
761                                    !refresh_args.is_refreshing,
762                                    "cannot start refresh twice"
763                                );
764                                refresh_args.is_refreshing = true;
765                                tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
766
767                                // Initialize progress tracking for all VNodes
768                                Self::init_refresh_progress(
769                                    &self.state_table,
770                                    &mut refresh_args.progress_table,
771                                    barrier.epoch.curr,
772                                )?;
773                            }
774                            Some(Mutation::LoadFinish {
775                                associated_source_id: load_finish_source_id,
776                            }) => {
777                                // Get associated source id from table catalog
778                                let associated_source_id: SourceId = match refresh_args
779                                    .table_catalog
780                                    .optional_associated_source_id
781                                {
782                                    Some(id) => id.into(),
783                                    None => unreachable!("associated_source_id is not set"),
784                                };
785
786                                if *load_finish_source_id == associated_source_id {
787                                    tracing::info!(
788                                        %load_finish_source_id,
789                                        "LoadFinish received, starting data replacement"
790                                    );
791                                    expect_next_state =
792                                        Box::new(MaterializeStreamState::<_>::MergingData);
793                                }
794                            }
795                            _ => {}
796                        }
797                    }
798
799                    // ===== normal operation =====
800
801                    // If a downstream mv depends on the current table, we need to do conflict check again.
802                    if !self.may_have_downstream
803                        && barrier.has_more_downstream_fragments(self.actor_context.id)
804                    {
805                        self.may_have_downstream = true;
806                    }
807                    Self::may_update_depended_subscriptions(
808                        &mut self.subscriber_ids,
809                        &barrier,
810                        mv_table_id,
811                    );
812                    let op_consistency_level = get_op_consistency_level(
813                        self.conflict_behavior,
814                        self.may_have_downstream,
815                        &self.subscriber_ids,
816                    );
817                    let post_commit = self
818                        .state_table
819                        .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
820                        .await?;
821                    if !post_commit.inner().is_consistent_op() {
822                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
823                    }
824
825                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
826
827                    // Commit staging table for refreshable materialized views
828                    let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
829                    {
830                        // Commit progress table for fault tolerance
831
832                        Some((
833                            refresh_args.staging_table.commit(barrier.epoch).await?,
834                            refresh_args.progress_table.commit(barrier.epoch).await?,
835                        ))
836                    } else {
837                        None
838                    };
839
840                    let b_epoch = barrier.epoch;
841                    yield Message::Barrier(barrier);
842
843                    // Update the vnode bitmap for the state table if asked.
844                    if let Some((_, cache_may_stale)) = post_commit
845                        .post_yield_barrier(update_vnode_bitmap.clone())
846                        .await?
847                        && cache_may_stale
848                    {
849                        self.materialize_cache.lru_cache.clear();
850                    }
851
852                    // Handle staging table post commit
853                    if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
854                        staging_post_commit
855                            .post_yield_barrier(update_vnode_bitmap.clone())
856                            .await?;
857                        progress_post_commit
858                            .post_yield_barrier(update_vnode_bitmap)
859                            .await?;
860                    }
861
862                    self.metrics
863                        .materialize_current_epoch
864                        .set(b_epoch.curr as i64);
865
866                    // ====== transition to next state ======
867
868                    *inner_state = *expect_next_state;
869                }
870            }
871        }
872    }
873
874    /// Stream that yields rows to be deleted from main table.
875    /// Yields `Some((vnode, row))` for rows that exist in main but not in staging.
876    /// Yields `None` when finished processing all vnodes.
877    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
878    async fn make_mergesort_stream<'a>(
879        main_table: &'a StateTableInner<S, SD>,
880        staging_table: &'a StateTableInner<S, SD>,
881        progress_table: &'a mut RefreshProgressTable<S>,
882    ) {
883        for vnode in main_table.vnodes().clone().iter_vnodes() {
884            let mut processed_rows = 0;
885            // Check if this VNode has already been completed (for fault tolerance)
886            let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
887                if let Some(current_entry) = progress_table.get_progress(vnode) {
888                    // Skip already completed VNodes during recovery
889                    if current_entry.is_completed {
890                        tracing::debug!(
891                            vnode = vnode.to_index(),
892                            "Skipping already completed VNode during recovery"
893                        );
894                        continue;
895                    }
896                    processed_rows += current_entry.processed_rows;
897                    tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
898
899                    if let Some(current_state) = &current_entry.current_pos {
900                        (Bound::Excluded(current_state.clone()), Bound::Unbounded)
901                    } else {
902                        (Bound::Unbounded, Bound::Unbounded)
903                    }
904                } else {
905                    (Bound::Unbounded, Bound::Unbounded)
906                };
907
908            let iter_main = main_table
909                .iter_keyed_row_with_vnode(
910                    vnode,
911                    &pk_range,
912                    PrefetchOptions::prefetch_for_large_range_scan(),
913                )
914                .await?;
915            let iter_staging = staging_table
916                .iter_keyed_row_with_vnode(
917                    vnode,
918                    &pk_range,
919                    PrefetchOptions::prefetch_for_large_range_scan(),
920                )
921                .await?;
922
923            pin_mut!(iter_main);
924            pin_mut!(iter_staging);
925
926            // Sort-merge join implementation using dual pointers
927            let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
928            let mut staging_item: Option<KeyedRow<Bytes>> =
929                iter_staging.next().await.transpose()?;
930
931            while let Some(main_kv) = main_item {
932                let main_key = main_kv.key();
933
934                // Advance staging iterator until we find a key >= main_key
935                let mut should_delete = false;
936                while let Some(staging_kv) = &staging_item {
937                    let staging_key = staging_kv.key();
938                    match main_key.cmp(staging_key) {
939                        std::cmp::Ordering::Greater => {
940                            // main_key > staging_key, advance staging
941                            staging_item = iter_staging.next().await.transpose()?;
942                        }
943                        std::cmp::Ordering::Equal => {
944                            // Keys match, this row exists in both tables, no need to delete
945                            break;
946                        }
947                        std::cmp::Ordering::Less => {
948                            // main_key < staging_key, main row doesn't exist in staging, delete it
949                            should_delete = true;
950                            break;
951                        }
952                    }
953                }
954
955                // If staging_item is None, all remaining main rows should be deleted
956                if staging_item.is_none() {
957                    should_delete = true;
958                }
959
960                if should_delete {
961                    yield Some((vnode, main_kv.row().clone()));
962                }
963
964                // Advance main iterator
965                processed_rows += 1;
966                tracing::info!(
967                    "set progress table: vnode = {:?}, processed_rows = {:?}",
968                    vnode,
969                    processed_rows
970                );
971                progress_table.set_progress(
972                    vnode,
973                    Some(
974                        main_kv
975                            .row()
976                            .project(main_table.pk_indices())
977                            .to_owned_row(),
978                    ),
979                    false,
980                    processed_rows,
981                )?;
982                main_item = iter_main.next().await.transpose()?;
983            }
984
985            // Mark this VNode as completed
986            if let Some(current_entry) = progress_table.get_progress(vnode) {
987                progress_table.set_progress(
988                    vnode,
989                    current_entry.current_pos.clone(),
990                    true, // completed
991                    current_entry.processed_rows,
992                )?;
993
994                tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
995            }
996        }
997
998        // Signal completion
999        yield None;
1000    }
1001
    /// Maintain the set of subscriber ids that depend on this materialized view,
    /// based on the barrier's mutations: subscribers newly added on
    /// `mv_table_id` are inserted, and dropped subscriptions targeting this
    /// table are removed. Unexpected duplicates or missing entries are logged
    /// as warnings but otherwise ignored.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
1037
1038    /// Initialize refresh progress tracking for all `VNodes`
1039    fn init_refresh_progress(
1040        state_table: &StateTableInner<S, SD>,
1041        progress_table: &mut RefreshProgressTable<S>,
1042        _epoch: u64,
1043    ) -> StreamExecutorResult<()> {
1044        debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1045
1046        // Initialize progress for all VNodes in the current bitmap
1047        for vnode in state_table.vnodes().iter_vnodes() {
1048            progress_table.set_progress(
1049                vnode, None,  // initial position
1050                false, // not completed yet
1051                0,     // initial processed rows
1052            )?;
1053        }
1054
1055        tracing::info!(
1056            vnodes_count = state_table.vnodes().count_ones(),
1057            "Initialized refresh progress tracking for all VNodes"
1058        );
1059
1060        Ok(())
1061    }
1062}
1063
1064impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
1065    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
1066    #[cfg(any(test, feature = "test"))]
1067    pub async fn for_test(
1068        input: Executor,
1069        store: S,
1070        table_id: TableId,
1071        keys: Vec<ColumnOrder>,
1072        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1073        watermark_epoch: AtomicU64Ref,
1074        conflict_behavior: ConflictBehavior,
1075    ) -> Self {
1076        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
1077        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
1078        let schema = input.schema().clone();
1079        let columns: Vec<ColumnDesc> = column_ids
1080            .into_iter()
1081            .zip_eq_fast(schema.fields.iter())
1082            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
1083            .collect_vec();
1084
1085        let row_serde = BasicSerde::new(
1086            Arc::from((0..columns.len()).collect_vec()),
1087            Arc::from(columns.clone().into_boxed_slice()),
1088        );
1089        let state_table = StateTableInner::from_table_catalog(
1090            &crate::common::table::test_utils::gen_pbtable(
1091                table_id,
1092                columns,
1093                arrange_order_types,
1094                arrange_columns.clone(),
1095                0,
1096            ),
1097            store,
1098            None,
1099        )
1100        .await;
1101
1102        let metrics =
1103            StreamingMetrics::unused().new_materialize_metrics(table_id, 1.into(), 2.into());
1104
1105        Self {
1106            input,
1107            schema,
1108            state_table,
1109            arrange_key_indices: arrange_columns.clone(),
1110            actor_context: ActorContext::for_test(0),
1111            materialize_cache: MaterializeCache::new(
1112                watermark_epoch,
1113                MetricsInfo::for_test(),
1114                row_serde,
1115                vec![],
1116            ),
1117            conflict_behavior,
1118            version_column_indices: vec![],
1119            is_dummy_table: false,
1120            toastable_column_indices: None,
1121            may_have_downstream: true,
1122            subscriber_ids: HashSet::new(),
1123            metrics,
1124            refresh_args: None, // Test constructor doesn't support refresh functionality
1125            local_barrier_manager: LocalBarrierManager::for_test(),
1126        }
1127    }
1128}
1129
1130/// Fast string comparison to check if a string equals `DEBEZIUM_UNAVAILABLE_VALUE`.
1131/// Optimized by checking length first to avoid expensive string comparison.
1132fn is_unavailable_value_str(s: &str) -> bool {
1133    s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
1134}
1135
1136/// Check if a datum represents Debezium's unavailable value placeholder.
1137/// This function handles both scalar types and one-dimensional arrays.
1138fn is_debezium_unavailable_value(
1139    datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
1140) -> bool {
1141    match datum {
1142        Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => is_unavailable_value_str(val),
1143        Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
1144            // For jsonb type, check if it's a string containing the unavailable value
1145            jsonb_ref
1146                .as_str()
1147                .map(is_unavailable_value_str)
1148                .unwrap_or(false)
1149        }
1150        Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
1151            // For bytea type, we need to check if it contains the string bytes of DEBEZIUM_UNAVAILABLE_VALUE
1152            // This is because when processing bytea from Debezium, we convert the base64-encoded string
1153            // to `DEBEZIUM_UNAVAILABLE_VALUE` in the json.rs parser to maintain consistency
1154            if let Ok(bytea_str) = std::str::from_utf8(bytea) {
1155                is_unavailable_value_str(bytea_str)
1156            } else {
1157                false
1158            }
1159        }
1160        Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
1161            // For list type, check if it contains exactly one element with the unavailable value
1162            // This is because when any element in an array triggers TOAST, Debezium treats the entire
1163            // array as unchanged and sends a placeholder array with only one element
1164            if list_ref.len() == 1 {
1165                if let Some(Some(element)) = list_ref.get(0) {
1166                    // Recursively check the array element
1167                    is_debezium_unavailable_value(&Some(element))
1168                } else {
1169                    false
1170                }
1171            } else {
1172                false
1173            }
1174        }
1175        _ => false,
1176    }
1177}
1178
1179/// Fix TOAST columns by replacing unavailable values with old row values.
1180fn handle_toast_columns_for_postgres_cdc(
1181    old_row: &OwnedRow,
1182    new_row: &OwnedRow,
1183    toastable_indices: &[usize],
1184) -> OwnedRow {
1185    let mut fixed_row_data = new_row.as_inner().to_vec();
1186
1187    for &toast_idx in toastable_indices {
1188        // Check if the new value is Debezium's unavailable value placeholder
1189        let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
1190        if is_unavailable {
1191            // Replace with old row value if available
1192            if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
1193                fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
1194            }
1195        }
1196    }
1197
1198    OwnedRow::new(fixed_row_data)
1199}
1200
/// Change buffer used by the materialize executor: maps a row's encoded key
/// bytes to its deserialized `OwnedRow` value.
type ChangeBuffer = crate::common::change_buffer::ChangeBuffer<Vec<u8>, OwnedRow>;
1202
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    // Entry point of the executor: delegates to `execute_inner` and boxes the
    // resulting message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
1208
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Debug output is intentionally limited to the arrange key indices; the
    // remaining executor state is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
1216
/// A cache for materialize executors.
///
/// Maps encoded keys to their latest cached row value so conflict handling
/// can avoid repeated state-table lookups.
struct MaterializeCache<SD> {
    // Encoded key -> cached value; `None` records a key known to be absent.
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    // Serde for (de)serializing full row values.
    row_serde: BasicSerde,
    // Indices of version columns consulted when resolving conflicts.
    version_column_indices: Vec<u32>,
    _serde: PhantomData<SD>,
}
1224
/// Cached row value; `None` records that the key is known to be absent.
type CacheValue = Option<CompactedRow>;
1226
1227impl<SD: ValueRowSerde> MaterializeCache<SD> {
1228    fn new(
1229        watermark_sequence: AtomicU64Ref,
1230        metrics_info: MetricsInfo,
1231        row_serde: BasicSerde,
1232        version_column_indices: Vec<u32>,
1233    ) -> Self {
1234        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
1235            ManagedLruCache::unbounded(watermark_sequence, metrics_info);
1236        Self {
1237            lru_cache,
1238            row_serde,
1239            version_column_indices,
1240            _serde: PhantomData,
1241        }
1242    }
1243
1244    /// First populate the cache from `table`, and then calculate a [`ChangeBuffer`].
1245    /// `table` will not be written in this method.
1246    async fn handle<S: StateStore>(
1247        &mut self,
1248        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
1249        table: &StateTableInner<S, SD>,
1250        conflict_behavior: ConflictBehavior,
1251        metrics: &MaterializeMetrics,
1252        toastable_column_indices: Option<&[usize]>,
1253    ) -> StreamExecutorResult<ChangeBuffer> {
1254        assert_matches!(conflict_behavior, checked_conflict_behaviors!());
1255
1256        let key_set: HashSet<Box<[u8]>> = row_ops
1257            .iter()
1258            .map(|(_, k, _)| k.as_slice().into())
1259            .collect();
1260
1261        // Populate the LRU cache with the keys in input chunk.
1262        // For new keys, row values are set to None.
1263        self.fetch_keys(
1264            key_set.iter().map(|v| v.deref()),
1265            table,
1266            conflict_behavior,
1267            metrics,
1268        )
1269        .await?;
1270
1271        let mut change_buffer = ChangeBuffer::new();
1272        let row_serde = self.row_serde.clone();
1273        let version_column_indices = self.version_column_indices.clone();
1274        for (op, key, row) in row_ops {
1275            match op {
1276                Op::Insert | Op::UpdateInsert => {
1277                    let Some(old_row) = self.get_expected(&key) else {
1278                        // not exists before, meaning no conflict, simply insert
1279                        let new_row_deserialized =
1280                            row_serde.deserializer.deserialize(row.clone())?;
1281                        change_buffer.insert(key.clone(), new_row_deserialized);
1282                        self.lru_cache.put(key, Some(CompactedRow { row }));
1283                        continue;
1284                    };
1285
1286                    // now conflict happens, handle it according to the specified behavior
1287                    match conflict_behavior {
1288                        ConflictBehavior::Overwrite => {
1289                            let old_row_deserialized =
1290                                row_serde.deserializer.deserialize(old_row.row.clone())?;
1291                            let new_row_deserialized =
1292                                row_serde.deserializer.deserialize(row.clone())?;
1293
1294                            let need_overwrite = if !version_column_indices.is_empty() {
1295                                versions_are_newer_or_equal(
1296                                    &old_row_deserialized,
1297                                    &new_row_deserialized,
1298                                    &version_column_indices,
1299                                )
1300                            } else {
1301                                // no version column specified, just overwrite
1302                                true
1303                            };
1304
1305                            if need_overwrite {
1306                                if let Some(toastable_indices) = toastable_column_indices {
1307                                    // For TOAST-able columns, replace Debezium's unavailable value placeholder with old row values.
1308                                    let final_row = handle_toast_columns_for_postgres_cdc(
1309                                        &old_row_deserialized,
1310                                        &new_row_deserialized,
1311                                        toastable_indices,
1312                                    );
1313
1314                                    change_buffer.update(
1315                                        key.clone(),
1316                                        old_row_deserialized,
1317                                        final_row.clone(),
1318                                    );
1319                                    let final_row_bytes =
1320                                        Bytes::from(row_serde.serializer.serialize(final_row));
1321                                    self.lru_cache.put(
1322                                        key.clone(),
1323                                        Some(CompactedRow {
1324                                            row: final_row_bytes,
1325                                        }),
1326                                    );
1327                                } else {
1328                                    // No TOAST columns, use the original row bytes directly to avoid unnecessary serialization
1329                                    change_buffer.update(
1330                                        key.clone(),
1331                                        old_row_deserialized,
1332                                        new_row_deserialized,
1333                                    );
1334                                    self.lru_cache
1335                                        .put(key.clone(), Some(CompactedRow { row: row.clone() }));
1336                                }
1337                            };
1338                        }
1339                        ConflictBehavior::IgnoreConflict => {
1340                            // ignore conflict, do nothing
1341                        }
1342                        ConflictBehavior::DoUpdateIfNotNull => {
1343                            // In this section, we compare the new row and old row column by column and perform `DoUpdateIfNotNull` replacement.
1344
1345                            let old_row_deserialized =
1346                                row_serde.deserializer.deserialize(old_row.row.clone())?;
1347                            let new_row_deserialized =
1348                                row_serde.deserializer.deserialize(row.clone())?;
1349                            let need_overwrite = if !version_column_indices.is_empty() {
1350                                versions_are_newer_or_equal(
1351                                    &old_row_deserialized,
1352                                    &new_row_deserialized,
1353                                    &version_column_indices,
1354                                )
1355                            } else {
1356                                true
1357                            };
1358
1359                            if need_overwrite {
1360                                let mut row_deserialized_vec =
1361                                    old_row_deserialized.clone().into_inner().into_vec();
1362                                replace_if_not_null(
1363                                    &mut row_deserialized_vec,
1364                                    new_row_deserialized.clone(),
1365                                );
1366                                let mut updated_row = OwnedRow::new(row_deserialized_vec);
1367
1368                                // Apply TOAST column fix for CDC tables with TOAST columns
1369                                if let Some(toastable_indices) = toastable_column_indices {
1370                                    // Note: we need to use old_row_deserialized again, but it was moved above
1371                                    // So we re-deserialize the old row
1372                                    let old_row_deserialized_again =
1373                                        row_serde.deserializer.deserialize(old_row.row.clone())?;
1374                                    updated_row = handle_toast_columns_for_postgres_cdc(
1375                                        &old_row_deserialized_again,
1376                                        &updated_row,
1377                                        toastable_indices,
1378                                    );
1379                                }
1380
1381                                change_buffer.update(
1382                                    key.clone(),
1383                                    old_row_deserialized,
1384                                    updated_row.clone(),
1385                                );
1386                                let updated_row_bytes =
1387                                    Bytes::from(row_serde.serializer.serialize(updated_row));
1388                                self.lru_cache.put(
1389                                    key.clone(),
1390                                    Some(CompactedRow {
1391                                        row: updated_row_bytes,
1392                                    }),
1393                                );
1394                            }
1395                        }
1396                        _ => unreachable!(),
1397                    };
1398                }
1399
1400                Op::UpdateDelete
1401                    if matches!(
1402                        conflict_behavior,
1403                        ConflictBehavior::Overwrite | ConflictBehavior::DoUpdateIfNotNull
1404                    ) =>
1405                {
1406                    // For `UpdateDelete`s, we skip processing them but directly handle the following `UpdateInsert`
1407                    // instead. This is because...
1408                    //
1409                    // - For `Overwrite`, we only care about the new row.
1410                    // - For `DoUpdateIfNotNull`, we don't want the whole row to be deleted, but instead perform
1411                    //   column-wise replacement when handling the `UpdateInsert`.
1412                    //
1413                    // However, for `IgnoreConflict`, we still need to delete the old row first, otherwise the row
1414                    // cannot be updated at all.
1415                }
1416
1417                Op::Delete | Op::UpdateDelete => {
1418                    if let Some(old_row) = self.get_expected(&key) {
1419                        let old_row_deserialized =
1420                            row_serde.deserializer.deserialize(old_row.row.clone())?;
1421                        change_buffer.delete(key.clone(), old_row_deserialized);
1422                        // put a None into the cache to represent deletion
1423                        self.lru_cache.put(key, None);
1424                    } else {
1425                        // delete a non-existent value
1426                        // this is allowed in the case of mview conflict, so ignore
1427                    }
1428                }
1429            }
1430        }
1431        Ok(change_buffer)
1432    }
1433
1434    async fn fetch_keys<'a, S: StateStore>(
1435        &mut self,
1436        keys: impl Iterator<Item = &'a [u8]>,
1437        table: &StateTableInner<S, SD>,
1438        conflict_behavior: ConflictBehavior,
1439        metrics: &MaterializeMetrics,
1440    ) -> StreamExecutorResult<()> {
1441        let mut futures = vec![];
1442        for key in keys {
1443            metrics.materialize_cache_total_count.inc();
1444
1445            if self.lru_cache.contains(key) {
1446                if self.lru_cache.get(key).unwrap().is_some() {
1447                    metrics.materialize_data_exist_count.inc();
1448                }
1449                metrics.materialize_cache_hit_count.inc();
1450                continue;
1451            }
1452            futures.push(async {
1453                let key_row = table.pk_serde().deserialize(key).unwrap();
1454                let row = table.get_row(key_row).await?.map(CompactedRow::from);
1455                StreamExecutorResult::Ok((key.to_vec(), row))
1456            });
1457        }
1458
1459        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
1460        while let Some(result) = buffered.next().await {
1461            let (key, row) = result?;
1462            if row.is_some() {
1463                metrics.materialize_data_exist_count.inc();
1464            }
1465            // for keys that are not in the table, `value` is None
1466            match conflict_behavior {
1467                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
1468                _ => unreachable!(),
1469            };
1470        }
1471
1472        Ok(())
1473    }
1474
1475    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
1476        self.lru_cache.get(key).unwrap_or_else(|| {
1477            panic!(
1478                "the key {:?} has not been fetched in the materialize executor's cache ",
1479                key
1480            )
1481        })
1482    }
1483
    /// Evict entries from the underlying LRU cache according to its own policy.
    fn evict(&mut self) {
        self.lru_cache.evict()
    }
1487}
1488
1489/// Replace columns in an existing row with the corresponding columns in a replacement row, if the
1490/// column value in the replacement row is not null.
1491///
1492/// # Example
1493///
1494/// ```ignore
1495/// let mut row = vec![Some(1), None, Some(3)];
1496/// let replacement = vec![Some(10), Some(20), None];
1497/// replace_if_not_null(&mut row, replacement);
1498/// ```
1499///
1500/// After the call, `row` will be `[Some(10), Some(20), Some(3)]`.
1501fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
1502    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
1503        if let Some(new_value) = new_col {
1504            *old_col = Some(new_value);
1505        }
1506    }
1507}
1508
1509/// Compare multiple version columns lexicographically.
1510/// Returns true if `new_row` has a newer or equal version compared to `old_row`.
1511fn versions_are_newer_or_equal(
1512    old_row: &OwnedRow,
1513    new_row: &OwnedRow,
1514    version_column_indices: &[u32],
1515) -> bool {
1516    if version_column_indices.is_empty() {
1517        // No version columns specified, always consider new version as newer
1518        return true;
1519    }
1520
1521    for &idx in version_column_indices {
1522        let old_value = old_row.index(idx as usize);
1523        let new_value = new_row.index(idx as usize);
1524
1525        match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
1526            std::cmp::Ordering::Less => return true,     // new is newer
1527            std::cmp::Ordering::Greater => return false, // old is newer
1528            std::cmp::Ordering::Equal => continue,       // equal, check next column
1529        }
1530    }
1531
1532    // All version columns are equal, consider new version as equal (should overwrite)
1533    true
1534}
1535
1536#[cfg(test)]
1537mod tests {
1538
1539    use std::iter;
1540    use std::sync::atomic::AtomicU64;
1541
1542    use rand::rngs::SmallRng;
1543    use rand::{Rng, RngCore, SeedableRng};
1544    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1545    use risingwave_common::catalog::Field;
1546    use risingwave_common::util::epoch::test_epoch;
1547    use risingwave_common::util::sort_util::OrderType;
1548    use risingwave_hummock_sdk::HummockReadEpoch;
1549    use risingwave_storage::memory::MemoryStateStore;
1550    use risingwave_storage::table::batch_table::BatchTable;
1551
1552    use super::*;
1553    use crate::executor::test_utils::*;
1554
    /// End-to-end test of the materialize executor with `ConflictBehavior::NoCheck`:
    /// feeds two chunks separated by barriers and verifies that inserted rows are
    /// visible in the underlying state table after the corresponding barrier.
    #[tokio::test]
    async fn test_materialize_executor() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Prepare source chunks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read side of the same state store, used to assert table contents.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier (epoch 1).
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1657
1658    // https://github.com/risingwavelabs/risingwave/issues/13346
    /// Regression test for upsert streams with `ConflictBehavior::Overwrite`:
    /// after `+ 1 1`, an overwrite `+ 1 2` followed by a delete `- 1 2` of the
    /// same pk must leave the row absent (the delete must not be dropped just
    /// because the delete's value differs from the originally inserted one).
    #[tokio::test]
    async fn test_upsert_stream() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Overwrite pk 1 and then delete it within the same chunk.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read side of the same state store, used to assert table contents.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1), chunk1, barrier(2), chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                // The delete in chunk2 must win: pk 1 is gone.
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1740
    /// Tests `ConflictBehavior::Overwrite` on conflicting inserts: repeated `+`
    /// rows on the same pk must overwrite the previous value instead of erroring.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicting inserts on pks 1 and 2 — the later values must win.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // One more conflicting insert on pk 1 after the checked barrier
        // (its effect is not asserted below).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read side of the same state store, used to assert table contents.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1), chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1 was inserted three times; the last value before the barrier (3) wins.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2 was overwritten from 5 to 6 by chunk2.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1859
    /// Tests `ConflictBehavior::Overwrite` for deletes and `U-`/`U+` updates:
    /// mismatched delete values, deletes/updates on non-existent pks, and an
    /// insert following an update on the same pk must all be tolerated.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        // Also: update on a non-existent pk 8 followed by an insert on pk 8.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // test insert conflict, update with a wrong old value, update on an inexistent pk
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read side of the same state store, used to assert table contents.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier(1) and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (8) -> (8,3)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Second stream chunk: inserts and (mismatched / non-existent) deletes.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Third stream chunk: insert conflict on pk 1, update on pk 2 and pk 9.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2046
2047    #[tokio::test]
2048    async fn test_ignore_insert_conflict() {
2049        // Prepare storage and memtable.
2050        let memory_state_store = MemoryStateStore::new();
2051        let table_id = TableId::new(1);
2052        // Two columns of int32 type, the first column is PK.
2053        let schema = Schema::new(vec![
2054            Field::unnamed(DataType::Int32),
2055            Field::unnamed(DataType::Int32),
2056        ]);
2057        let column_ids = vec![0.into(), 1.into()];
2058
2059        // test double insert one pk, the latter needs to be ignored.
2060        let chunk1 = StreamChunk::from_pretty(
2061            " i i
2062            + 1 3
2063            + 1 4
2064            + 2 5
2065            + 3 6",
2066        );
2067
2068        let chunk2 = StreamChunk::from_pretty(
2069            " i i
2070            + 1 5
2071            + 2 6",
2072        );
2073
2074        // test delete wrong value, delete inexistent pk
2075        let chunk3 = StreamChunk::from_pretty(
2076            " i i
2077            + 1 6",
2078        );
2079
2080        // Prepare stream executors.
2081        let source = MockSource::with_messages(vec![
2082            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2083            Message::Chunk(chunk1),
2084            Message::Chunk(chunk2),
2085            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2086            Message::Chunk(chunk3),
2087            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
2088        ])
2089        .into_executor(schema.clone(), StreamKey::new());
2090
2091        let order_types = vec![OrderType::ascending()];
2092        let column_descs = vec![
2093            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2094            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2095        ];
2096
2097        let table = BatchTable::for_test(
2098            memory_state_store.clone(),
2099            table_id,
2100            column_descs,
2101            order_types,
2102            vec![0],
2103            vec![0, 1],
2104        );
2105
2106        let mut materialize_executor = MaterializeExecutor::for_test(
2107            source,
2108            memory_state_store,
2109            table_id,
2110            vec![ColumnOrder::new(0, OrderType::ascending())],
2111            column_ids,
2112            Arc::new(AtomicU64::new(0)),
2113            ConflictBehavior::IgnoreConflict,
2114        )
2115        .await
2116        .boxed()
2117        .execute();
2118        materialize_executor.next().await.transpose().unwrap();
2119
2120        materialize_executor.next().await.transpose().unwrap();
2121        materialize_executor.next().await.transpose().unwrap();
2122
2123        // First stream chunk. We check the existence of (3) -> (3,6)
2124        match materialize_executor.next().await.transpose().unwrap() {
2125            Some(Message::Barrier(_)) => {
2126                let row = table
2127                    .get_row(
2128                        &OwnedRow::new(vec![Some(3_i32.into())]),
2129                        HummockReadEpoch::NoWait(u64::MAX),
2130                    )
2131                    .await
2132                    .unwrap();
2133                assert_eq!(
2134                    row,
2135                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
2136                );
2137
2138                let row = table
2139                    .get_row(
2140                        &OwnedRow::new(vec![Some(1_i32.into())]),
2141                        HummockReadEpoch::NoWait(u64::MAX),
2142                    )
2143                    .await
2144                    .unwrap();
2145                assert_eq!(
2146                    row,
2147                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
2148                );
2149
2150                let row = table
2151                    .get_row(
2152                        &OwnedRow::new(vec![Some(2_i32.into())]),
2153                        HummockReadEpoch::NoWait(u64::MAX),
2154                    )
2155                    .await
2156                    .unwrap();
2157                assert_eq!(
2158                    row,
2159                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
2160                );
2161            }
2162            _ => unreachable!(),
2163        }
2164    }
2165
2166    #[tokio::test]
2167    async fn test_ignore_delete_then_insert() {
2168        // Prepare storage and memtable.
2169        let memory_state_store = MemoryStateStore::new();
2170        let table_id = TableId::new(1);
2171        // Two columns of int32 type, the first column is PK.
2172        let schema = Schema::new(vec![
2173            Field::unnamed(DataType::Int32),
2174            Field::unnamed(DataType::Int32),
2175        ]);
2176        let column_ids = vec![0.into(), 1.into()];
2177
2178        // test insert after delete one pk, the latter insert should succeed.
2179        let chunk1 = StreamChunk::from_pretty(
2180            " i i
2181            + 1 3
2182            - 1 3
2183            + 1 6",
2184        );
2185
2186        // Prepare stream executors.
2187        let source = MockSource::with_messages(vec![
2188            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2189            Message::Chunk(chunk1),
2190            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2191        ])
2192        .into_executor(schema.clone(), StreamKey::new());
2193
2194        let order_types = vec![OrderType::ascending()];
2195        let column_descs = vec![
2196            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2197            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2198        ];
2199
2200        let table = BatchTable::for_test(
2201            memory_state_store.clone(),
2202            table_id,
2203            column_descs,
2204            order_types,
2205            vec![0],
2206            vec![0, 1],
2207        );
2208
2209        let mut materialize_executor = MaterializeExecutor::for_test(
2210            source,
2211            memory_state_store,
2212            table_id,
2213            vec![ColumnOrder::new(0, OrderType::ascending())],
2214            column_ids,
2215            Arc::new(AtomicU64::new(0)),
2216            ConflictBehavior::IgnoreConflict,
2217        )
2218        .await
2219        .boxed()
2220        .execute();
2221        let _msg1 = materialize_executor
2222            .next()
2223            .await
2224            .transpose()
2225            .unwrap()
2226            .unwrap()
2227            .as_barrier()
2228            .unwrap();
2229        let _msg2 = materialize_executor
2230            .next()
2231            .await
2232            .transpose()
2233            .unwrap()
2234            .unwrap()
2235            .as_chunk()
2236            .unwrap();
2237        let _msg3 = materialize_executor
2238            .next()
2239            .await
2240            .transpose()
2241            .unwrap()
2242            .unwrap()
2243            .as_barrier()
2244            .unwrap();
2245
2246        let row = table
2247            .get_row(
2248                &OwnedRow::new(vec![Some(1_i32.into())]),
2249                HummockReadEpoch::NoWait(u64::MAX),
2250            )
2251            .await
2252            .unwrap();
2253        assert_eq!(
2254            row,
2255            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
2256        );
2257    }
2258
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Insert pks 1/2/3, update pk 8 to (8, 2), then insert (8, 3).
        // Under `IgnoreConflict`, the trailing insert on the already-present
        // pk 8 must be ignored, leaving (8, 2).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Delete with a stale value (pk 3 holds 6, not 4) and delete a
        // nonexistent pk (5): pk 3 must still be removed, pk 5 stays absent.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Insert a conflicting pk (1, should be ignored), update with a stale
        // old value (pk 2), and update a nonexistent pk (9, should act as an
        // insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side reader over the same state store, used to verify what got
        // materialized after each barrier.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier and the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // After the first chunk: pk 8 must hold the updated value (8, 2); the
        // later conflicting insert (8, 3) was ignored.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 2), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume the second chunk.
        materialize_executor.next().await.transpose().unwrap();

        // After the second chunk: (7, 8) inserted; pk 3 deleted even though
        // the delete carried a stale value; pk 5 still absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume the third chunk.
        materialize_executor.next().await.transpose().unwrap();
        // After the third chunk: the conflicting insert (1, 5) was ignored,
        // the update on pk 2 applied despite its stale old value, and the
        // update on the missing pk 9 degenerated into an insert.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2446
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // With `DoUpdateIfNotNull`, a NULL in the conflicting row keeps the
        // existing value. The trailing `+ 8 .` must therefore not clobber
        // (8, 2); (2, NULL) is a plain insert and keeps its NULL.
        // should get (8, 2)
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // Delete pk 3 with a stale value and delete the nonexistent pk 5.
        // should not get (3, x), should not get (5, 0)
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Conflicting insert `+ 7 .` carries NULL, so (7, 8) is preserved;
        // the update on pk 2 writes NULL over NULL; pk 9 is a new insert.
        // should get (2, None), (7, 8)
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side reader over the same state store, used to verify what got
        // materialized after each barrier.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier and the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // After the first chunk: the NULL-valued conflicting insert did not
        // overwrite (8, 2); (2, NULL) exists as inserted.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume the second chunk.
        materialize_executor.next().await.transpose().unwrap();

        // After the second chunk: (7, 8) inserted; pk 3 deleted despite the
        // stale delete value; pk 5 still absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume the third chunk.
        materialize_executor.next().await.transpose().unwrap();
        // After the third chunk: the NULL conflicting insert left (7, 8)
        // intact, pk 2 still holds NULL, and the update on the missing pk 9
        // degenerated into an insert.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2640
2641    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2642        const KN: u32 = 4;
2643        const SEED: u64 = 998244353;
2644        let mut ret = vec![];
2645        let mut builder =
2646            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2647        let mut rng = SmallRng::seed_from_u64(SEED);
2648
2649        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2650            let len = c.data_chunk().capacity();
2651            let mut c = StreamChunkMut::from(c);
2652            for i in 0..len {
2653                c.set_vis(i, rng.random_bool(0.5));
2654            }
2655            c.into()
2656        };
2657        for _ in 0..row_number {
2658            let k = (rng.next_u32() % KN) as i32;
2659            let v = rng.next_u32() as i32;
2660            let op = if rng.random_bool(0.5) {
2661                Op::Insert
2662            } else {
2663                Op::Delete
2664            };
2665            if let Some(c) =
2666                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2667            {
2668                ret.push(random_vis(c, &mut rng));
2669            }
2670        }
2671        if let Some(c) = builder.take() {
2672            ret.push(random_vis(c, &mut rng));
2673        }
2674        ret
2675    }
2676
    /// Feeds `N` random insert/delete rows through a `MaterializeExecutor`
    /// configured with `conflict_behavior`, then replays every output chunk
    /// into a fresh `StateTable` — presumably so its write path sanity-checks
    /// that the executor's output stream is self-consistent (see
    /// `StateTable::write_chunk`; TODO confirm which checks it performs).
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Message sequence: one barrier, all fuzz chunks, one closing barrier.
        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        // Prepare stream executors.
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        // Skip the leading barrier before reading data chunks.
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        // A separate checker table. Note it uses a different table id (1002)
        // than the executor's table, so it never reads the executor's data;
        // it only consumes the executor's *output* chunks below.
        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        // Replay every output chunk into the checker table; the loop stops at
        // the trailing barrier (the first non-chunk message).
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            // check with state table's memtable
            table.write_chunk(c);
        }
    }
2740
2741    #[tokio::test]
2742    async fn fuzz_test_stream_consistent_upsert() {
2743        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
2744    }
2745
2746    #[tokio::test]
2747    async fn fuzz_test_stream_consistent_ignore() {
2748        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
2749    }
2750}