// risingwave_stream/executor/mview/materialize.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::assert_matches::assert_matches;
16use std::collections::HashSet;
17use std::marker::PhantomData;
18use std::ops::{Bound, Deref, Index};
19
20use bytes::Bytes;
21use futures::future::Either;
22use futures::stream::{self, select_with_strategy};
23use futures_async_stream::try_stream;
24use itertools::Itertools;
25use risingwave_common::array::Op;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{
28    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
29};
30use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
31use risingwave_common::row::{CompactedRow, OwnedRow, RowExt};
32use risingwave_common::types::{DEBEZIUM_UNAVAILABLE_VALUE, DataType, ScalarImpl};
33use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
34use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
35use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
36use risingwave_hummock_sdk::HummockReadEpoch;
37use risingwave_pb::catalog::Table;
38use risingwave_pb::catalog::table::{Engine, OptionalAssociatedSourceId};
39use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
40use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
41use risingwave_storage::table::KeyedRow;
42
43use crate::cache::ManagedLruCache;
44use crate::common::metrics::MetricsInfo;
45use crate::common::table::state_table::{
46    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
47};
48use crate::executor::error::ErrorKind;
49use crate::executor::monitor::MaterializeMetrics;
50use crate::executor::mview::RefreshProgressTable;
51use crate::executor::prelude::*;
52use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
53use crate::task::LocalBarrierManager;
54
/// Execution stage of [`MaterializeExecutor`]'s main loop.
///
/// Normal tables stay in [`NormalIngestion`](Self::NormalIngestion) forever; refreshable
/// materialized views additionally cycle through the refresh stages.
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Default stage: apply upstream changes to the materialized view as they arrive.
    NormalIngestion,
    /// Refresh stage: scan the main table against the staging table and delete rows
    /// that are absent from the freshly loaded data. Input chunks are ignored while
    /// merging; a barrier interrupts the merge and re-enters this stage until done.
    MergingData,
    /// Refresh stage after the merge completes; presumably purges refresh
    /// bookkeeping (staging/progress tables) — arm body is outside this view.
    CleanUp,
    /// Transitional stage: commit state, yield the pending `barrier` downstream,
    /// then continue in `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Terminal stage of a refresh; `on_complete_epoch` records the epoch at which
    /// the refresh finished (handled outside this view).
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}
68
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the change stream to materialize.
    input: Executor,

    /// Output schema; also used to rebuild chunks from the conflict-handling buffer.
    schema: Schema,

    /// State table backing the materialized view itself.
    state_table: StateTableInner<S, SD>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// LRU cache of current row values, used to resolve pk conflicts before
    /// writing to the state table.
    materialize_cache: MaterializeCache<SD>,

    /// How to resolve writes that collide on the primary key.
    conflict_behavior: ConflictBehavior,

    /// Indices of version columns, if any; a non-empty list disables the
    /// "skip conflict handling" fast path for `Overwrite` tables.
    version_column_indices: Vec<u32>,

    /// Whether any downstream dispatcher existed at creation time; if not (and
    /// behavior is `Overwrite`), op-consistency checks can be relaxed.
    may_have_downstream: bool,

    /// Initial subscriber ids; any subscriber forces the log-store-enabled
    /// consistency level.
    subscriber_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,

    /// Indices of TOAST-able columns for PostgreSQL CDC tables. None means either non-CDC table or CDC table without TOAST-able columns.
    toastable_column_indices: Option<Vec<usize>>,

    /// Optional refresh arguments and state for refreshable materialized views
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    /// Local barrier manager for reporting barrier events
    local_barrier_manager: LocalBarrierManager,
}
107
/// Arguments and state for refreshable materialized views
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Table catalog for main table
    pub table_catalog: Table,

    /// Table catalog for staging table
    pub staging_table_catalog: Table,

    /// Flag indicating if this table is currently being refreshed.
    /// Initialized to `false`; on recovery it is re-derived from the progress
    /// table (set when recorded vnodes exist but are not all completed).
    pub is_refreshing: bool,

    /// During data refresh (between `RefreshStart` and `LoadFinish`),
    /// data will be written to both the main table and the staging table.
    ///
    /// The staging table is PK-only.
    ///
    /// After `LoadFinish`, we will do a `DELETE FROM main_table WHERE pk NOT IN (SELECT pk FROM staging_table)`, and then purge the staging table.
    pub staging_table: StateTableInner<S, SD>,

    /// Progress table for tracking refresh state per `VNode` for fault tolerance
    pub progress_table: RefreshProgressTable<S>,

    /// Table ID for this refreshable materialized view (equals `table_catalog.id`)
    pub table_id: TableId,
}
133
134impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
135    /// Create new `RefreshableMaterializeArgs`
136    pub async fn new(
137        store: S,
138        table_catalog: &Table,
139        staging_table_catalog: &Table,
140        progress_state_table: &Table,
141        vnodes: Option<Arc<Bitmap>>,
142    ) -> Self {
143        let table_id = TableId::new(table_catalog.id);
144
145        // staging table is pk-only, and we don't need to check value consistency
146        let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
147            staging_table_catalog,
148            store.clone(),
149            vnodes.clone(),
150        )
151        .await;
152
153        let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
154            progress_state_table,
155            store,
156            vnodes,
157        )
158        .await;
159
160        // Get primary key length from main table catalog
161        let pk_len = table_catalog.pk.len();
162        let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
163
164        debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
165
166        Self {
167            table_catalog: table_catalog.clone(),
168            staging_table_catalog: staging_table_catalog.clone(),
169            is_refreshing: false,
170            staging_table,
171            progress_table,
172            table_id,
173        }
174    }
175}
176
177fn get_op_consistency_level(
178    conflict_behavior: ConflictBehavior,
179    may_have_downstream: bool,
180    subscriber_ids: &HashSet<u32>,
181) -> StateTableOpConsistencyLevel {
182    if !subscriber_ids.is_empty() {
183        StateTableOpConsistencyLevel::LogStoreEnabled
184    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
185        // Table with overwrite conflict behavior could disable conflict check
186        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
187        StateTableOpConsistencyLevel::Inconsistent
188    } else {
189        StateTableOpConsistencyLevel::ConsistentOldValue
190    }
191}
192
193impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
    /// should be `None`.
    ///
    /// Builds the backing state table (with an op-consistency level derived from
    /// the conflict behavior, downstream dispatchers, and subscribers), the
    /// conflict-resolution cache, and per-actor materialize metrics.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<u32>,
        metrics: Arc<StreamingMetrics>,
        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
        local_barrier_manager: LocalBarrierManager,
    ) -> Self {
        // Column descriptors from the catalog; `column_desc` is expected to be
        // populated for every column (proto optional, hence the unwrap).
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Extract TOAST-able column indices from table columns.
        // Only for PostgreSQL CDC tables.
        let toastable_column_indices = if table_catalog.cdc_table_type()
            == risingwave_pb::catalog::table::CdcTableType::Postgres
        {
            let toastable_indices: Vec<usize> = table_columns
                .iter()
                .enumerate()
                .filter_map(|(index, column)| match &column.data_type {
                    // Currently supports TOAST updates for:
                    // - jsonb (DataType::Jsonb)
                    // - varchar (DataType::Varchar)
                    // - bytea (DataType::Bytea)
                    // - One-dimensional arrays of the above types (DataType::List)
                    //   Note: Some array types may not be fully supported yet, see issue  https://github.com/risingwavelabs/risingwave/issues/22916 for details.

                    // For details on how TOAST values are handled, see comments in `is_debezium_unavailable_value`.
                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
                        Some(index)
                    }
                    _ => None,
                })
                .collect();

            // `None` (rather than an empty Vec) signals "no TOAST handling needed".
            if toastable_indices.is_empty() {
                None
            } else {
                Some(toastable_indices)
            }
        } else {
            None
        };

        // Value-row serde honoring the catalog's `value_indices` projection.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // No initial dispatchers ⇒ no downstream MV yet; together with
        // `Overwrite` behavior this relaxes op-consistency (see `get_op_consistency_level`).
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let subscriber_ids = actor_context.initial_subscriber_ids.clone();
        let op_consistency_level =
            get_op_consistency_level(conflict_behavior, may_have_downstream, &subscriber_ids);
        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(op_consistency_level)
            .enable_preload_all_rows_by_config(&actor_context.streaming_config)
            .build()
            .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only iceberg tables bypass the hummock state table entirely;
        // this executor then acts as a pass-through (see `is_dummy_table` docs).
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_indices.clone(),
            ),
            conflict_behavior,
            version_column_indices,
            is_dummy_table,
            may_have_downstream,
            subscriber_ids,
            metrics: mv_metrics,
            toastable_column_indices,
            refresh_args,
            local_barrier_manager,
        }
    }
