risingwave_stream/executor/mview/
materialize.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Bound, Deref, Index};
20
21use bytes::Bytes;
22use futures::future::Either;
23use futures::stream::{self, select_with_strategy};
24use futures_async_stream::try_stream;
25use itertools::Itertools;
26use risingwave_common::array::Op;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::{
29    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
30};
31use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
32use risingwave_common::row::{CompactedRow, OwnedRow, RowExt};
33use risingwave_common::types::{DEBEZIUM_UNAVAILABLE_VALUE, DataType, ScalarImpl};
34use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
35use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
36use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
37use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
38use risingwave_hummock_sdk::HummockReadEpoch;
39use risingwave_pb::catalog::Table;
40use risingwave_pb::catalog::table::{Engine, OptionalAssociatedSourceId};
41use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
42use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
43use risingwave_storage::table::KeyedRow;
44
45use crate::cache::ManagedLruCache;
46use crate::common::metrics::MetricsInfo;
47use crate::common::table::state_table::{
48    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
49};
50use crate::executor::error::ErrorKind;
51use crate::executor::monitor::MaterializeMetrics;
52use crate::executor::mview::RefreshProgressTable;
53use crate::executor::prelude::*;
54use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
55use crate::task::LocalBarrierManager;
56
/// State machine driving [`MaterializeExecutor`]'s main execution loop.
///
/// Ordinary materialized views stay in [`Self::NormalIngestion`]; refreshable
/// materialized views cycle through the other states during a refresh.
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Regular streaming ingestion: input chunks are materialized into the
    /// state table (and mirrored into the staging table while a refresh is
    /// in progress).
    NormalIngestion,
    /// Refresh merge phase: scan the main table against the PK-only staging
    /// table and collect stale rows to delete, tracking per-vnode progress.
    MergingData,
    /// Refresh finalization; presumably purges the staging table and progress
    /// state after the merge completes — handling code is outside this view.
    CleanUp,
    /// Commit pending table state on `barrier`, yield the barrier downstream,
    /// then transition to `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        /// State to enter after the barrier has been committed and yielded.
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Refresh completed; carries the epoch at which completion is reported.
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}
70
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the change stream to materialize.
    input: Executor,

    /// Output schema; its data types are used to build output chunks.
    schema: Schema,

    /// State table backing the materialized view itself.
    state_table: StateTableInner<S, SD>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// Cache over materialized rows, consulted when handling PK conflicts.
    materialize_cache: MaterializeCache<SD>,

    /// How to resolve primary-key conflicts (overwrite, no check, etc.).
    conflict_behavior: ConflictBehavior,

    /// Indices of version columns used during conflict handling; empty when
    /// no version column is specified.
    version_column_indices: Vec<u32>,

    /// Whether any downstream MV may depend on this table. When `false` and the
    /// conflict behavior is `Overwrite`, conflict checks can be skipped
    /// (see `get_op_consistency_level`).
    may_have_downstream: bool,

    /// IDs of subscriptions depending on this table; a non-empty set forces
    /// log-store-enabled op consistency.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,

    /// Indices of TOAST-able columns for PostgreSQL CDC tables. None means either non-CDC table or CDC table without TOAST-able columns.
    toastable_column_indices: Option<Vec<usize>>,

    /// Optional refresh arguments and state for refreshable materialized views
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    /// Local barrier manager for reporting barrier events
    local_barrier_manager: LocalBarrierManager,
}
109
/// Arguments and state for refreshable materialized views
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Table catalog for main table
    pub table_catalog: Table,

    /// Table catalog for staging table
    pub staging_table_catalog: Table,

    /// Flag indicating if this table is currently being refreshed.
    /// Starts `false`; restored to `true` on recovery when the progress table
    /// shows an incomplete refresh.
    pub is_refreshing: bool,

    /// During data refresh (between `RefreshStart` and `LoadFinish`),
    /// data will be written to both the main table and the staging table.
    ///
    /// The staging table is PK-only.
    ///
    /// After `LoadFinish`, we will do a `DELETE FROM main_table WHERE pk NOT IN (SELECT pk FROM staging_table)`, and then purge the staging table.
    pub staging_table: StateTableInner<S, SD>,

    /// Progress table for tracking refresh state per `VNode` for fault tolerance
    pub progress_table: RefreshProgressTable<S>,

    /// Table ID for this refreshable materialized view (the main table's ID)
    pub table_id: TableId,
}
135
136impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
137    /// Create new `RefreshableMaterializeArgs`
138    pub async fn new(
139        store: S,
140        table_catalog: &Table,
141        staging_table_catalog: &Table,
142        progress_state_table: &Table,
143        vnodes: Option<Arc<Bitmap>>,
144    ) -> Self {
145        let table_id = TableId::new(table_catalog.id);
146
147        // staging table is pk-only, and we don't need to check value consistency
148        let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
149            staging_table_catalog,
150            store.clone(),
151            vnodes.clone(),
152        )
153        .await;
154
155        let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
156            progress_state_table,
157            store,
158            vnodes,
159        )
160        .await;
161
162        // Get primary key length from main table catalog
163        let pk_len = table_catalog.pk.len();
164        let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
165
166        debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
167
168        Self {
169            table_catalog: table_catalog.clone(),
170            staging_table_catalog: staging_table_catalog.clone(),
171            is_refreshing: false,
172            staging_table,
173            progress_table,
174            table_id,
175        }
176    }
177}
178
179fn get_op_consistency_level(
180    conflict_behavior: ConflictBehavior,
181    may_have_downstream: bool,
182    depended_subscriptions: &HashSet<u32>,
183) -> StateTableOpConsistencyLevel {
184    if !depended_subscriptions.is_empty() {
185        StateTableOpConsistencyLevel::LogStoreEnabled
186    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
187        // Table with overwrite conflict behavior could disable conflict check
188        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
189        StateTableOpConsistencyLevel::Inconsistent
190    } else {
191        StateTableOpConsistencyLevel::ConsistentOldValue
192    }
193}
194
195impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
196    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
197    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
198    /// should be `None`.
199    #[allow(clippy::too_many_arguments)]
200    pub async fn new(
201        input: Executor,
202        schema: Schema,
203        store: S,
204        arrange_key: Vec<ColumnOrder>,
205        actor_context: ActorContextRef,
206        vnodes: Option<Arc<Bitmap>>,
207        table_catalog: &Table,
208        watermark_epoch: AtomicU64Ref,
209        conflict_behavior: ConflictBehavior,
210        version_column_indices: Vec<u32>,
211        metrics: Arc<StreamingMetrics>,
212        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
213        local_barrier_manager: LocalBarrierManager,
214    ) -> Self {
215        let table_columns: Vec<ColumnDesc> = table_catalog
216            .columns
217            .iter()
218            .map(|col| col.column_desc.as_ref().unwrap().into())
219            .collect();
220
221        // Extract TOAST-able column indices from table columns.
222        // Only for PostgreSQL CDC tables.
223        let toastable_column_indices = if table_catalog.cdc_table_type()
224            == risingwave_pb::catalog::table::CdcTableType::Postgres
225        {
226            let toastable_indices: Vec<usize> = table_columns
227                .iter()
228                .enumerate()
229                .filter_map(|(index, column)| match &column.data_type {
230                    // Currently supports TOAST updates for:
231                    // - jsonb (DataType::Jsonb)
232                    // - varchar (DataType::Varchar)
233                    // - bytea (DataType::Bytea)
234                    // - One-dimensional arrays of the above types (DataType::List)
235                    //   Note: Some array types may not be fully supported yet, see issue  https://github.com/risingwavelabs/risingwave/issues/22916 for details.
236
237                    // For details on how TOAST values are handled, see comments in `is_debezium_unavailable_value`.
238                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
239                        Some(index)
240                    }
241                    _ => None,
242                })
243                .collect();
244
245            if toastable_indices.is_empty() {
246                None
247            } else {
248                Some(toastable_indices)
249            }
250        } else {
251            None
252        };
253
254        let row_serde: BasicSerde = BasicSerde::new(
255            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
256            Arc::from(table_columns.into_boxed_slice()),
257        );
258
259        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
260        let may_have_downstream = actor_context.initial_dispatch_num != 0;
261        let depended_subscription_ids = actor_context
262            .related_subscriptions
263            .get(&TableId::new(table_catalog.id))
264            .cloned()
265            .unwrap_or_default();
266        let op_consistency_level = get_op_consistency_level(
267            conflict_behavior,
268            may_have_downstream,
269            &depended_subscription_ids,
270        );
271        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
272        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
273            .with_op_consistency_level(op_consistency_level)
274            .enable_preload_all_rows_by_config(&actor_context.streaming_config)
275            .build()
276            .await;
277
278        let mv_metrics = metrics.new_materialize_metrics(
279            TableId::new(table_catalog.id),
280            actor_context.id,
281            actor_context.fragment_id,
282        );
283
284        let metrics_info =
285            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
286
287        let is_dummy_table =
288            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;
289
290        Self {
291            input,
292            schema,
293            state_table,
294            arrange_key_indices,
295            actor_context,
296            materialize_cache: MaterializeCache::new(
297                watermark_epoch,
298                metrics_info,
299                row_serde,
300                version_column_indices.clone(),
301            ),
302            conflict_behavior,
303            version_column_indices,
304            is_dummy_table,
305            may_have_downstream,
306            depended_subscription_ids,
307            metrics: mv_metrics,
308            toastable_column_indices,
309            refresh_args,
310            local_barrier_manager,
311        }
312    }
313
314    #[try_stream(ok = Message, error = StreamExecutorError)]
315    async fn execute_inner(mut self) {
316        let mv_table_id = TableId::new(self.state_table.table_id());
317        let _staging_table_id = TableId::new(self.state_table.table_id());
318        let data_types = self.schema.data_types();
319        let mut input = self.input.execute();
320
321        let barrier = expect_first_barrier(&mut input).await?;
322        let first_epoch = barrier.epoch;
323        let _barrier_epoch = barrier.epoch; // Save epoch for later use (unused in normal execution)
324        // The first barrier message should be propagated.
325        yield Message::Barrier(barrier);
326        self.state_table.init_epoch(first_epoch).await?;
327
328        // default to normal ingestion
329        let mut inner_state =
330            Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
331        // Initialize staging table for refreshable materialized views
332        if let Some(ref mut refresh_args) = self.refresh_args {
333            refresh_args.staging_table.init_epoch(first_epoch).await?;
334
335            // Initialize progress table and load existing progress for recovery
336            refresh_args.progress_table.recover(first_epoch).await?;
337
338            // Check if refresh is already in progress (recovery scenario)
339            let progress_stats = refresh_args.progress_table.get_progress_stats();
340            if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
341                refresh_args.is_refreshing = true;
342                tracing::info!(
343                    total_vnodes = progress_stats.total_vnodes,
344                    completed_vnodes = progress_stats.completed_vnodes,
345                    "Recovered refresh in progress, resuming refresh operation"
346                );
347
348                // Since stage info is no longer stored in progress table,
349                // we need to determine recovery state differently.
350                // For now, assume all incomplete VNodes need to continue merging
351                let incomplete_vnodes: Vec<_> = refresh_args
352                    .progress_table
353                    .get_all_progress()
354                    .iter()
355                    .filter(|(_, entry)| !entry.is_completed)
356                    .map(|(&vnode, _)| vnode)
357                    .collect();
358
359                if !incomplete_vnodes.is_empty() {
360                    // Some VNodes are incomplete, need to resume refresh operation
361                    tracing::info!(
362                        incomplete_vnodes = incomplete_vnodes.len(),
363                        "Recovery detected incomplete VNodes, resuming refresh operation"
364                    );
365                    // Since stage tracking is now in memory, we'll determine the appropriate
366                    // stage based on the executor's internal state machine
367                } else {
368                    // This should not happen if is_complete() returned false, but handle it gracefully
369                    tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
370                }
371            }
372        }
373
374        // Determine initial execution stage (for recovery scenarios)
375        if let Some(ref refresh_args) = self.refresh_args
376            && refresh_args.is_refreshing
377        {
378            // Recovery logic: Check if there are incomplete vnodes from previous run
379            let incomplete_vnodes: Vec<_> = refresh_args
380                .progress_table
381                .get_all_progress()
382                .iter()
383                .filter(|(_, entry)| !entry.is_completed)
384                .map(|(&vnode, _)| vnode)
385                .collect();
386            if !incomplete_vnodes.is_empty() {
387                // Resume from merge stage since some VNodes were left incomplete
388                inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
389                tracing::info!(
390                    incomplete_vnodes = incomplete_vnodes.len(),
391                    "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
392                );
393            }
394        }
395
396        // Main execution loop: cycles through Stage 1 -> Stage 2 -> Stage 3 -> Stage 1...
397        'main_loop: loop {
398            match *inner_state {
399                MaterializeStreamState::NormalIngestion => {
400                    #[for_await]
401                    '_normal_ingest: for msg in input.by_ref() {
402                        let msg = msg?;
403                        self.materialize_cache.evict();
404
405                        match msg {
406                            Message::Watermark(w) => {
407                                yield Message::Watermark(w);
408                            }
409                            Message::Chunk(chunk) if self.is_dummy_table => {
410                                self.metrics
411                                    .materialize_input_row_count
412                                    .inc_by(chunk.cardinality() as u64);
413                                yield Message::Chunk(chunk);
414                            }
415                            Message::Chunk(chunk) => {
416                                self.metrics
417                                    .materialize_input_row_count
418                                    .inc_by(chunk.cardinality() as u64);
419                                // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
420                                // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
421                                // and the conflict behavior is overwrite.
422                                let do_not_handle_conflict = !self.state_table.is_consistent_op()
423                                    && self.version_column_indices.is_empty()
424                                    && self.conflict_behavior == ConflictBehavior::Overwrite;
425
426                                match self.conflict_behavior {
427                                    checked_conflict_behaviors!() if !do_not_handle_conflict => {
428                                        if chunk.cardinality() == 0 {
429                                            // empty chunk
430                                            continue;
431                                        }
432                                        let (data_chunk, ops) = chunk.clone().into_parts();
433
434                                        if self.state_table.value_indices().is_some() {
435                                            // TODO(st1page): when materialize partial columns(), we should
436                                            // construct some columns in the pk
437                                            panic!(
438                                                "materialize executor with data check can not handle only materialize partial columns"
439                                            )
440                                        };
441                                        let values = data_chunk.serialize();
442
443                                        let key_chunk =
444                                            data_chunk.project(self.state_table.pk_indices());
445
446                                        // For refreshable materialized views, write to staging table during refresh
447                                        // Do not use generate_output here.
448                                        if let Some(ref mut refresh_args) = self.refresh_args
449                                            && refresh_args.is_refreshing
450                                        {
451                                            let key_chunk = chunk
452                                                .clone()
453                                                .project(self.state_table.pk_indices());
454                                            tracing::trace!(
455                                                staging_chunk = %key_chunk.to_pretty(),
456                                                input_chunk = %chunk.to_pretty(),
457                                                "writing to staging table"
458                                            );
459                                            if cfg!(debug_assertions) {
460                                                // refreshable source should be append-only
461                                                assert!(
462                                                    key_chunk
463                                                        .ops()
464                                                        .iter()
465                                                        .all(|op| op == &Op::Insert)
466                                                );
467                                            }
468                                            refresh_args
469                                                .staging_table
470                                                .write_chunk(key_chunk.clone());
471                                            refresh_args.staging_table.try_flush().await?;
472                                        }
473
474                                        let pks = {
475                                            let mut pks = vec![vec![]; data_chunk.capacity()];
476                                            key_chunk
477                                                .rows_with_holes()
478                                                .zip_eq_fast(pks.iter_mut())
479                                                .for_each(|(r, vnode_and_pk)| {
480                                                    if let Some(r) = r {
481                                                        self.state_table
482                                                            .pk_serde()
483                                                            .serialize(r, vnode_and_pk);
484                                                    }
485                                                });
486                                            pks
487                                        };
488                                        let (_, vis) = key_chunk.into_parts();
489                                        let row_ops = ops
490                                            .iter()
491                                            .zip_eq_debug(pks.into_iter())
492                                            .zip_eq_debug(values.into_iter())
493                                            .zip_eq_debug(vis.iter())
494                                            .filter_map(|(((op, k), v), vis)| {
495                                                vis.then_some((*op, k, v))
496                                            })
497                                            .collect_vec();
498
499                                        let change_buffer = self
500                                            .materialize_cache
501                                            .handle(
502                                                row_ops,
503                                                &self.state_table,
504                                                self.conflict_behavior,
505                                                &self.metrics,
506                                                self.toastable_column_indices.as_deref(),
507                                            )
508                                            .await?;
509
510                                        match generate_output(change_buffer, data_types.clone())? {
511                                            Some(output_chunk) => {
512                                                self.state_table.write_chunk(output_chunk.clone());
513                                                self.state_table.try_flush().await?;
514                                                yield Message::Chunk(output_chunk);
515                                            }
516                                            None => continue,
517                                        }
518                                    }
519                                    ConflictBehavior::IgnoreConflict => unreachable!(),
520                                    ConflictBehavior::NoCheck
521                                    | ConflictBehavior::Overwrite
522                                    | ConflictBehavior::DoUpdateIfNotNull => {
523                                        self.state_table.write_chunk(chunk.clone());
524                                        self.state_table.try_flush().await?;
525
526                                        // For refreshable materialized views, also write to staging table during refresh
527                                        if let Some(ref mut refresh_args) = self.refresh_args
528                                            && refresh_args.is_refreshing
529                                        {
530                                            let key_chunk = chunk
531                                                .clone()
532                                                .project(self.state_table.pk_indices());
533                                            tracing::trace!(
534                                                staging_chunk = %key_chunk.to_pretty(),
535                                                input_chunk = %chunk.to_pretty(),
536                                                "writing to staging table"
537                                            );
538                                            if cfg!(debug_assertions) {
539                                                // refreshable source should be append-only
540                                                assert!(
541                                                    key_chunk
542                                                        .ops()
543                                                        .iter()
544                                                        .all(|op| op == &Op::Insert)
545                                                );
546                                            }
547                                            refresh_args
548                                                .staging_table
549                                                .write_chunk(key_chunk.clone());
550                                            refresh_args.staging_table.try_flush().await?;
551                                        }
552
553                                        yield Message::Chunk(chunk);
554                                    }
555                                }
556                            }
557                            Message::Barrier(barrier) => {
558                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
559                                    barrier,
560                                    expect_next_state: Box::new(
561                                        MaterializeStreamState::NormalIngestion,
562                                    ),
563                                };
564                                continue 'main_loop;
565                            }
566                        }
567                    }
568
569                    return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
570                        anyhow::anyhow!(
571                            "Input stream terminated unexpectedly during normal ingestion"
572                        ),
573                    )));
574                }
575                MaterializeStreamState::MergingData => {
576                    let Some(refresh_args) = self.refresh_args.as_mut() else {
577                        panic!(
578                            "MaterializeExecutor entered CleanUp state without refresh_args configured"
579                        );
580                    };
581                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
582
583                    debug_assert_eq!(
584                        self.state_table.vnodes(),
585                        refresh_args.staging_table.vnodes()
586                    );
587                    debug_assert_eq!(
588                        refresh_args.staging_table.vnodes(),
589                        refresh_args.progress_table.vnodes()
590                    );
591
592                    let mut rows_to_delete = vec![];
593                    let mut merge_complete = false;
594                    let mut pending_barrier: Option<Barrier> = None;
595
596                    // Scope to limit immutable borrows to state tables
597                    {
598                        let left_input = input.by_ref().map(Either::Left);
599                        let right_merge_sort = pin!(
600                            Self::make_mergesort_stream(
601                                &self.state_table,
602                                &refresh_args.staging_table,
603                                &mut refresh_args.progress_table
604                            )
605                            .map(Either::Right)
606                        );
607
608                        // Prefer to select input stream to handle barriers promptly
609                        // Rebuild the merge stream each time processing a barrier
610                        let mut merge_stream =
611                            select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
612                                stream::PollNext::Left
613                            });
614
615                        #[for_await]
616                        'merge_stream: for either in &mut merge_stream {
617                            match either {
618                                Either::Left(msg) => {
619                                    let msg = msg?;
620                                    match msg {
621                                        Message::Watermark(w) => yield Message::Watermark(w),
622                                        Message::Chunk(chunk) => {
623                                            tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
624                                        }
625                                        Message::Barrier(b) => {
626                                            pending_barrier = Some(b);
627                                            break 'merge_stream;
628                                        }
629                                    }
630                                }
631                                Either::Right(result) => {
632                                    match result? {
633                                        Some((_vnode, row)) => {
634                                            rows_to_delete.push(row);
635                                        }
636                                        None => {
637                                            // Merge stream finished
638                                            merge_complete = true;
639
640                                            // If the merge stream finished, we need to wait for the next barrier to commit states
641                                        }
642                                    }
643                                }
644                            }
645                        }
646                    }
647
648                    // Process collected rows for deletion
649                    for row in &rows_to_delete {
650                        self.state_table.delete(row);
651                    }
652                    if !rows_to_delete.is_empty() {
653                        let to_delete_chunk = StreamChunk::from_rows(
654                            &rows_to_delete
655                                .iter()
656                                .map(|row| (Op::Delete, row))
657                                .collect_vec(),
658                            &self.schema.data_types(),
659                        );
660                        yield Message::Chunk(to_delete_chunk);
661                    }
662
663                    // should wait for at least one barrier
664                    assert!(pending_barrier.is_some(), "pending barrier is not set");
665
666                    *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
667                        barrier: pending_barrier.unwrap(),
668                        expect_next_state: if merge_complete {
669                            Box::new(MaterializeStreamState::CleanUp)
670                        } else {
671                            Box::new(MaterializeStreamState::MergingData)
672                        },
673                    };
674                    continue 'main_loop;
675                }
676                MaterializeStreamState::CleanUp => {
677                    let Some(refresh_args) = self.refresh_args.as_mut() else {
678                        panic!(
679                            "MaterializeExecutor entered MergingData state without refresh_args configured"
680                        );
681                    };
682                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
683
684                    #[for_await]
685                    for msg in input.by_ref() {
686                        let msg = msg?;
687                        match msg {
688                            Message::Watermark(w) => yield Message::Watermark(w),
689                            Message::Chunk(chunk) => {
690                                tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
691                            }
692                            Message::Barrier(barrier) if !barrier.is_checkpoint() => {
693                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
694                                    barrier,
695                                    expect_next_state: Box::new(MaterializeStreamState::CleanUp),
696                                };
697                                continue 'main_loop;
698                            }
699                            Message::Barrier(barrier) => {
700                                let staging_table_id = refresh_args.staging_table.table_id();
701                                let epoch = barrier.epoch;
702                                self.local_barrier_manager.report_refresh_finished(
703                                    epoch,
704                                    self.actor_context.id,
705                                    refresh_args.table_id.into(),
706                                    staging_table_id,
707                                );
708                                tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
709
710                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
711                                    barrier,
712                                    expect_next_state: Box::new(
713                                        MaterializeStreamState::RefreshEnd {
714                                            on_complete_epoch: epoch,
715                                        },
716                                    ),
717                                };
718                                continue 'main_loop;
719                            }
720                        }
721                    }
722                }
723                MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
724                    let Some(refresh_args) = self.refresh_args.as_mut() else {
725                        panic!(
726                            "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
727                        );
728                    };
729                    let staging_table_id = refresh_args.staging_table.table_id();
730
731                    // Wait for staging table truncation to complete
732                    let staging_store = refresh_args.staging_table.state_store().clone();
733                    staging_store
734                        .try_wait_epoch(
735                            HummockReadEpoch::Committed(on_complete_epoch.prev),
736                            TryWaitEpochOptions {
737                                table_id: staging_table_id.into(),
738                            },
739                        )
740                        .await?;
741
742                    if let Some(ref mut refresh_args) = self.refresh_args {
743                        refresh_args.is_refreshing = false;
744                    }
745                    *inner_state = MaterializeStreamState::NormalIngestion;
746                    continue 'main_loop;
747                }
748                MaterializeStreamState::CommitAndYieldBarrier {
749                    barrier,
750                    mut expect_next_state,
751                } => {
752                    if let Some(ref mut refresh_args) = self.refresh_args {
753                        match barrier.mutation.as_deref() {
754                            Some(Mutation::RefreshStart {
755                                table_id: refresh_table_id,
756                                associated_source_id: _,
757                            }) if *refresh_table_id == refresh_args.table_id => {
758                                debug_assert!(
759                                    !refresh_args.is_refreshing,
760                                    "cannot start refresh twice"
761                                );
762                                refresh_args.is_refreshing = true;
763                                tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
764
765                                // Initialize progress tracking for all VNodes
766                                Self::init_refresh_progress(
767                                    &self.state_table,
768                                    &mut refresh_args.progress_table,
769                                    barrier.epoch.curr,
770                                )?;
771                            }
772                            Some(Mutation::LoadFinish {
773                                associated_source_id: load_finish_source_id,
774                            }) => {
775                                // Get associated source id from table catalog
776                                let associated_source_id = match refresh_args
777                                    .table_catalog
778                                    .optional_associated_source_id
779                                {
780                                    Some(OptionalAssociatedSourceId::AssociatedSourceId(id)) => id,
781                                    None => unreachable!("associated_source_id is not set"),
782                                };
783
784                                if load_finish_source_id.table_id() == associated_source_id {
785                                    tracing::info!(
786                                        %load_finish_source_id,
787                                        "LoadFinish received, starting data replacement"
788                                    );
789                                    expect_next_state =
790                                        Box::new(MaterializeStreamState::<_>::MergingData);
791                                }
792                            }
793                            _ => {}
794                        }
795                    }
796
797                    // ===== normal operation =====
798
799                    // If a downstream mv depends on the current table, we need to do conflict check again.
800                    if !self.may_have_downstream
801                        && barrier.has_more_downstream_fragments(self.actor_context.id)
802                    {
803                        self.may_have_downstream = true;
804                    }
805                    Self::may_update_depended_subscriptions(
806                        &mut self.depended_subscription_ids,
807                        &barrier,
808                        mv_table_id,
809                    );
810                    let op_consistency_level = get_op_consistency_level(
811                        self.conflict_behavior,
812                        self.may_have_downstream,
813                        &self.depended_subscription_ids,
814                    );
815                    let post_commit = self
816                        .state_table
817                        .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
818                        .await?;
819                    if !post_commit.inner().is_consistent_op() {
820                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
821                    }
822
823                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
824
825                    // Commit staging table for refreshable materialized views
826                    let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
827                    {
828                        // Commit progress table for fault tolerance
829
830                        Some((
831                            refresh_args.staging_table.commit(barrier.epoch).await?,
832                            refresh_args.progress_table.commit(barrier.epoch).await?,
833                        ))
834                    } else {
835                        None
836                    };
837
838                    let b_epoch = barrier.epoch;
839                    yield Message::Barrier(barrier);
840
841                    // Update the vnode bitmap for the state table if asked.
842                    if let Some((_, cache_may_stale)) = post_commit
843                        .post_yield_barrier(update_vnode_bitmap.clone())
844                        .await?
845                        && cache_may_stale
846                    {
847                        self.materialize_cache.lru_cache.clear();
848                    }
849
850                    // Handle staging table post commit
851                    if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
852                        staging_post_commit
853                            .post_yield_barrier(update_vnode_bitmap.clone())
854                            .await?;
855                        progress_post_commit
856                            .post_yield_barrier(update_vnode_bitmap)
857                            .await?;
858                    }
859
860                    self.metrics
861                        .materialize_current_epoch
862                        .set(b_epoch.curr as i64);
863
864                    // ====== transition to next state ======
865
866                    *inner_state = *expect_next_state;
867                }
868            }
869        }
870    }
871
872    /// Stream that yields rows to be deleted from main table.
873    /// Yields `Some((vnode, row))` for rows that exist in main but not in staging.
874    /// Yields `None` when finished processing all vnodes.
875    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
876    async fn make_mergesort_stream<'a>(
877        main_table: &'a StateTableInner<S, SD>,
878        staging_table: &'a StateTableInner<S, SD>,
879        progress_table: &'a mut RefreshProgressTable<S>,
880    ) {
881        for vnode in main_table.vnodes().clone().iter_vnodes() {
882            let mut processed_rows = 0;
883            // Check if this VNode has already been completed (for fault tolerance)
884            let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
885                if let Some(current_entry) = progress_table.get_progress(vnode) {
886                    // Skip already completed VNodes during recovery
887                    if current_entry.is_completed {
888                        tracing::debug!(
889                            vnode = vnode.to_index(),
890                            "Skipping already completed VNode during recovery"
891                        );
892                        continue;
893                    }
894                    processed_rows += current_entry.processed_rows;
895                    tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
896
897                    if let Some(current_state) = &current_entry.current_pos {
898                        (Bound::Excluded(current_state.clone()), Bound::Unbounded)
899                    } else {
900                        (Bound::Unbounded, Bound::Unbounded)
901                    }
902                } else {
903                    (Bound::Unbounded, Bound::Unbounded)
904                };
905
906            let iter_main = main_table
907                .iter_keyed_row_with_vnode(
908                    vnode,
909                    &pk_range,
910                    PrefetchOptions::prefetch_for_large_range_scan(),
911                )
912                .await?;
913            let iter_staging = staging_table
914                .iter_keyed_row_with_vnode(
915                    vnode,
916                    &pk_range,
917                    PrefetchOptions::prefetch_for_large_range_scan(),
918                )
919                .await?;
920
921            pin_mut!(iter_main);
922            pin_mut!(iter_staging);
923
924            // Sort-merge join implementation using dual pointers
925            let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
926            let mut staging_item: Option<KeyedRow<Bytes>> =
927                iter_staging.next().await.transpose()?;
928
929            while let Some(main_kv) = main_item {
930                let main_key = main_kv.key();
931
932                // Advance staging iterator until we find a key >= main_key
933                let mut should_delete = false;
934                while let Some(staging_kv) = &staging_item {
935                    let staging_key = staging_kv.key();
936                    match main_key.cmp(staging_key) {
937                        std::cmp::Ordering::Greater => {
938                            // main_key > staging_key, advance staging
939                            staging_item = iter_staging.next().await.transpose()?;
940                        }
941                        std::cmp::Ordering::Equal => {
942                            // Keys match, this row exists in both tables, no need to delete
943                            break;
944                        }
945                        std::cmp::Ordering::Less => {
946                            // main_key < staging_key, main row doesn't exist in staging, delete it
947                            should_delete = true;
948                            break;
949                        }
950                    }
951                }
952
953                // If staging_item is None, all remaining main rows should be deleted
954                if staging_item.is_none() {
955                    should_delete = true;
956                }
957
958                if should_delete {
959                    yield Some((vnode, main_kv.row().clone()));
960                }
961
962                // Advance main iterator
963                processed_rows += 1;
964                tracing::info!(
965                    "set progress table: vnode = {:?}, processed_rows = {:?}",
966                    vnode,
967                    processed_rows
968                );
969                progress_table.set_progress(
970                    vnode,
971                    Some(
972                        main_kv
973                            .row()
974                            .project(main_table.pk_indices())
975                            .to_owned_row(),
976                    ),
977                    false,
978                    processed_rows,
979                )?;
980                main_item = iter_main.next().await.transpose()?;
981            }
982
983            // Mark this VNode as completed
984            if let Some(current_entry) = progress_table.get_progress(vnode) {
985                progress_table.set_progress(
986                    vnode,
987                    current_entry.current_pos.clone(),
988                    true, // completed
989                    current_entry.processed_rows,
990                )?;
991
992                tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
993            }
994        }
995
996        // Signal completion
997        yield None;
998    }
999
    /// Update `depended_subscriptions` from the mutations carried by `barrier`:
    /// insert subscriber ids newly added on `mv_table_id`, and remove ids whose
    /// subscription on `mv_table_id` is being dropped.
    ///
    /// Inconsistencies (duplicate add, missing removal) are logged as warnings
    /// rather than treated as errors.
    /// NOTE(review): a previous doc said "return true when changed", but this
    /// function returns `()` — no change indication is produced.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            // `insert` returning false means the id was already tracked.
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            // Only drop subscriptions that target this MV's table.
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
1035
1036    /// Initialize refresh progress tracking for all `VNodes`
1037    fn init_refresh_progress(
1038        state_table: &StateTableInner<S, SD>,
1039        progress_table: &mut RefreshProgressTable<S>,
1040        _epoch: u64,
1041    ) -> StreamExecutorResult<()> {
1042        debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1043
1044        // Initialize progress for all VNodes in the current bitmap
1045        for vnode in state_table.vnodes().iter_vnodes() {
1046            progress_table.set_progress(
1047                vnode, None,  // initial position
1048                false, // not completed yet
1049                0,     // initial processed rows
1050            )?;
1051        }
1052
1053        tracing::info!(
1054            vnodes_count = state_table.vnodes().count_ones(),
1055            "Initialized refresh progress tracking for all VNodes"
1056        );
1057
1058        Ok(())
1059    }
1060}
1061
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
    ///
    /// Builds a single-vnode state table from the given `keys` (arrange/pk
    /// columns) and `column_ids`, wires in test-only metrics, cache, and barrier
    /// manager, and disables refresh support (`refresh_args: None`).
    #[cfg(any(test, feature = "test"))]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        // Arrange (pk) column indices and their order types come 1:1 from `keys`.
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        // Pair each provided column id with the corresponding input field to build
        // unnamed column descriptors (zip_eq asserts the lengths match).
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // Serde over all columns, in declaration order.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        // Build the state table from a generated pb catalog; `None` vnode bitmap
        // means the test table is not distributed.
        let state_table = StateTableInner::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                vec![],
            ),
            conflict_behavior,
            // No version columns / TOAST handling in the test setup.
            version_column_indices: vec![],
            is_dummy_table: false,
            toastable_column_indices: None,
            // Assume downstream exists so conflict checks stay enabled in tests.
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
            refresh_args: None, // Test constructor doesn't support refresh functionality
            local_barrier_manager: LocalBarrierManager::for_test(),
        }
    }
}
1126
1127/// Fast string comparison to check if a string equals `DEBEZIUM_UNAVAILABLE_VALUE`.
1128/// Optimized by checking length first to avoid expensive string comparison.
1129fn is_unavailable_value_str(s: &str) -> bool {
1130    s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
1131}
1132
1133/// Check if a datum represents Debezium's unavailable value placeholder.
1134/// This function handles both scalar types and one-dimensional arrays.
1135fn is_debezium_unavailable_value(
1136    datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
1137) -> bool {
1138    match datum {
1139        Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => is_unavailable_value_str(val),
1140        Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
1141            // For jsonb type, check if it's a string containing the unavailable value
1142            jsonb_ref
1143                .as_str()
1144                .map(is_unavailable_value_str)
1145                .unwrap_or(false)
1146        }
1147        Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
1148            // For bytea type, we need to check if it contains the string bytes of DEBEZIUM_UNAVAILABLE_VALUE
1149            // This is because when processing bytea from Debezium, we convert the base64-encoded string
1150            // to `DEBEZIUM_UNAVAILABLE_VALUE` in the json.rs parser to maintain consistency
1151            if let Ok(bytea_str) = std::str::from_utf8(bytea) {
1152                is_unavailable_value_str(bytea_str)
1153            } else {
1154                false
1155            }
1156        }
1157        Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
1158            // For list type, check if it contains exactly one element with the unavailable value
1159            // This is because when any element in an array triggers TOAST, Debezium treats the entire
1160            // array as unchanged and sends a placeholder array with only one element
1161            if list_ref.len() == 1 {
1162                if let Some(Some(element)) = list_ref.get(0) {
1163                    // Recursively check the array element
1164                    is_debezium_unavailable_value(&Some(element))
1165                } else {
1166                    false
1167                }
1168            } else {
1169                false
1170            }
1171        }
1172        _ => false,
1173    }
1174}
1175
1176/// Fix TOAST columns by replacing unavailable values with old row values.
1177fn handle_toast_columns_for_postgres_cdc(
1178    old_row: &OwnedRow,
1179    new_row: &OwnedRow,
1180    toastable_indices: &[usize],
1181) -> OwnedRow {
1182    let mut fixed_row_data = new_row.as_inner().to_vec();
1183
1184    for &toast_idx in toastable_indices {
1185        // Check if the new value is Debezium's unavailable value placeholder
1186        let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
1187        if is_unavailable {
1188            // Replace with old row value if available
1189            if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
1190                fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
1191            }
1192        }
1193    }
1194
1195    OwnedRow::new(fixed_row_data)
1196}
1197
1198/// Construct output `StreamChunk` from given buffer.
1199fn generate_output(
1200    change_buffer: ChangeBuffer,
1201    data_types: Vec<DataType>,
1202) -> StreamExecutorResult<Option<StreamChunk>> {
1203    // construct output chunk
1204    // TODO(st1page): when materialize partial columns(), we should construct some columns in the pk
1205    let mut new_ops: Vec<Op> = vec![];
1206    let mut new_rows: Vec<OwnedRow> = vec![];
1207    for (_, row_op) in change_buffer.into_parts() {
1208        match row_op {
1209            ChangeBufferKeyOp::Insert(value) => {
1210                new_ops.push(Op::Insert);
1211                new_rows.push(value);
1212            }
1213            ChangeBufferKeyOp::Delete(old_value) => {
1214                new_ops.push(Op::Delete);
1215                new_rows.push(old_value);
1216            }
1217            ChangeBufferKeyOp::Update((old_value, new_value)) => {
1218                // if old_value == new_value, we don't need to emit updates to downstream.
1219                if old_value != new_value {
1220                    new_ops.push(Op::UpdateDelete);
1221                    new_ops.push(Op::UpdateInsert);
1222                    new_rows.push(old_value);
1223                    new_rows.push(new_value);
1224                }
1225            }
1226        }
1227    }
1228    let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
1229
1230    for row in new_rows {
1231        let res = data_chunk_builder.append_one_row(row);
1232        debug_assert!(res.is_none());
1233    }
1234
1235    if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
1236        let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
1237        Ok(Some(new_stream_chunk))
1238    } else {
1239        Ok(None)
1240    }
1241}
1242
/// `ChangeBuffer` is a buffer to handle chunk into `KeyOp`.
/// It compacts the operations applied to each primary key into one net
/// operation (insert / delete / update).
/// TODO(rc): merge with `TopNStaging`.
pub struct ChangeBuffer {
    /// Maps serialized primary key -> net operation accumulated so far.
    buffer: HashMap<Vec<u8>, ChangeBufferKeyOp>,
}
1248
/// `KeyOp` variant for `ChangeBuffer` that stores `OwnedRow` instead of Bytes
enum ChangeBufferKeyOp {
    /// The key was newly inserted with this row.
    Insert(OwnedRow),
    /// The key was deleted; holds the row it previously had.
    Delete(OwnedRow),
    /// (`old_value`, `new_value`)
    Update((OwnedRow, OwnedRow)),
}
1256
1257impl ChangeBuffer {
1258    fn new() -> Self {
1259        Self {
1260            buffer: HashMap::new(),
1261        }
1262    }
1263
1264    fn insert(&mut self, pk: Vec<u8>, value: OwnedRow) {
1265        let entry = self.buffer.entry(pk);
1266        match entry {
1267            Entry::Vacant(e) => {
1268                e.insert(ChangeBufferKeyOp::Insert(value));
1269            }
1270            Entry::Occupied(mut e) => {
1271                if let ChangeBufferKeyOp::Delete(old_value) = e.get_mut() {
1272                    let old_val = std::mem::take(old_value);
1273                    e.insert(ChangeBufferKeyOp::Update((old_val, value)));
1274                } else {
1275                    unreachable!();
1276                }
1277            }
1278        }
1279    }
1280
1281    fn delete(&mut self, pk: Vec<u8>, old_value: OwnedRow) {
1282        let entry: Entry<'_, Vec<u8>, ChangeBufferKeyOp> = self.buffer.entry(pk);
1283        match entry {
1284            Entry::Vacant(e) => {
1285                e.insert(ChangeBufferKeyOp::Delete(old_value));
1286            }
1287            Entry::Occupied(mut e) => match e.get_mut() {
1288                ChangeBufferKeyOp::Insert(_) => {
1289                    e.remove();
1290                }
1291                ChangeBufferKeyOp::Update((prev, _curr)) => {
1292                    let prev = std::mem::take(prev);
1293                    e.insert(ChangeBufferKeyOp::Delete(prev));
1294                }
1295                ChangeBufferKeyOp::Delete(_) => {
1296                    unreachable!();
1297                }
1298            },
1299        }
1300    }
1301
1302    fn update(&mut self, pk: Vec<u8>, old_value: OwnedRow, new_value: OwnedRow) {
1303        let entry = self.buffer.entry(pk);
1304        match entry {
1305            Entry::Vacant(e) => {
1306                e.insert(ChangeBufferKeyOp::Update((old_value, new_value)));
1307            }
1308            Entry::Occupied(mut e) => match e.get_mut() {
1309                ChangeBufferKeyOp::Insert(_) => {
1310                    e.insert(ChangeBufferKeyOp::Insert(new_value));
1311                }
1312                ChangeBufferKeyOp::Update((_prev, curr)) => {
1313                    *curr = new_value;
1314                }
1315                ChangeBufferKeyOp::Delete(_) => {
1316                    unreachable!()
1317                }
1318            },
1319        }
1320    }
1321
1322    fn into_parts(self) -> HashMap<Vec<u8>, ChangeBufferKeyOp> {
1323        self.buffer
1324    }
1325}
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Drive the executor's state-machine loop as a boxed message stream.
        self.execute_inner().boxed()
    }
}
1331
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Deliberately terse: only the arrange key indices are shown, since most
    // fields (state table, cache, metrics) have no meaningful Debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
1339
/// A cache for materialize executors.
struct MaterializeCache<SD> {
    /// LRU map from serialized primary key to the cached row (or known absence).
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    /// Serde used to (de)serialize cached row values.
    row_serde: BasicSerde,
    /// Indices of version columns used for conflict resolution, if any.
    version_column_indices: Vec<u32>,
    _serde: PhantomData<SD>,
}

/// `None` records that the key currently has no row in the table.
type CacheValue = Option<CompactedRow>;
1349
1350impl<SD: ValueRowSerde> MaterializeCache<SD> {
1351    fn new(
1352        watermark_sequence: AtomicU64Ref,
1353        metrics_info: MetricsInfo,
1354        row_serde: BasicSerde,
1355        version_column_indices: Vec<u32>,
1356    ) -> Self {
1357        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
1358            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
1359        Self {
1360            lru_cache,
1361            row_serde,
1362            version_column_indices,
1363            _serde: PhantomData,
1364        }
1365    }
1366
1367    /// First populate the cache from `table`, and then calculate a [`ChangeBuffer`].
1368    /// `table` will not be written in this method.
1369    async fn handle<S: StateStore>(
1370        &mut self,
1371        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
1372        table: &StateTableInner<S, SD>,
1373        conflict_behavior: ConflictBehavior,
1374        metrics: &MaterializeMetrics,
1375        toastable_column_indices: Option<&[usize]>,
1376    ) -> StreamExecutorResult<ChangeBuffer> {
1377        assert_matches!(conflict_behavior, checked_conflict_behaviors!());
1378
1379        let key_set: HashSet<Box<[u8]>> = row_ops
1380            .iter()
1381            .map(|(_, k, _)| k.as_slice().into())
1382            .collect();
1383
1384        // Populate the LRU cache with the keys in input chunk.
1385        // For new keys, row values are set to None.
1386        self.fetch_keys(
1387            key_set.iter().map(|v| v.deref()),
1388            table,
1389            conflict_behavior,
1390            metrics,
1391        )
1392        .await?;
1393
1394        let mut change_buffer = ChangeBuffer::new();
1395        let row_serde = self.row_serde.clone();
1396        let version_column_indices = self.version_column_indices.clone();
1397        for (op, key, row) in row_ops {
1398            match op {
1399                Op::Insert | Op::UpdateInsert => {
1400                    let Some(old_row) = self.get_expected(&key) else {
1401                        // not exists before, meaning no conflict, simply insert
1402                        let new_row_deserialized =
1403                            row_serde.deserializer.deserialize(row.clone())?;
1404                        change_buffer.insert(key.clone(), new_row_deserialized);
1405                        self.lru_cache.put(key, Some(CompactedRow { row }));
1406                        continue;
1407                    };
1408
1409                    // now conflict happens, handle it according to the specified behavior
1410                    match conflict_behavior {
1411                        ConflictBehavior::Overwrite => {
1412                            let old_row_deserialized =
1413                                row_serde.deserializer.deserialize(old_row.row.clone())?;
1414                            let new_row_deserialized =
1415                                row_serde.deserializer.deserialize(row.clone())?;
1416
1417                            let need_overwrite = if !version_column_indices.is_empty() {
1418                                versions_are_newer_or_equal(
1419                                    &old_row_deserialized,
1420                                    &new_row_deserialized,
1421                                    &version_column_indices,
1422                                )
1423                            } else {
1424                                // no version column specified, just overwrite
1425                                true
1426                            };
1427
1428                            if need_overwrite {
1429                                if let Some(toastable_indices) = toastable_column_indices {
1430                                    // For TOAST-able columns, replace Debezium's unavailable value placeholder with old row values.
1431                                    let final_row = handle_toast_columns_for_postgres_cdc(
1432                                        &old_row_deserialized,
1433                                        &new_row_deserialized,
1434                                        toastable_indices,
1435                                    );
1436
1437                                    change_buffer.update(
1438                                        key.clone(),
1439                                        old_row_deserialized,
1440                                        final_row.clone(),
1441                                    );
1442                                    let final_row_bytes =
1443                                        Bytes::from(row_serde.serializer.serialize(final_row));
1444                                    self.lru_cache.put(
1445                                        key.clone(),
1446                                        Some(CompactedRow {
1447                                            row: final_row_bytes,
1448                                        }),
1449                                    );
1450                                } else {
1451                                    // No TOAST columns, use the original row bytes directly to avoid unnecessary serialization
1452                                    change_buffer.update(
1453                                        key.clone(),
1454                                        old_row_deserialized,
1455                                        new_row_deserialized,
1456                                    );
1457                                    self.lru_cache
1458                                        .put(key.clone(), Some(CompactedRow { row: row.clone() }));
1459                                }
1460                            };
1461                        }
1462                        ConflictBehavior::IgnoreConflict => {
1463                            // ignore conflict, do nothing
1464                        }
1465                        ConflictBehavior::DoUpdateIfNotNull => {
1466                            // In this section, we compare the new row and old row column by column and perform `DoUpdateIfNotNull` replacement.
1467
1468                            let old_row_deserialized =
1469                                row_serde.deserializer.deserialize(old_row.row.clone())?;
1470                            let new_row_deserialized =
1471                                row_serde.deserializer.deserialize(row.clone())?;
1472                            let need_overwrite = if !version_column_indices.is_empty() {
1473                                versions_are_newer_or_equal(
1474                                    &old_row_deserialized,
1475                                    &new_row_deserialized,
1476                                    &version_column_indices,
1477                                )
1478                            } else {
1479                                true
1480                            };
1481
1482                            if need_overwrite {
1483                                let mut row_deserialized_vec =
1484                                    old_row_deserialized.clone().into_inner().into_vec();
1485                                replace_if_not_null(
1486                                    &mut row_deserialized_vec,
1487                                    new_row_deserialized.clone(),
1488                                );
1489                                let mut updated_row = OwnedRow::new(row_deserialized_vec);
1490
1491                                // Apply TOAST column fix for CDC tables with TOAST columns
1492                                if let Some(toastable_indices) = toastable_column_indices {
1493                                    // Note: we need to use old_row_deserialized again, but it was moved above
1494                                    // So we re-deserialize the old row
1495                                    let old_row_deserialized_again =
1496                                        row_serde.deserializer.deserialize(old_row.row.clone())?;
1497                                    updated_row = handle_toast_columns_for_postgres_cdc(
1498                                        &old_row_deserialized_again,
1499                                        &updated_row,
1500                                        toastable_indices,
1501                                    );
1502                                }
1503
1504                                change_buffer.update(
1505                                    key.clone(),
1506                                    old_row_deserialized,
1507                                    updated_row.clone(),
1508                                );
1509                                let updated_row_bytes =
1510                                    Bytes::from(row_serde.serializer.serialize(updated_row));
1511                                self.lru_cache.put(
1512                                    key.clone(),
1513                                    Some(CompactedRow {
1514                                        row: updated_row_bytes,
1515                                    }),
1516                                );
1517                            }
1518                        }
1519                        _ => unreachable!(),
1520                    };
1521                }
1522
1523                Op::Delete | Op::UpdateDelete => {
1524                    match conflict_behavior {
1525                        checked_conflict_behaviors!() => {
1526                            if let Some(old_row) = self.get_expected(&key) {
1527                                let old_row_deserialized =
1528                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
1529                                change_buffer.delete(key.clone(), old_row_deserialized);
1530                                // put a None into the cache to represent deletion
1531                                self.lru_cache.put(key, None);
1532                            } else {
1533                                // delete a non-existent value
1534                                // this is allowed in the case of mview conflict, so ignore
1535                            };
1536                        }
1537                        _ => unreachable!(),
1538                    };
1539                }
1540            }
1541        }
1542        Ok(change_buffer)
1543    }
1544
1545    async fn fetch_keys<'a, S: StateStore>(
1546        &mut self,
1547        keys: impl Iterator<Item = &'a [u8]>,
1548        table: &StateTableInner<S, SD>,
1549        conflict_behavior: ConflictBehavior,
1550        metrics: &MaterializeMetrics,
1551    ) -> StreamExecutorResult<()> {
1552        let mut futures = vec![];
1553        for key in keys {
1554            metrics.materialize_cache_total_count.inc();
1555
1556            if self.lru_cache.contains(key) {
1557                if self.lru_cache.get(key).unwrap().is_some() {
1558                    metrics.materialize_data_exist_count.inc();
1559                }
1560                metrics.materialize_cache_hit_count.inc();
1561                continue;
1562            }
1563            futures.push(async {
1564                let key_row = table.pk_serde().deserialize(key).unwrap();
1565                let row = table.get_row(key_row).await?.map(CompactedRow::from);
1566                StreamExecutorResult::Ok((key.to_vec(), row))
1567            });
1568        }
1569
1570        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
1571        while let Some(result) = buffered.next().await {
1572            let (key, row) = result?;
1573            if row.is_some() {
1574                metrics.materialize_data_exist_count.inc();
1575            }
1576            // for keys that are not in the table, `value` is None
1577            match conflict_behavior {
1578                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
1579                _ => unreachable!(),
1580            };
1581        }
1582
1583        Ok(())
1584    }
1585
1586    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
1587        self.lru_cache.get(key).unwrap_or_else(|| {
1588            panic!(
1589                "the key {:?} has not been fetched in the materialize executor's cache ",
1590                key
1591            )
1592        })
1593    }
1594
1595    fn evict(&mut self) {
1596        self.lru_cache.evict()
1597    }
1598}
1599
1600/// Replace columns in an existing row with the corresponding columns in a replacement row, if the
1601/// column value in the replacement row is not null.
1602///
1603/// # Example
1604///
1605/// ```ignore
1606/// let mut row = vec![Some(1), None, Some(3)];
1607/// let replacement = vec![Some(10), Some(20), None];
1608/// replace_if_not_null(&mut row, replacement);
1609/// ```
1610///
1611/// After the call, `row` will be `[Some(10), Some(20), Some(3)]`.
1612fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
1613    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
1614        if let Some(new_value) = new_col {
1615            *old_col = Some(new_value);
1616        }
1617    }
1618}
1619
1620/// Compare multiple version columns lexicographically.
1621/// Returns true if `new_row` has a newer or equal version compared to `old_row`.
1622fn versions_are_newer_or_equal(
1623    old_row: &OwnedRow,
1624    new_row: &OwnedRow,
1625    version_column_indices: &[u32],
1626) -> bool {
1627    if version_column_indices.is_empty() {
1628        // No version columns specified, always consider new version as newer
1629        return true;
1630    }
1631
1632    for &idx in version_column_indices {
1633        let old_value = old_row.index(idx as usize);
1634        let new_value = new_row.index(idx as usize);
1635
1636        match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
1637            std::cmp::Ordering::Less => return true,     // new is newer
1638            std::cmp::Ordering::Greater => return false, // old is newer
1639            std::cmp::Ordering::Equal => continue,       // equal, check next column
1640        }
1641    }
1642
1643    // All version columns are equal, consider new version as equal (should overwrite)
1644    true
1645}
1646
1647#[cfg(test)]
1648mod tests {
1649
1650    use std::iter;
1651    use std::sync::atomic::AtomicU64;
1652
1653    use rand::rngs::SmallRng;
1654    use rand::{Rng, RngCore, SeedableRng};
1655    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1656    use risingwave_common::catalog::Field;
1657    use risingwave_common::util::epoch::test_epoch;
1658    use risingwave_common::util::sort_util::OrderType;
1659    use risingwave_hummock_sdk::HummockReadEpoch;
1660    use risingwave_storage::memory::MemoryStateStore;
1661    use risingwave_storage::table::batch_table::BatchTable;
1662
1663    use super::*;
1664    use crate::executor::test_utils::*;
1665
    // End-to-end check that the executor materializes plain inserts/deletes
    // (ConflictBehavior::NoCheck) into the state table.
    #[tokio::test]
    async fn test_materialize_executor() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Prepare source chunks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same store, used to read back materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume the second chunk.
        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1768
    // https://github.com/risingwavelabs/risingwave/issues/13346
    #[tokio::test]
    async fn test_upsert_stream() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Insert (1, 1) first; the conflicting insert in chunk2 overwrites it.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Conflicting insert of (1, 2) followed by a delete of the same row;
        // after this chunk, pk 1 should be absent from the table.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same store, used to read back materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1, the second barrier, and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // The overwrite-then-delete on pk 1 must leave no row behind.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1851
    // Insert conflicts on the same pk with ConflictBehavior::Overwrite:
    // the latest insert must win.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // Another conflicting insert on pk 1 (its effect is not asserted below).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same store, used to read back materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1 was inserted three times; the last write (1,3) wins.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2 was overwritten by chunk2's (2,6).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1970
    // Deletes and updates under ConflictBehavior::Overwrite, including deletes
    // with stale values, deletes/updates on non-existent pks, and an insert
    // right after an update on the same pk.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Plain inserts, plus an update on pk 8 followed by an insert on the
        // same pk: the final insert (8,3) must win.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Insert on an existing pk, update with a stale old value, and an
        // update on a non-existent pk (which should become an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same store, used to read back materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the results of chunk1.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Second stream chunk. We check the results of chunk2.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Third stream chunk. We check the results of chunk3.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // insert on existing pk 1 overwrites to (1,5)
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2157
    /// `ConflictBehavior::IgnoreConflict` with plain inserts: for a given
    /// primary key the *first* insert wins, and every later conflicting insert
    /// is silently dropped by the materialize executor.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to be ignored.
        // `+ 1 4` conflicts with `+ 1 3` within the same chunk.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Both pk=1 and pk=2 already exist after chunk1, so both of these
        // cross-chunk conflicting inserts must also be ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // Another conflicting insert on pk=1, sent after the second barrier.
        // It is emitted into the stream but not asserted below.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used to assert what got
        // materialized after each barrier.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1 and chunk2 outputs.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // At the second barrier, verify the state materialized from chunk1 and
        // chunk2. First check the existence of (3) -> (3,6).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk=1 keeps its first value 3; `+ 1 4` and `+ 1 5` were ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk=2 keeps its first value 5; `+ 2 6` was ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2276
    /// `ConflictBehavior::IgnoreConflict` with delete-then-insert on the same
    /// primary key inside one chunk: the delete removes the row, so the
    /// following insert is *not* a conflict and must take effect.
    #[tokio::test]
    async fn test_ignore_delete_then_insert() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test insert after delete one pk, the latter insert should succeed.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store for asserting results.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain the full stream: barrier, data chunk, barrier. The `as_*`
        // calls double as assertions on the expected message kinds.
        let _msg1 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();
        let _msg2 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_chunk()
            .unwrap();
        let _msg3 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();

        // The re-insert after the delete must have landed: (1) -> (1, 6).
        let row = table
            .get_row(
                &OwnedRow::new(vec![Some(1_i32.into())]),
                HummockReadEpoch::NoWait(u64::MAX),
            )
            .await
            .unwrap();
        assert_eq!(
            row,
            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
        );
    }