304
305    #[try_stream(ok = Message, error = StreamExecutorError)]
306    async fn execute_inner(mut self) {
307        let mv_table_id = TableId::new(self.state_table.table_id());
308        let _staging_table_id = TableId::new(self.state_table.table_id());
309        let data_types = self.schema.data_types();
310        let mut input = self.input.execute();
311
312        let barrier = expect_first_barrier(&mut input).await?;
313        let first_epoch = barrier.epoch;
314        let _barrier_epoch = barrier.epoch; // Save epoch for later use (unused in normal execution)
315        // The first barrier message should be propagated.
316        yield Message::Barrier(barrier);
317        self.state_table.init_epoch(first_epoch).await?;
318
319        // default to normal ingestion
320        let mut inner_state =
321            Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
322        // Initialize staging table for refreshable materialized views
323        if let Some(ref mut refresh_args) = self.refresh_args {
324            refresh_args.staging_table.init_epoch(first_epoch).await?;
325
326            // Initialize progress table and load existing progress for recovery
327            refresh_args.progress_table.recover(first_epoch).await?;
328
329            // Check if refresh is already in progress (recovery scenario)
330            let progress_stats = refresh_args.progress_table.get_progress_stats();
331            if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
332                refresh_args.is_refreshing = true;
333                tracing::info!(
334                    total_vnodes = progress_stats.total_vnodes,
335                    completed_vnodes = progress_stats.completed_vnodes,
336                    "Recovered refresh in progress, resuming refresh operation"
337                );
338
339                // Since stage info is no longer stored in progress table,
340                // we need to determine recovery state differently.
341                // For now, assume all incomplete VNodes need to continue merging
342                let incomplete_vnodes: Vec<_> = refresh_args
343                    .progress_table
344                    .get_all_progress()
345                    .iter()
346                    .filter(|(_, entry)| !entry.is_completed)
347                    .map(|(&vnode, _)| vnode)
348                    .collect();
349
350                if !incomplete_vnodes.is_empty() {
351                    // Some VNodes are incomplete, need to resume refresh operation
352                    tracing::info!(
353                        incomplete_vnodes = incomplete_vnodes.len(),
354                        "Recovery detected incomplete VNodes, resuming refresh operation"
355                    );
356                    // Since stage tracking is now in memory, we'll determine the appropriate
357                    // stage based on the executor's internal state machine
358                } else {
359                    // This should not happen if is_complete() returned false, but handle it gracefully
360                    tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
361                }
362            }
363        }
364
365        // Determine initial execution stage (for recovery scenarios)
366        if let Some(ref refresh_args) = self.refresh_args
367            && refresh_args.is_refreshing
368        {
369            // Recovery logic: Check if there are incomplete vnodes from previous run
370            let incomplete_vnodes: Vec<_> = refresh_args
371                .progress_table
372                .get_all_progress()
373                .iter()
374                .filter(|(_, entry)| !entry.is_completed)
375                .map(|(&vnode, _)| vnode)
376                .collect();
377            if !incomplete_vnodes.is_empty() {
378                // Resume from merge stage since some VNodes were left incomplete
379                inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
380                tracing::info!(
381                    incomplete_vnodes = incomplete_vnodes.len(),
382                    "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
383                );
384            }
385        }
386
387        // Main execution loop: cycles through Stage 1 -> Stage 2 -> Stage 3 -> Stage 1...
388        'main_loop: loop {
389            match *inner_state {
390                MaterializeStreamState::NormalIngestion => {
391                    #[for_await]
392                    '_normal_ingest: for msg in input.by_ref() {
393                        let msg = msg?;
394                        self.materialize_cache.evict();
395
396                        match msg {
397                            Message::Watermark(w) => {
398                                yield Message::Watermark(w);
399                            }
400                            Message::Chunk(chunk) if self.is_dummy_table => {
401                                self.metrics
402                                    .materialize_input_row_count
403                                    .inc_by(chunk.cardinality() as u64);
404                                yield Message::Chunk(chunk);
405                            }
406                            Message::Chunk(chunk) => {
407                                self.metrics
408                                    .materialize_input_row_count
409                                    .inc_by(chunk.cardinality() as u64);
410                                // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
411                                // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
412                                // and the conflict behavior is overwrite.
413                                let do_not_handle_conflict = !self.state_table.is_consistent_op()
414                                    && self.version_column_indices.is_empty()
415                                    && self.conflict_behavior == ConflictBehavior::Overwrite;
416
417                                match self.conflict_behavior {
418                                    checked_conflict_behaviors!() if !do_not_handle_conflict => {
419                                        if chunk.cardinality() == 0 {
420                                            // empty chunk
421                                            continue;
422                                        }
423                                        let (data_chunk, ops) = chunk.clone().into_parts();
424
425                                        if self.state_table.value_indices().is_some() {
426                                            // TODO(st1page): when materialize partial columns(), we should
427                                            // construct some columns in the pk
428                                            panic!(
429                                                "materialize executor with data check can not handle only materialize partial columns"
430                                            )
431                                        };
432                                        let values = data_chunk.serialize();
433
434                                        let key_chunk =
435                                            data_chunk.project(self.state_table.pk_indices());
436
437                                        // For refreshable materialized views, write to staging table during refresh
438                                        // Do not use generate_output here.
439                                        if let Some(ref mut refresh_args) = self.refresh_args
440                                            && refresh_args.is_refreshing
441                                        {
442                                            let key_chunk = chunk
443                                                .clone()
444                                                .project(self.state_table.pk_indices());
445                                            tracing::trace!(
446                                                staging_chunk = %key_chunk.to_pretty(),
447                                                input_chunk = %chunk.to_pretty(),
448                                                "writing to staging table"
449                                            );
450                                            if cfg!(debug_assertions) {
451                                                // refreshable source should be append-only
452                                                assert!(
453                                                    key_chunk
454                                                        .ops()
455                                                        .iter()
456                                                        .all(|op| op == &Op::Insert)
457                                                );
458                                            }
459                                            refresh_args
460                                                .staging_table
461                                                .write_chunk(key_chunk.clone());
462                                            refresh_args.staging_table.try_flush().await?;
463                                        }
464
465                                        let pks = {
466                                            let mut pks = vec![vec![]; data_chunk.capacity()];
467                                            key_chunk
468                                                .rows_with_holes()
469                                                .zip_eq_fast(pks.iter_mut())
470                                                .for_each(|(r, vnode_and_pk)| {
471                                                    if let Some(r) = r {
472                                                        self.state_table
473                                                            .pk_serde()
474                                                            .serialize(r, vnode_and_pk);
475                                                    }
476                                                });
477                                            pks
478                                        };
479                                        let (_, vis) = key_chunk.into_parts();
480                                        let row_ops = ops
481                                            .iter()
482                                            .zip_eq_debug(pks.into_iter())
483                                            .zip_eq_debug(values.into_iter())
484                                            .zip_eq_debug(vis.iter())
485                                            .filter_map(|(((op, k), v), vis)| {
486                                                vis.then_some((*op, k, v))
487                                            })
488                                            .collect_vec();
489
490                                        let change_buffer = self
491                                            .materialize_cache
492                                            .handle(
493                                                row_ops,
494                                                &self.state_table,
495                                                self.conflict_behavior,
496                                                &self.metrics,
497                                                self.toastable_column_indices.as_deref(),
498                                            )
499                                            .await?;
500
501                                        match change_buffer.into_chunk(data_types.clone()) {
502                                            Some(output_chunk) => {
503                                                self.state_table.write_chunk(output_chunk.clone());
504                                                self.state_table.try_flush().await?;
505                                                yield Message::Chunk(output_chunk);
506                                            }
507                                            None => continue,
508                                        }
509                                    }
510                                    ConflictBehavior::IgnoreConflict => unreachable!(),
511                                    ConflictBehavior::NoCheck
512                                    | ConflictBehavior::Overwrite
513                                    | ConflictBehavior::DoUpdateIfNotNull => {
514                                        self.state_table.write_chunk(chunk.clone());
515                                        self.state_table.try_flush().await?;
516
517                                        // For refreshable materialized views, also write to staging table during refresh
518                                        if let Some(ref mut refresh_args) = self.refresh_args
519                                            && refresh_args.is_refreshing
520                                        {
521                                            let key_chunk = chunk
522                                                .clone()
523                                                .project(self.state_table.pk_indices());
524                                            tracing::trace!(
525                                                staging_chunk = %key_chunk.to_pretty(),
526                                                input_chunk = %chunk.to_pretty(),
527                                                "writing to staging table"
528                                            );
529                                            if cfg!(debug_assertions) {
530                                                // refreshable source should be append-only
531                                                assert!(
532                                                    key_chunk
533                                                        .ops()
534                                                        .iter()
535                                                        .all(|op| op == &Op::Insert)
536                                                );
537                                            }
538                                            refresh_args
539                                                .staging_table
540                                                .write_chunk(key_chunk.clone());
541                                            refresh_args.staging_table.try_flush().await?;
542                                        }
543
544                                        yield Message::Chunk(chunk);
545                                    }
546                                }
547                            }
548                            Message::Barrier(barrier) => {
549                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
550                                    barrier,
551                                    expect_next_state: Box::new(
552                                        MaterializeStreamState::NormalIngestion,
553                                    ),
554                                };
555                                continue 'main_loop;
556                            }
557                        }
558                    }
559
560                    return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
561                        anyhow::anyhow!(
562                            "Input stream terminated unexpectedly during normal ingestion"
563                        ),
564                    )));
565                }
566                MaterializeStreamState::MergingData => {
567                    let Some(refresh_args) = self.refresh_args.as_mut() else {
568                        panic!(
569                            "MaterializeExecutor entered CleanUp state without refresh_args configured"
570                        );
571                    };
572                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
573
574                    debug_assert_eq!(
575                        self.state_table.vnodes(),
576                        refresh_args.staging_table.vnodes()
577                    );
578                    debug_assert_eq!(
579                        refresh_args.staging_table.vnodes(),
580                        refresh_args.progress_table.vnodes()
581                    );
582
583                    let mut rows_to_delete = vec![];
584                    let mut merge_complete = false;
585                    let mut pending_barrier: Option<Barrier> = None;
586
587                    // Scope to limit immutable borrows to state tables
588                    {
589                        let left_input = input.by_ref().map(Either::Left);
590                        let right_merge_sort = pin!(
591                            Self::make_mergesort_stream(
592                                &self.state_table,
593                                &refresh_args.staging_table,
594                                &mut refresh_args.progress_table
595                            )
596                            .map(Either::Right)
597                        );
598
599                        // Prefer to select input stream to handle barriers promptly
600                        // Rebuild the merge stream each time processing a barrier
601                        let mut merge_stream =
602                            select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
603                                stream::PollNext::Left
604                            });
605
606                        #[for_await]
607                        'merge_stream: for either in &mut merge_stream {
608                            match either {
609                                Either::Left(msg) => {
610                                    let msg = msg?;
611                                    match msg {
612                                        Message::Watermark(w) => yield Message::Watermark(w),
613                                        Message::Chunk(chunk) => {
614                                            tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
615                                        }
616                                        Message::Barrier(b) => {
617                                            pending_barrier = Some(b);
618                                            break 'merge_stream;
619                                        }
620                                    }
621                                }
622                                Either::Right(result) => {
623                                    match result? {
624                                        Some((_vnode, row)) => {
625                                            rows_to_delete.push(row);
626                                        }
627                                        None => {
628                                            // Merge stream finished
629                                            merge_complete = true;
630
631                                            // If the merge stream finished, we need to wait for the next barrier to commit states
632                                        }
633                                    }
634                                }
635                            }
636                        }
637                    }
638
639                    // Process collected rows for deletion
640                    for row in &rows_to_delete {
641                        self.state_table.delete(row);
642                    }
643                    if !rows_to_delete.is_empty() {
644                        let to_delete_chunk = StreamChunk::from_rows(
645                            &rows_to_delete
646                                .iter()
647                                .map(|row| (Op::Delete, row))
648                                .collect_vec(),
649                            &self.schema.data_types(),
650                        );
651
652                        tracing::debug!(table_id = %refresh_args.table_id, "yielding to delete chunk: {}", to_delete_chunk.to_pretty());
653                        yield Message::Chunk(to_delete_chunk);
654                    }
655
656                    // should wait for at least one barrier
657                    assert!(pending_barrier.is_some(), "pending barrier is not set");
658
659                    *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
660                        barrier: pending_barrier.unwrap(),
661                        expect_next_state: if merge_complete {
662                            Box::new(MaterializeStreamState::CleanUp)
663                        } else {
664                            Box::new(MaterializeStreamState::MergingData)
665                        },
666                    };
667                    continue 'main_loop;
668                }
669                MaterializeStreamState::CleanUp => {
670                    let Some(refresh_args) = self.refresh_args.as_mut() else {
671                        panic!(
672                            "MaterializeExecutor entered MergingData state without refresh_args configured"
673                        );
674                    };
675                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
676
677                    #[for_await]
678                    for msg in input.by_ref() {
679                        let msg = msg?;
680                        match msg {
681                            Message::Watermark(w) => yield Message::Watermark(w),
682                            Message::Chunk(chunk) => {
683                                tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
684                            }
685                            Message::Barrier(barrier) if !barrier.is_checkpoint() => {
686                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
687                                    barrier,
688                                    expect_next_state: Box::new(MaterializeStreamState::CleanUp),
689                                };
690                                continue 'main_loop;
691                            }
692                            Message::Barrier(barrier) => {
693                                let staging_table_id = refresh_args.staging_table.table_id();
694                                let epoch = barrier.epoch;
695                                self.local_barrier_manager.report_refresh_finished(
696                                    epoch,
697                                    self.actor_context.id,
698                                    refresh_args.table_id.into(),
699                                    staging_table_id,
700                                );
701                                tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
702
703                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
704                                    barrier,
705                                    expect_next_state: Box::new(
706                                        MaterializeStreamState::RefreshEnd {
707                                            on_complete_epoch: epoch,
708                                        },
709                                    ),
710                                };
711                                continue 'main_loop;
712                            }
713                        }
714                    }
715                }
716                MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
717                    let Some(refresh_args) = self.refresh_args.as_mut() else {
718                        panic!(
719                            "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
720                        );
721                    };
722                    let staging_table_id = refresh_args.staging_table.table_id();
723
724                    // Wait for staging table truncation to complete
725                    let staging_store = refresh_args.staging_table.state_store().clone();
726                    staging_store
727                        .try_wait_epoch(
728                            HummockReadEpoch::Committed(on_complete_epoch.prev),
729                            TryWaitEpochOptions {
730                                table_id: staging_table_id.into(),
731                            },
732                        )
733                        .await?;
734
735                    tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
736
737                    if let Some(ref mut refresh_args) = self.refresh_args {
738                        refresh_args.is_refreshing = false;
739                    }
740                    *inner_state = MaterializeStreamState::NormalIngestion;
741                    continue 'main_loop;
742                }
743                MaterializeStreamState::CommitAndYieldBarrier {
744                    barrier,
745                    mut expect_next_state,
746                } => {
747                    if let Some(ref mut refresh_args) = self.refresh_args {
748                        match barrier.mutation.as_deref() {
749                            Some(Mutation::RefreshStart {
750                                table_id: refresh_table_id,
751                                associated_source_id: _,
752                            }) if *refresh_table_id == refresh_args.table_id => {
753                                debug_assert!(
754                                    !refresh_args.is_refreshing,
755                                    "cannot start refresh twice"
756                                );
757                                refresh_args.is_refreshing = true;
758                                tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
759
760                                // Initialize progress tracking for all VNodes
761                                Self::init_refresh_progress(
762                                    &self.state_table,
763                                    &mut refresh_args.progress_table,
764                                    barrier.epoch.curr,
765                                )?;
766                            }
767                            Some(Mutation::LoadFinish {
768                                associated_source_id: load_finish_source_id,
769                            }) => {
770                                // Get associated source id from table catalog
771                                let associated_source_id = match refresh_args
772                                    .table_catalog
773                                    .optional_associated_source_id
774                                {
775                                    Some(OptionalAssociatedSourceId::AssociatedSourceId(id)) => id,
776                                    None => unreachable!("associated_source_id is not set"),
777                                };
778
779                                if load_finish_source_id.table_id() == associated_source_id {
780                                    tracing::info!(
781                                        %load_finish_source_id,
782                                        "LoadFinish received, starting data replacement"
783                                    );
784                                    expect_next_state =
785                                        Box::new(MaterializeStreamState::<_>::MergingData);
786                                }
787                            }
788                            _ => {}
789                        }
790                    }
791
792                    // ===== normal operation =====
793
794                    // If a downstream mv depends on the current table, we need to do conflict check again.
795                    if !self.may_have_downstream
796                        && barrier.has_more_downstream_fragments(self.actor_context.id)
797                    {
798                        self.may_have_downstream = true;
799                    }
800                    Self::may_update_depended_subscriptions(
801                        &mut self.subscriber_ids,
802                        &barrier,
803                        mv_table_id,
804                    );
805                    let op_consistency_level = get_op_consistency_level(
806                        self.conflict_behavior,
807                        self.may_have_downstream,
808                        &self.subscriber_ids,
809                    );
810                    let post_commit = self
811                        .state_table
812                        .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
813                        .await?;
814                    if !post_commit.inner().is_consistent_op() {
815                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
816                    }
817
818                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
819
820                    // Commit staging table for refreshable materialized views
821                    let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
822                    {
823                        // Commit progress table for fault tolerance
824
825                        Some((
826                            refresh_args.staging_table.commit(barrier.epoch).await?,
827                            refresh_args.progress_table.commit(barrier.epoch).await?,
828                        ))
829                    } else {
830                        None
831                    };
832
833                    let b_epoch = barrier.epoch;
834                    yield Message::Barrier(barrier);
835
836                    // Update the vnode bitmap for the state table if asked.
837                    if let Some((_, cache_may_stale)) = post_commit
838                        .post_yield_barrier(update_vnode_bitmap.clone())
839                        .await?
840                        && cache_may_stale
841                    {
842                        self.materialize_cache.lru_cache.clear();
843                    }
844
845                    // Handle staging table post commit
846                    if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
847                        staging_post_commit
848                            .post_yield_barrier(update_vnode_bitmap.clone())
849                            .await?;
850                        progress_post_commit
851                            .post_yield_barrier(update_vnode_bitmap)
852                            .await?;
853                    }
854
855                    self.metrics
856                        .materialize_current_epoch
857                        .set(b_epoch.curr as i64);
858
859                    // ====== transition to next state ======
860
861                    *inner_state = *expect_next_state;
862                }
863            }
864        }
865    }
866
867    /// Stream that yields rows to be deleted from main table.
868    /// Yields `Some((vnode, row))` for rows that exist in main but not in staging.
869    /// Yields `None` when finished processing all vnodes.
870    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
871    async fn make_mergesort_stream<'a>(
872        main_table: &'a StateTableInner<S, SD>,
873        staging_table: &'a StateTableInner<S, SD>,
874        progress_table: &'a mut RefreshProgressTable<S>,
875    ) {
876        for vnode in main_table.vnodes().clone().iter_vnodes() {
877            let mut processed_rows = 0;
878            // Check if this VNode has already been completed (for fault tolerance)
879            let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
880                if let Some(current_entry) = progress_table.get_progress(vnode) {
881                    // Skip already completed VNodes during recovery
882                    if current_entry.is_completed {
883                        tracing::debug!(
884                            vnode = vnode.to_index(),
885                            "Skipping already completed VNode during recovery"
886                        );
887                        continue;
888                    }
889                    processed_rows += current_entry.processed_rows;
890                    tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
891
892                    if let Some(current_state) = &current_entry.current_pos {
893                        (Bound::Excluded(current_state.clone()), Bound::Unbounded)
894                    } else {
895                        (Bound::Unbounded, Bound::Unbounded)
896                    }
897                } else {
898                    (Bound::Unbounded, Bound::Unbounded)
899                };
900
901            let iter_main = main_table
902                .iter_keyed_row_with_vnode(
903                    vnode,
904                    &pk_range,
905                    PrefetchOptions::prefetch_for_large_range_scan(),
906                )
907                .await?;
908            let iter_staging = staging_table
909                .iter_keyed_row_with_vnode(
910                    vnode,
911                    &pk_range,
912                    PrefetchOptions::prefetch_for_large_range_scan(),
913                )
914                .await?;
915
916            pin_mut!(iter_main);
917            pin_mut!(iter_staging);
918
919            // Sort-merge join implementation using dual pointers
920            let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
921            let mut staging_item: Option<KeyedRow<Bytes>> =
922                iter_staging.next().await.transpose()?;
923
924            while let Some(main_kv) = main_item {
925                let main_key = main_kv.key();
926
927                // Advance staging iterator until we find a key >= main_key
928                let mut should_delete = false;
929                while let Some(staging_kv) = &staging_item {
930                    let staging_key = staging_kv.key();
931                    match main_key.cmp(staging_key) {
932                        std::cmp::Ordering::Greater => {
933                            // main_key > staging_key, advance staging
934                            staging_item = iter_staging.next().await.transpose()?;
935                        }
936                        std::cmp::Ordering::Equal => {
937                            // Keys match, this row exists in both tables, no need to delete
938                            break;
939                        }
940                        std::cmp::Ordering::Less => {
941                            // main_key < staging_key, main row doesn't exist in staging, delete it
942                            should_delete = true;
943                            break;
944                        }
945                    }
946                }
947
948                // If staging_item is None, all remaining main rows should be deleted
949                if staging_item.is_none() {
950                    should_delete = true;
951                }
952
953                if should_delete {
954                    yield Some((vnode, main_kv.row().clone()));
955                }
956
957                // Advance main iterator
958                processed_rows += 1;
959                tracing::info!(
960                    "set progress table: vnode = {:?}, processed_rows = {:?}",
961                    vnode,
962                    processed_rows
963                );
964                progress_table.set_progress(
965                    vnode,
966                    Some(
967                        main_kv
968                            .row()
969                            .project(main_table.pk_indices())
970                            .to_owned_row(),
971                    ),
972                    false,
973                    processed_rows,
974                )?;
975                main_item = iter_main.next().await.transpose()?;
976            }
977
978            // Mark this VNode as completed
979            if let Some(current_entry) = progress_table.get_progress(vnode) {
980                progress_table.set_progress(
981                    vnode,
982                    current_entry.current_pos.clone(),
983                    true, // completed
984                    current_entry.processed_rows,
985                )?;
986
987                tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
988            }
989        }
990
991        // Signal completion
992        yield None;
993    }
994
995    /// return true when changed
996    fn may_update_depended_subscriptions(
997        depended_subscriptions: &mut HashSet<u32>,
998        barrier: &Barrier,
999        mv_table_id: TableId,
1000    ) {
1001        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
1002            if !depended_subscriptions.insert(subscriber_id) {
1003                warn!(
1004                    ?depended_subscriptions,
1005                    ?mv_table_id,
1006                    subscriber_id,
1007                    "subscription id already exists"
1008                );
1009            }
1010        }
1011
1012        if let Some(Mutation::DropSubscriptions {
1013            subscriptions_to_drop,
1014        }) = barrier.mutation.as_deref()
1015        {
1016            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
1017                if *upstream_mv_table_id == mv_table_id
1018                    && !depended_subscriptions.remove(subscriber_id)
1019                {
1020                    warn!(
1021                        ?depended_subscriptions,
1022                        ?mv_table_id,
1023                        subscriber_id,
1024                        "drop non existing subscriber_id id"
1025                    );
1026                }
1027            }
1028        }
1029    }
1030
1031    /// Initialize refresh progress tracking for all `VNodes`
1032    fn init_refresh_progress(
1033        state_table: &StateTableInner<S, SD>,
1034        progress_table: &mut RefreshProgressTable<S>,
1035        _epoch: u64,
1036    ) -> StreamExecutorResult<()> {
1037        debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1038
1039        // Initialize progress for all VNodes in the current bitmap
1040        for vnode in state_table.vnodes().iter_vnodes() {
1041            progress_table.set_progress(
1042                vnode, None,  // initial position
1043                false, // not completed yet
1044                0,     // initial processed rows
1045            )?;
1046        }
1047
1048        tracing::info!(
1049            vnodes_count = state_table.vnodes().count_ones(),
1050            "Initialized refresh progress tracking for all VNodes"
1051        );
1052
1053        Ok(())
1054    }
1055}
1056
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
    ///
    /// Builds the state table from a generated test catalog, uses placeholder
    /// metrics/contexts, and leaves refresh functionality disabled.
    #[cfg(any(test, feature = "test"))]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        // Derive the arrangement (key) column indices and their sort orders.
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        // Pair each provided column id with the matching input field to build
        // unnamed column descriptors.
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // Serde covering all columns, in declaration order.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        // State table backed by a generated protobuf catalog (test utility).
        let state_table = StateTableInner::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                vec![],
            ),
            conflict_behavior,
            version_column_indices: vec![],
            is_dummy_table: false,
            toastable_column_indices: None,
            // Behave as if a downstream consumer exists, so conflict handling runs.
            may_have_downstream: true,
            subscriber_ids: HashSet::new(),
            metrics,
            refresh_args: None, // Test constructor doesn't support refresh functionality
            local_barrier_manager: LocalBarrierManager::for_test(),
        }
    }
}
1121
1122/// Fast string comparison to check if a string equals `DEBEZIUM_UNAVAILABLE_VALUE`.
1123/// Optimized by checking length first to avoid expensive string comparison.
1124fn is_unavailable_value_str(s: &str) -> bool {
1125    s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
1126}
1127
1128/// Check if a datum represents Debezium's unavailable value placeholder.
1129/// This function handles both scalar types and one-dimensional arrays.
1130fn is_debezium_unavailable_value(
1131    datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
1132) -> bool {
1133    match datum {
1134        Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => is_unavailable_value_str(val),
1135        Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
1136            // For jsonb type, check if it's a string containing the unavailable value
1137            jsonb_ref
1138                .as_str()
1139                .map(is_unavailable_value_str)
1140                .unwrap_or(false)
1141        }
1142        Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
1143            // For bytea type, we need to check if it contains the string bytes of DEBEZIUM_UNAVAILABLE_VALUE
1144            // This is because when processing bytea from Debezium, we convert the base64-encoded string
1145            // to `DEBEZIUM_UNAVAILABLE_VALUE` in the json.rs parser to maintain consistency
1146            if let Ok(bytea_str) = std::str::from_utf8(bytea) {
1147                is_unavailable_value_str(bytea_str)
1148            } else {
1149                false
1150            }
1151        }
1152        Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
1153            // For list type, check if it contains exactly one element with the unavailable value
1154            // This is because when any element in an array triggers TOAST, Debezium treats the entire
1155            // array as unchanged and sends a placeholder array with only one element
1156            if list_ref.len() == 1 {
1157                if let Some(Some(element)) = list_ref.get(0) {
1158                    // Recursively check the array element
1159                    is_debezium_unavailable_value(&Some(element))
1160                } else {
1161                    false
1162                }
1163            } else {
1164                false
1165            }
1166        }
1167        _ => false,
1168    }
1169}
1170
1171/// Fix TOAST columns by replacing unavailable values with old row values.
1172fn handle_toast_columns_for_postgres_cdc(
1173    old_row: &OwnedRow,
1174    new_row: &OwnedRow,
1175    toastable_indices: &[usize],
1176) -> OwnedRow {
1177    let mut fixed_row_data = new_row.as_inner().to_vec();
1178
1179    for &toast_idx in toastable_indices {
1180        // Check if the new value is Debezium's unavailable value placeholder
1181        let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
1182        if is_unavailable {
1183            // Replace with old row value if available
1184            if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
1185                fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
1186            }
1187        }
1188    }
1189
1190    OwnedRow::new(fixed_row_data)
1191}
1192
/// Change buffer keyed by the encoded key bytes (`Vec<u8>`) with `OwnedRow` values;
/// used by the materialize cache to accumulate conflict-resolved row changes.
type ChangeBuffer = crate::common::change_buffer::ChangeBuffer<Vec<u8>, OwnedRow>;
1194
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Delegate to the executor's main loop, boxed as a message stream.
        self.execute_inner().boxed()
    }
}
1200
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Intentionally terse: only the arrangement key indices are shown.
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
1208
/// A cache for materialize executors.
struct MaterializeCache<SD> {
    // LRU map from serialized key bytes to the cached row value; a `None` value
    // records that the key is known not to exist in the table.
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    // Serde used to (de)serialize row values when resolving conflicts.
    row_serde: BasicSerde,
    // Indices of version columns consulted to decide whether an overwrite wins.
    version_column_indices: Vec<u32>,
    // Carries the `SD` type parameter; holds no runtime data.
    _serde: PhantomData<SD>,
}