2369
    /// `ConflictBehavior::IgnoreConflict` mixing inserts, deletes and updates:
    /// - an insert conflicting with an existing pk is ignored;
    /// - a delete with a stale (wrong) old value still deletes the pk;
    /// - a delete on a nonexistent pk is a no-op;
    /// - an update (`U-`/`U+`) on a nonexistent pk degrades to an insert.
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Inserts on pk 1/2/3, an update on pk=8 (installs (8,2)), then a
        // conflicting insert `+ 8 3` which must be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk:
        // `- 3 4` carries a stale old value (stored row is (3,6)) but should
        // still delete pk=3; `- 5 0` targets a pk that was never inserted.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Conflicting insert on pk=1 (ignored, keeps (1,4)), update on
        // existing pk=2 with a stale old value (still applied -> (2,8)), and
        // update on nonexistent pk=9 (becomes an insert -> (9,1)).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store for asserting results.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1's output.
        materialize_executor.next().await.transpose().unwrap();

        // At the barrier after chunk1: the update installed (8,2) and the
        // trailing conflicting insert `+ 8 3` was ignored.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 2), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2's output.
        materialize_executor.next().await.transpose().unwrap();

        // At the barrier after chunk2: (7,8) inserted, pk=3 deleted despite
        // the stale old value, pk=5 still absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3's output.
        materialize_executor.next().await.transpose().unwrap();
        // At the barrier after chunk3: conflicting insert ignored, stale-value
        // update applied, update-on-missing-pk materialized as insert.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // `+ 1 5` was ignored: pk=1 keeps its original value 4.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2557
    /// `ConflictBehavior::DoUpdateIfNotNull`: on a pk conflict, only the
    /// non-null columns of the incoming row overwrite the stored row; null
    /// columns keep whatever value was already materialized.
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // should get (8, 2): `+ 8 .` conflicts with (8,2) but its value column
        // is null, so the stored 2 is kept. `+ 2 .` is a fresh insert, so pk=2
        // materializes with a null value.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // should not get (3, x), should not get (5, 0)
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // should get (2, None), (7, 8): null update columns (`+ 7 .`,
        // `U+ 2 .`) must not clobber / set values, while the update on the
        // nonexistent pk=9 becomes an insert.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store for asserting results.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1's output.
        materialize_executor.next().await.transpose().unwrap();

        // At the barrier after chunk1: (8,2) survived the null insert, and
        // pk=2 was inserted with a null value column.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume chunk2's output.
        materialize_executor.next().await.transpose().unwrap();

        // At the barrier after chunk2: (7,8) inserted, pk=3 deleted despite
        // the stale old value, pk=5 still absent.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3's output.
        materialize_executor.next().await.transpose().unwrap();
        // At the barrier after chunk3: null-valued conflict rows left (7,8)
        // and (2,None) untouched; update on missing pk=9 became an insert.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2751