/// Cached row value; `None` means the key is known to be absent from the table.
type CacheValue = Option<CompactedRow>;
1218
1219impl<SD: ValueRowSerde> MaterializeCache<SD> {
1220    fn new(
1221        watermark_sequence: AtomicU64Ref,
1222        metrics_info: MetricsInfo,
1223        row_serde: BasicSerde,
1224        version_column_indices: Vec<u32>,
1225    ) -> Self {
1226        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
1227            ManagedLruCache::unbounded(watermark_sequence, metrics_info);
1228        Self {
1229            lru_cache,
1230            row_serde,
1231            version_column_indices,
1232            _serde: PhantomData,
1233        }
1234    }
1235
1236    /// First populate the cache from `table`, and then calculate a [`ChangeBuffer`].
1237    /// `table` will not be written in this method.
1238    async fn handle<S: StateStore>(
1239        &mut self,
1240        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
1241        table: &StateTableInner<S, SD>,
1242        conflict_behavior: ConflictBehavior,
1243        metrics: &MaterializeMetrics,
1244        toastable_column_indices: Option<&[usize]>,
1245    ) -> StreamExecutorResult<ChangeBuffer> {
1246        assert_matches!(conflict_behavior, checked_conflict_behaviors!());
1247
1248        let key_set: HashSet<Box<[u8]>> = row_ops
1249            .iter()
1250            .map(|(_, k, _)| k.as_slice().into())
1251            .collect();
1252
1253        // Populate the LRU cache with the keys in input chunk.
1254        // For new keys, row values are set to None.
1255        self.fetch_keys(
1256            key_set.iter().map(|v| v.deref()),
1257            table,
1258            conflict_behavior,
1259            metrics,
1260        )
1261        .await?;
1262
1263        let mut change_buffer = ChangeBuffer::new();
1264        let row_serde = self.row_serde.clone();
1265        let version_column_indices = self.version_column_indices.clone();
1266        for (op, key, row) in row_ops {
1267            match op {
1268                Op::Insert | Op::UpdateInsert => {
1269                    let Some(old_row) = self.get_expected(&key) else {
1270                        // not exists before, meaning no conflict, simply insert
1271                        let new_row_deserialized =
1272                            row_serde.deserializer.deserialize(row.clone())?;
1273                        change_buffer.insert(key.clone(), new_row_deserialized);
1274                        self.lru_cache.put(key, Some(CompactedRow { row }));
1275                        continue;
1276                    };
1277
1278                    // now conflict happens, handle it according to the specified behavior
1279                    match conflict_behavior {
1280                        ConflictBehavior::Overwrite => {
1281                            let old_row_deserialized =
1282                                row_serde.deserializer.deserialize(old_row.row.clone())?;
1283                            let new_row_deserialized =
1284                                row_serde.deserializer.deserialize(row.clone())?;
1285
1286                            let need_overwrite = if !version_column_indices.is_empty() {
1287                                versions_are_newer_or_equal(
1288                                    &old_row_deserialized,
1289                                    &new_row_deserialized,
1290                                    &version_column_indices,
1291                                )
1292                            } else {
1293                                // no version column specified, just overwrite
1294                                true
1295                            };
1296
1297                            if need_overwrite {
1298                                if let Some(toastable_indices) = toastable_column_indices {
1299                                    // For TOAST-able columns, replace Debezium's unavailable value placeholder with old row values.
1300                                    let final_row = handle_toast_columns_for_postgres_cdc(
1301                                        &old_row_deserialized,
1302                                        &new_row_deserialized,
1303                                        toastable_indices,
1304                                    );
1305
1306                                    change_buffer.update(
1307                                        key.clone(),
1308                                        old_row_deserialized,
1309                                        final_row.clone(),
1310                                    );
1311                                    let final_row_bytes =
1312                                        Bytes::from(row_serde.serializer.serialize(final_row));
1313                                    self.lru_cache.put(
1314                                        key.clone(),
1315                                        Some(CompactedRow {
1316                                            row: final_row_bytes,
1317                                        }),
1318                                    );
1319                                } else {
1320                                    // No TOAST columns, use the original row bytes directly to avoid unnecessary serialization
1321                                    change_buffer.update(
1322                                        key.clone(),
1323                                        old_row_deserialized,
1324                                        new_row_deserialized,
1325                                    );
1326                                    self.lru_cache
1327                                        .put(key.clone(), Some(CompactedRow { row: row.clone() }));
1328                                }
1329                            };
1330                        }
1331                        ConflictBehavior::IgnoreConflict => {
1332                            // ignore conflict, do nothing
1333                        }
1334                        ConflictBehavior::DoUpdateIfNotNull => {
1335                            // In this section, we compare the new row and old row column by column and perform `DoUpdateIfNotNull` replacement.
1336
1337                            let old_row_deserialized =
1338                                row_serde.deserializer.deserialize(old_row.row.clone())?;
1339                            let new_row_deserialized =
1340                                row_serde.deserializer.deserialize(row.clone())?;
1341                            let need_overwrite = if !version_column_indices.is_empty() {
1342                                versions_are_newer_or_equal(
1343                                    &old_row_deserialized,
1344                                    &new_row_deserialized,
1345                                    &version_column_indices,
1346                                )
1347                            } else {
1348                                true
1349                            };
1350
1351                            if need_overwrite {
1352                                let mut row_deserialized_vec =
1353                                    old_row_deserialized.clone().into_inner().into_vec();
1354                                replace_if_not_null(
1355                                    &mut row_deserialized_vec,
1356                                    new_row_deserialized.clone(),
1357                                );
1358                                let mut updated_row = OwnedRow::new(row_deserialized_vec);
1359
1360                                // Apply TOAST column fix for CDC tables with TOAST columns
1361                                if let Some(toastable_indices) = toastable_column_indices {
1362                                    // Note: we need to use old_row_deserialized again, but it was moved above
1363                                    // So we re-deserialize the old row
1364                                    let old_row_deserialized_again =
1365                                        row_serde.deserializer.deserialize(old_row.row.clone())?;
1366                                    updated_row = handle_toast_columns_for_postgres_cdc(
1367                                        &old_row_deserialized_again,
1368                                        &updated_row,
1369                                        toastable_indices,
1370                                    );
1371                                }
1372
1373                                change_buffer.update(
1374                                    key.clone(),
1375                                    old_row_deserialized,
1376                                    updated_row.clone(),
1377                                );
1378                                let updated_row_bytes =
1379                                    Bytes::from(row_serde.serializer.serialize(updated_row));
1380                                self.lru_cache.put(
1381                                    key.clone(),
1382                                    Some(CompactedRow {
1383                                        row: updated_row_bytes,
1384                                    }),
1385                                );
1386                            }
1387                        }
1388                        _ => unreachable!(),
1389                    };
1390                }
1391
1392                Op::Delete | Op::UpdateDelete => {
1393                    match conflict_behavior {
1394                        checked_conflict_behaviors!() => {
1395                            if let Some(old_row) = self.get_expected(&key) {
1396                                let old_row_deserialized =
1397                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
1398                                change_buffer.delete(key.clone(), old_row_deserialized);
1399                                // put a None into the cache to represent deletion
1400                                self.lru_cache.put(key, None);
1401                            } else {
1402                                // delete a non-existent value
1403                                // this is allowed in the case of mview conflict, so ignore
1404                            };
1405                        }
1406                        _ => unreachable!(),
1407                    };
1408                }
1409            }
1410        }
1411        Ok(change_buffer)
1412    }
1413
1414    async fn fetch_keys<'a, S: StateStore>(
1415        &mut self,
1416        keys: impl Iterator<Item = &'a [u8]>,
1417        table: &StateTableInner<S, SD>,
1418        conflict_behavior: ConflictBehavior,
1419        metrics: &MaterializeMetrics,
1420    ) -> StreamExecutorResult<()> {
1421        let mut futures = vec![];
1422        for key in keys {
1423            metrics.materialize_cache_total_count.inc();
1424
1425            if self.lru_cache.contains(key) {
1426                if self.lru_cache.get(key).unwrap().is_some() {
1427                    metrics.materialize_data_exist_count.inc();
1428                }
1429                metrics.materialize_cache_hit_count.inc();
1430                continue;
1431            }
1432            futures.push(async {
1433                let key_row = table.pk_serde().deserialize(key).unwrap();
1434                let row = table.get_row(key_row).await?.map(CompactedRow::from);
1435                StreamExecutorResult::Ok((key.to_vec(), row))
1436            });
1437        }
1438
1439        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
1440        while let Some(result) = buffered.next().await {
1441            let (key, row) = result?;
1442            if row.is_some() {
1443                metrics.materialize_data_exist_count.inc();
1444            }
1445            // for keys that are not in the table, `value` is None
1446            match conflict_behavior {
1447                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
1448                _ => unreachable!(),
1449            };
1450        }
1451
1452        Ok(())
1453    }
1454
1455    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
1456        self.lru_cache.get(key).unwrap_or_else(|| {
1457            panic!(
1458                "the key {:?} has not been fetched in the materialize executor's cache ",
1459                key
1460            )
1461        })
1462    }
1463
    /// Evict entries from the underlying LRU cache according to its policy.
    fn evict(&mut self) {
        self.lru_cache.evict()
    }