2752    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2753        const KN: u32 = 4;
2754        const SEED: u64 = 998244353;
2755        let mut ret = vec![];
2756        let mut builder =
2757            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2758        let mut rng = SmallRng::seed_from_u64(SEED);
2759
2760        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2761            let len = c.data_chunk().capacity();
2762            let mut c = StreamChunkMut::from(c);
2763            for i in 0..len {
2764                c.set_vis(i, rng.random_bool(0.5));
2765            }
2766            c.into()
2767        };
2768        for _ in 0..row_number {
2769            let k = (rng.next_u32() % KN) as i32;
2770            let v = rng.next_u32() as i32;
2771            let op = if rng.random_bool(0.5) {
2772                Op::Insert
2773            } else {
2774                Op::Delete
2775            };
2776            if let Some(c) =
2777                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2778            {
2779                ret.push(random_vis(c, &mut rng));
2780            }
2781        }
2782        if let Some(c) = builder.take() {
2783            ret.push(random_vis(c, &mut rng));
2784        }
2785        ret
2786    }
2787
    /// Fuzz harness: push `N` random insert/delete rows (with heavy pk
    /// conflicts, see `gen_fuzz_data`) through a `MaterializeExecutor` with the
    /// given `conflict_behavior`, then replay every output chunk into a
    /// `StateTable`. The state table's write path checks stream consistency,
    /// so an inconsistent output stream (e.g. double insert of the same pk)
    /// makes `write_chunk` panic and fails the test.
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Wrap the generated chunks between two barriers.
        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        // Prepare stream executors.
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        // Skip the leading barrier; everything until the trailing barrier is
        // data output.
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        // A separate state table (distinct table id) whose memtable acts as
        // the consistency checker for the executor's output stream.
        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        // Loop ends when the trailing barrier (a non-chunk message) arrives.
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            // check with state table's memtable
            table.write_chunk(c);
        }
    }
2851
    /// Fuzz the executor's output consistency under `Overwrite` (upsert).
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2856
    /// Fuzz the executor's output consistency under `IgnoreConflict`.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2861}