1467}
1468
1469/// Replace columns in an existing row with the corresponding columns in a replacement row, if the
1470/// column value in the replacement row is not null.
1471///
1472/// # Example
1473///
1474/// ```ignore
1475/// let mut row = vec![Some(1), None, Some(3)];
1476/// let replacement = vec![Some(10), Some(20), None];
1477/// replace_if_not_null(&mut row, replacement);
1478/// ```
1479///
1480/// After the call, `row` will be `[Some(10), Some(20), Some(3)]`.
1481fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
1482    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
1483        if let Some(new_value) = new_col {
1484            *old_col = Some(new_value);
1485        }
1486    }
1487}
1488
1489/// Compare multiple version columns lexicographically.
1490/// Returns true if `new_row` has a newer or equal version compared to `old_row`.
1491fn versions_are_newer_or_equal(
1492    old_row: &OwnedRow,
1493    new_row: &OwnedRow,
1494    version_column_indices: &[u32],
1495) -> bool {
1496    if version_column_indices.is_empty() {
1497        // No version columns specified, always consider new version as newer
1498        return true;
1499    }
1500
1501    for &idx in version_column_indices {
1502        let old_value = old_row.index(idx as usize);
1503        let new_value = new_row.index(idx as usize);
1504
1505        match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
1506            std::cmp::Ordering::Less => return true,     // new is newer
1507            std::cmp::Ordering::Greater => return false, // old is newer
1508            std::cmp::Ordering::Equal => continue,       // equal, check next column
1509        }
1510    }
1511
1512    // All version columns are equal, consider new version as equal (should overwrite)
1513    true
1514}
1515
1516#[cfg(test)]
1517mod tests {
1518
1519    use std::iter;
1520    use std::sync::atomic::AtomicU64;
1521
1522    use rand::rngs::SmallRng;
1523    use rand::{Rng, RngCore, SeedableRng};
1524    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1525    use risingwave_common::catalog::Field;
1526    use risingwave_common::util::epoch::test_epoch;
1527    use risingwave_common::util::sort_util::OrderType;
1528    use risingwave_hummock_sdk::HummockReadEpoch;
1529    use risingwave_storage::memory::MemoryStateStore;
1530    use risingwave_storage::table::batch_table::BatchTable;
1531
1532    use super::*;
1533    use crate::executor::test_utils::*;
1534
    #[tokio::test]
    async fn test_materialize_executor() {
        // Smoke test: with `ConflictBehavior::NoCheck`, rows flowing through
        // the materialize executor are written to the state table verbatim.

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Prepare source chunks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors. Message order: barrier, chunk1, barrier,
        // chunk2, barrier — the `next()` calls below consume them in order.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used for assertions.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // After the barrier following chunk1, check the existence of (3) -> (3,6).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();
        // After the barrier following chunk2, check the existence of (7) -> (7,8).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1637
    // Regression test for https://github.com/risingwavelabs/risingwave/issues/13346
    #[tokio::test]
    async fn test_upsert_stream() {
        // With `ConflictBehavior::Overwrite`: insert (1,1), then in a later
        // chunk overwrite it with (1,2) and delete (1,2). The final state must
        // have no row for pk=1.

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Initial insert of pk=1.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Overwrite pk=1 with (1,2), then delete it within the same chunk.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used for assertions.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier 1.
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1, barrier 2, and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // On the final barrier, pk=1 must be gone: the delete in chunk2 wins.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1720
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // With `ConflictBehavior::Overwrite`, a second insert on an existing
        // pk must replace the earlier row.

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Double insert on pk=1 within one chunk; the latter overrides the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicting inserts across chunks on pk=1 and pk=2.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // Another insert on pk=1 after a barrier (sent but not asserted on below).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used for assertions.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier 1.
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // On barrier 2, check the merged result of chunk1 + chunk2.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk=1: last insert (chunk2's `+ 1 3`) wins.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk=2: chunk2's `+ 2 6` overwrites chunk1's `+ 2 5`.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1839
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // With `ConflictBehavior::Overwrite`, exercise deletes and updates on
        // wrong values / non-existent pks: they must be tolerated and the
        // final state must reflect the last write per pk.

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Inserts, plus an update followed by an insert on pk=8: the insert wins.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // Delete with a wrong value (pk=3 holds 6, not 4) and delete of a
        // non-existent pk (5); both must still be handled.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Update with a wrong old value (pk=2) and update of a non-existent pk (9).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used for assertions.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier 1.
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // On barrier 2, check chunk1's effects.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // On barrier 3, check chunk2's effects.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value: pk=3 is removed even though the
                // deleted value (4) didn't match the stored one (6)
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk: deleting a non-existent pk is a no-op
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // On barrier 4, check chunk3's effects.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value: the U+ row still lands
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2026
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // With `ConflictBehavior::IgnoreConflict`, the FIRST insert on a pk
        // wins and later conflicting inserts are dropped.

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicting inserts across chunks; both must be ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // Conflicting insert after a barrier (sent but not asserted on below).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same state store, used for assertions.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume barrier 1.
        materialize_executor.next().await.transpose().unwrap();
        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // On barrier 2, only the first insert per pk should be visible.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk=1: the first insert `+ 1 3` wins; later conflicts ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk=2: the first insert `+ 2 5` wins; chunk2's `+ 2 6` ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2145
2146    #[tokio::test]
2147    async fn test_ignore_delete_then_insert() {
2148        // Prepare storage and memtable.
2149        let memory_state_store = MemoryStateStore::new();
2150        let table_id = TableId::new(1);
2151        // Two columns of int32 type, the first column is PK.
2152        let schema = Schema::new(vec![
2153            Field::unnamed(DataType::Int32),
2154            Field::unnamed(DataType::Int32),
2155        ]);
2156        let column_ids = vec![0.into(), 1.into()];
2157
2158        // test insert after delete one pk, the latter insert should succeed.
2159        let chunk1 = StreamChunk::from_pretty(
2160            " i i
2161            + 1 3
2162            - 1 3
2163            + 1 6",
2164        );
2165
2166        // Prepare stream executors.
2167        let source = MockSource::with_messages(vec![
2168            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2169            Message::Chunk(chunk1),
2170            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2171        ])
2172        .into_executor(schema.clone(), PkIndices::new());
2173
2174        let order_types = vec![OrderType::ascending()];
2175        let column_descs = vec![
2176            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2177            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2178        ];
2179
2180        let table = BatchTable::for_test(
2181            memory_state_store.clone(),
2182            table_id,
2183            column_descs,
2184            order_types,
2185            vec![0],
2186            vec![0, 1],
2187        );
2188
2189        let mut materialize_executor = MaterializeExecutor::for_test(
2190            source,
2191            memory_state_store,
2192            table_id,
2193            vec![ColumnOrder::new(0, OrderType::ascending())],
2194            column_ids,
2195            Arc::new(AtomicU64::new(0)),
2196            ConflictBehavior::IgnoreConflict,
2197        )
2198        .await
2199        .boxed()
2200        .execute();
2201        let _msg1 = materialize_executor
2202            .next()
2203            .await
2204            .transpose()
2205            .unwrap()
2206            .unwrap()
2207            .as_barrier()
2208            .unwrap();
2209        let _msg2 = materialize_executor
2210            .next()
2211            .await
2212            .transpose()
2213            .unwrap()
2214            .unwrap()
2215            .as_chunk()
2216            .unwrap();
2217        let _msg3 = materialize_executor
2218            .next()
2219            .await
2220            .transpose()
2221            .unwrap()
2222            .unwrap()
2223            .as_barrier()
2224            .unwrap();
2225
2226        let row = table
2227            .get_row(
2228                &OwnedRow::new(vec![Some(1_i32.into())]),
2229                HummockReadEpoch::NoWait(u64::MAX),
2230            )
2231            .await
2232            .unwrap();
2233        assert_eq!(
2234            row,
2235            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
2236        );
2237    }
2238
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // chunk1: plain inserts, an update on pk 8, then a conflicting insert
        // on pk 8. Under IgnoreConflict the later insert is ignored, so pk 8
        // should keep (8, 2).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // chunk2: insert pk 7, delete pk 3 with a stale value, and delete the
        // nonexistent pk 5. Deletes are applied by pk regardless of the value.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // chunk3: conflicting insert on pk 1 (ignored), update of pk 2 with a
        // stale old value, and update of the nonexistent pk 9 (the U+ half
        // should take effect as an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side table for reading back the materialized state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk1. Verify pk 8 kept the updated value (8, 2),
        // i.e. the conflicting insert `+ 8 3` was ignored.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 2), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk2. Verify the insert of pk 7 and both deletes.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete with a stale value: pk 3 is gone even though
                // the delete carried value 4 instead of the stored 6
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete of a nonexistent pk: still absent, no panic
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Barrier after chunk3. Verify the ignored insert, the update with a
        // stale old value, and the update of a nonexistent pk.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // pk 1 keeps its original value; the conflicting `+ 1 5` was ignored
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                // check update with a stale old value: U-/U+ still applies by pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update of a nonexistent pk: should become an insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2426
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // chunk1: inserts/updates with NULL values. Under DoUpdateIfNotNull a
        // NULL in the incoming row must not overwrite an existing non-NULL
        // column, so pk 8 should end as (8, 2) despite the trailing `+ 8 .`,
        // and pk 2 should be (2, NULL).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // chunk2: delete pk 3 with a stale value and delete the nonexistent
        // pk 5 — should not get (3, x) nor (5, 0) afterwards.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // chunk3: NULL updates on existing pks (ignored for those columns) and
        // an update of the nonexistent pk 9 — should keep (2, NULL) and
        // (7, 8), and produce (9, 1).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side table for reading back the materialized state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk1. Verify NULLs did not clobber non-NULL values:
        // pk 8 keeps (8, 2) and pk 2 is (2, NULL).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk2. Verify the insert of pk 7 and both deletes.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete with a stale value: pk 3 is gone anyway
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete of a nonexistent pk: still absent, no panic
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Barrier after chunk3. Verify NULL-valued writes were ignored and the
        // update of a nonexistent pk became an insert.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // pk 7 keeps (7, 8); the NULL insert `+ 7 .` did not overwrite it
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // pk 2 stays (2, NULL); the NULL update `U+ 2 .` changed nothing
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // check update of a nonexistent pk: should become an insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2620
2621    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2622        const KN: u32 = 4;
2623        const SEED: u64 = 998244353;
2624        let mut ret = vec![];
2625        let mut builder =
2626            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2627        let mut rng = SmallRng::seed_from_u64(SEED);
2628
2629        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2630            let len = c.data_chunk().capacity();
2631            let mut c = StreamChunkMut::from(c);
2632            for i in 0..len {
2633                c.set_vis(i, rng.random_bool(0.5));
2634            }
2635            c.into()
2636        };
2637        for _ in 0..row_number {
2638            let k = (rng.next_u32() % KN) as i32;
2639            let v = rng.next_u32() as i32;
2640            let op = if rng.random_bool(0.5) {
2641                Op::Insert
2642            } else {
2643                Op::Delete
2644            };
2645            if let Some(c) =
2646                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2647            {
2648                ret.push(random_vis(c, &mut rng));
2649            }
2650        }
2651        if let Some(c) = builder.take() {
2652            ret.push(random_vis(c, &mut rng));
2653        }
2654        ret
2655    }
2656
    /// Feeds `N` random insert/delete rows (with random visibility) through a
    /// `MaterializeExecutor` configured with `conflict_behavior`, then replays
    /// every output chunk into a fresh `StateTable`. The state table's write
    /// path is expected to sanity-check the stream, so an inconsistent output
    /// (e.g. double insert of a pk) surfaces as a failure here.
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        const N: usize = 100000;

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Wrap the fuzz chunks between two barriers so the executor starts and
        // flushes an epoch around them.
        let chunks = gen_fuzz_data(N, 128);
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        // Prepare stream executors.
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        // Consume the leading barrier before reading chunks.
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        // Separate state table (distinct table id 1002) used purely as a
        // consistency checker for the executor's output.
        let mut table = StateTable::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        // Replay every output chunk; the loop ends at the trailing barrier.
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            // check with state table's memtable
            table.write_chunk(c);
        }
    }
2720
    /// Fuzzes the materialize executor under `ConflictBehavior::Overwrite`
    /// (upsert semantics) and checks that its output stream stays consistent.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2725
    /// Fuzzes the materialize executor under `ConflictBehavior::IgnoreConflict`
    /// (first write wins) and checks that its output stream stays consistent.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2730}