Skip to main content

risingwave_stream/executor/mview/
materialize.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::ops::Bound;
17
18use bytes::Bytes;
19use futures::future::Either;
20use futures::stream::{self, select_with_strategy};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{
26    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
27};
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, RowExt};
30use risingwave_common::types::DataType;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::BasicSerde;
33use risingwave_hummock_sdk::HummockReadEpoch;
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_pb::id::{SourceId, SubscriberId};
37use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
38use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
39use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
40use risingwave_storage::table::KeyedRow;
41
42use crate::common::change_buffer::output_kind as cb_kind;
43use crate::common::metrics::MetricsInfo;
44use crate::common::table::state_table::{
45    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
46};
47use crate::executor::error::ErrorKind;
48use crate::executor::monitor::MaterializeMetrics;
49use crate::executor::mview::RefreshProgressTable;
50use crate::executor::mview::cache::MaterializeCache;
51use crate::executor::prelude::*;
52use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
53use crate::task::LocalBarrierManager;
54
55#[derive(Debug, Clone)]
56pub enum MaterializeStreamState<M> {
57    NormalIngestion,
58    MergingData,
59    CleanUp,
60    CommitAndYieldBarrier {
61        barrier: BarrierInner<M>,
62        expect_next_state: Box<MaterializeStreamState<M>>,
63    },
64    RefreshEnd {
65        on_complete_epoch: EpochPair,
66    },
67}
68
69/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
70pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
71    input: Executor,
72
73    schema: Schema,
74
75    state_table: StateTableInner<S, SD>,
76
77    /// Stream key indices of the *output* of this materialize node.
78    ///
79    /// This can be different from the table PK. Typically, there are 3 cases:
80    ///
81    /// - Normal `TABLE`: `stream_key == pk`, so no special handling is needed.
82    /// - TTL-ed `TABLE` (`WITH TTL`): `stream_key` is a superset of `pk` by appending the TTL
83    ///   watermark column. In this case, updates to the same pk may have different stream keys, so
84    ///   we must rewrite such `Update` into `Delete + Insert` before yielding.
85    /// - `MV` or `INDEX`: `pk` can be a superset of `stream_key` to also include the order key or
86    ///   distribution key specified by the user. No special handling is needed either.
87    stream_key_indices: Vec<usize>,
88
89    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
90    arrange_key_indices: Vec<usize>,
91
92    actor_context: ActorContextRef,
93
94    /// The cache for conflict handling. `None` if conflict behavior is `NoCheck`.
95    materialize_cache: Option<MaterializeCache>,
96
97    /// Indices of columns that may carry a Debezium unchanged-TOAST placeholder
98    /// (PostgreSQL CDC only). Mirrors the value passed into `MaterializeCache`.
99    /// Used at runtime to prevent the `Overwrite -> NoCheck` downgrade from
100    /// bypassing the placeholder replacement path.
101    toastable_column_indices: Option<Vec<usize>>,
102
103    conflict_behavior: ConflictBehavior,
104
105    /// Whether the table can clean itself by TTL watermark, i.e., is defined with `WATERMARK ... WITH TTL`.
106    cleaned_by_ttl_watermark: bool,
107
108    version_column_indices: Vec<u32>,
109
110    may_have_downstream: bool,
111
112    subscriber_ids: HashSet<SubscriberId>,
113
114    metrics: MaterializeMetrics,
115
116    /// No data will be written to hummock table. This Materialize is just a dummy node.
117    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
118    is_dummy_table: bool,
119
120    /// Optional refresh arguments and state for refreshable materialized views
121    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
122
123    /// Local barrier manager for reporting barrier events
124    local_barrier_manager: LocalBarrierManager,
125}
126
127/// Arguments and state for refreshable materialized views
128pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
129    /// Table catalog for main table
130    pub table_catalog: Table,
131
132    /// Table catalog for staging table
133    pub staging_table_catalog: Table,
134
135    /// Flag indicating if this table is currently being refreshed
136    pub is_refreshing: bool,
137
138    /// During data refresh (between `RefreshStart` and `LoadFinish`),
139    /// data will be written to both the main table and the staging table.
140    ///
141    /// The staging table is PK-only.
142    ///
143    /// After `LoadFinish`, we will do a `DELETE FROM main_table WHERE pk NOT IN (SELECT pk FROM staging_table)`, and then purge the staging table.
144    pub staging_table: StateTableInner<S, SD>,
145
146    /// Progress table for tracking refresh state per `VNode` for fault tolerance
147    pub progress_table: RefreshProgressTable<S>,
148
149    /// Table ID for this refreshable materialized view
150    pub table_id: TableId,
151}
152
153impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
154    /// Create new `RefreshableMaterializeArgs`
155    pub async fn new(
156        store: S,
157        table_catalog: &Table,
158        staging_table_catalog: &Table,
159        progress_state_table: &Table,
160        vnodes: Option<Arc<Bitmap>>,
161    ) -> Self {
162        let table_id = table_catalog.id;
163
164        // staging table is pk-only, and we don't need to check value consistency
165        let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
166            staging_table_catalog,
167            store.clone(),
168            vnodes.clone(),
169        )
170        .await;
171
172        let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
173            progress_state_table,
174            store,
175            vnodes,
176        )
177        .await;
178
179        // Get primary key length from main table catalog
180        let pk_len = table_catalog.pk.len();
181        let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
182
183        debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
184
185        Self {
186            table_catalog: table_catalog.clone(),
187            staging_table_catalog: staging_table_catalog.clone(),
188            is_refreshing: false,
189            staging_table,
190            progress_table,
191            table_id,
192        }
193    }
194}
195
196fn get_op_consistency_level(
197    cleaned_by_ttl_watermark: bool,
198    conflict_behavior: ConflictBehavior,
199    may_have_downstream: bool,
200    subscriber_ids: &HashSet<SubscriberId>,
201) -> StateTableOpConsistencyLevel {
202    if cleaned_by_ttl_watermark {
203        // For tables with watermark TTL. Due to async state cleaning, it's uncertain whether an expired
204        // key has been cleaned up by the storage or not, thus we are not sure if to write `Update` or `Insert`
205        // if the key appears again.
206        StateTableOpConsistencyLevel::Inconsistent
207    } else if !subscriber_ids.is_empty() {
208        StateTableOpConsistencyLevel::LogStoreEnabled
209    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
210        // Table with overwrite conflict behavior could disable conflict check
211        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
212        StateTableOpConsistencyLevel::Inconsistent
213    } else {
214        StateTableOpConsistencyLevel::ConsistentOldValue
215    }
216}
217
218impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
219    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
220    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
221    /// should be `None`.
222    #[expect(clippy::too_many_arguments)]
223    pub async fn new(
224        input: Executor,
225        schema: Schema,
226        store: S,
227        arrange_key: Vec<ColumnOrder>,
228        actor_context: ActorContextRef,
229        vnodes: Option<Arc<Bitmap>>,
230        table_catalog: &Table,
231        watermark_epoch: AtomicU64Ref,
232        conflict_behavior: ConflictBehavior,
233        version_column_indices: Vec<u32>,
234        metrics: Arc<StreamingMetrics>,
235        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
236        cleaned_by_ttl_watermark: bool,
237        local_barrier_manager: LocalBarrierManager,
238    ) -> Self {
239        let table_columns: Vec<ColumnDesc> = table_catalog
240            .columns
241            .iter()
242            .map(|col| col.column_desc.as_ref().unwrap().into())
243            .collect();
244
245        // Extract TOAST-able column indices from table columns.
246        // Only for PostgreSQL CDC tables.
247        let toastable_column_indices = if table_catalog.cdc_table_type()
248            == risingwave_pb::catalog::table::CdcTableType::Postgres
249        {
250            let toastable_indices: Vec<usize> = table_columns
251                .iter()
252                .enumerate()
253                .filter_map(|(index, column)| match &column.data_type {
254                    // Currently supports TOAST updates for:
255                    // - jsonb (DataType::Jsonb)
256                    // - varchar (DataType::Varchar)
257                    // - bytea (DataType::Bytea)
258                    // - pgvector (DataType::Vector)
259                    // - One-dimensional arrays of the above types (DataType::List)
260                    //   Note: Some array types may not be fully supported yet, see issue  https://github.com/risingwavelabs/risingwave/issues/22916 for details.
261
262                    // For details on how TOAST values are handled, see comments in `is_debezium_unavailable_value`.
263                    DataType::Varchar
264                    | DataType::List(_)
265                    | DataType::Bytea
266                    | DataType::Jsonb
267                    | DataType::Vector(_) => Some(index),
268                    _ => None,
269                })
270                .collect();
271
272            if toastable_indices.is_empty() {
273                None
274            } else {
275                Some(toastable_indices)
276            }
277        } else {
278            None
279        };
280
281        let row_serde: BasicSerde = BasicSerde::new(
282            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
283            Arc::from(table_columns.into_boxed_slice()),
284        );
285
286        let stream_key_indices: Vec<usize> = table_catalog
287            .stream_key
288            .iter()
289            .map(|idx| *idx as usize)
290            .collect();
291        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
292        let may_have_downstream = actor_context.initial_dispatch_num != 0;
293        let subscriber_ids = actor_context.initial_subscriber_ids.clone();
294        let op_consistency_level = get_op_consistency_level(
295            cleaned_by_ttl_watermark,
296            conflict_behavior,
297            may_have_downstream,
298            &subscriber_ids,
299        );
300        let state_table_metrics = metrics.new_state_table_metrics(
301            table_catalog.id,
302            actor_context.id,
303            actor_context.fragment_id,
304        );
305        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
306        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
307            .with_op_consistency_level(op_consistency_level)
308            .enable_preload_all_rows_by_config(&actor_context.config)
309            .enable_vnode_key_stats(
310                actor_context
311                    .config
312                    .developer
313                    .enable_vnode_key_stats_for_materialize,
314                &actor_context.config,
315            )
316            .with_metrics(state_table_metrics)
317            .build()
318            .await;
319
320        let mv_metrics = metrics.new_materialize_metrics(
321            table_catalog.id,
322            actor_context.id,
323            actor_context.fragment_id,
324        );
325        let cache_metrics = metrics.new_materialize_cache_metrics(
326            table_catalog.id,
327            actor_context.id,
328            actor_context.fragment_id,
329        );
330
331        let metrics_info =
332            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
333
334        let is_dummy_table =
335            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;
336
337        Self {
338            input,
339            schema,
340            state_table,
341            stream_key_indices,
342            arrange_key_indices,
343            actor_context,
344            materialize_cache: MaterializeCache::new(
345                watermark_epoch,
346                metrics_info,
347                row_serde,
348                version_column_indices.clone(),
349                conflict_behavior,
350                toastable_column_indices.clone(),
351                cache_metrics,
352            ),
353            toastable_column_indices,
354            conflict_behavior,
355            cleaned_by_ttl_watermark,
356            version_column_indices,
357            is_dummy_table,
358            may_have_downstream,
359            subscriber_ids,
360            metrics: mv_metrics,
361            refresh_args,
362            local_barrier_manager,
363        }
364    }
365
366    #[try_stream(ok = Message, error = StreamExecutorError)]
367    async fn execute_inner(mut self) {
368        let mv_table_id = self.state_table.table_id();
369        let data_types = self.schema.data_types();
370        let mut input = self.input.execute();
371
372        let barrier = expect_first_barrier(&mut input).await?;
373        let first_epoch = barrier.epoch;
374        let _barrier_epoch = barrier.epoch; // Save epoch for later use (unused in normal execution)
375        // The first barrier message should be propagated.
376        yield Message::Barrier(barrier);
377        self.state_table.init_epoch(first_epoch).await?;
378
379        // default to normal ingestion
380        let mut inner_state =
381            Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
382        // Initialize staging table for refreshable materialized views
383        if let Some(ref mut refresh_args) = self.refresh_args {
384            refresh_args.staging_table.init_epoch(first_epoch).await?;
385
386            // Initialize progress table and load existing progress for recovery
387            refresh_args.progress_table.recover(first_epoch).await?;
388
389            // Check if refresh is already in progress (recovery scenario)
390            let progress_stats = refresh_args.progress_table.get_progress_stats();
391            if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
392                refresh_args.is_refreshing = true;
393                tracing::info!(
394                    total_vnodes = progress_stats.total_vnodes,
395                    completed_vnodes = progress_stats.completed_vnodes,
396                    "Recovered refresh in progress, resuming refresh operation"
397                );
398
399                // Since stage info is no longer stored in progress table,
400                // we need to determine recovery state differently.
401                // For now, assume all incomplete VNodes need to continue merging
402                let incomplete_vnodes: Vec<_> = refresh_args
403                    .progress_table
404                    .get_all_progress()
405                    .iter()
406                    .filter(|(_, entry)| !entry.is_completed)
407                    .map(|(&vnode, _)| vnode)
408                    .collect();
409
410                if !incomplete_vnodes.is_empty() {
411                    // Some VNodes are incomplete, need to resume refresh operation
412                    tracing::info!(
413                        incomplete_vnodes = incomplete_vnodes.len(),
414                        "Recovery detected incomplete VNodes, resuming refresh operation"
415                    );
416                    // Since stage tracking is now in memory, we'll determine the appropriate
417                    // stage based on the executor's internal state machine
418                } else {
419                    // This should not happen if is_complete() returned false, but handle it gracefully
420                    tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
421                }
422            }
423        }
424
425        // Determine initial execution stage (for recovery scenarios)
426        if let Some(ref refresh_args) = self.refresh_args
427            && refresh_args.is_refreshing
428        {
429            // Recovery logic: Check if there are incomplete vnodes from previous run
430            let incomplete_vnodes: Vec<_> = refresh_args
431                .progress_table
432                .get_all_progress()
433                .iter()
434                .filter(|(_, entry)| !entry.is_completed)
435                .map(|(&vnode, _)| vnode)
436                .collect();
437            if !incomplete_vnodes.is_empty() {
438                // Resume from merge stage since some VNodes were left incomplete
439                *inner_state = MaterializeStreamState::<_>::MergingData;
440                tracing::info!(
441                    incomplete_vnodes = incomplete_vnodes.len(),
442                    "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
443                );
444            }
445        }
446
447        // Main execution loop: cycles through Stage 1 -> Stage 2 -> Stage 3 -> Stage 1...
448        'main_loop: loop {
449            match *inner_state {
450                MaterializeStreamState::NormalIngestion => {
451                    #[for_await]
452                    '_normal_ingest: for msg in input.by_ref() {
453                        let msg = msg?;
454                        if let Some(cache) = &mut self.materialize_cache {
455                            cache.evict();
456                        }
457
458                        match msg {
459                            Message::Watermark(w) => {
460                                if self.cleaned_by_ttl_watermark
461                                    && self.state_table.clean_watermark_index == Some(w.col_idx)
462                                {
463                                    self.state_table.update_watermark(w.val.clone());
464                                }
465                                yield Message::Watermark(w);
466                            }
467                            Message::Chunk(chunk) if self.is_dummy_table => {
468                                self.metrics
469                                    .materialize_input_row_count
470                                    .inc_by(chunk.cardinality() as u64);
471                                yield Message::Chunk(chunk);
472                            }
473                            Message::Chunk(chunk) => {
474                                self.metrics
475                                    .materialize_input_row_count
476                                    .inc_by(chunk.cardinality() as u64);
477
478                                // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
479                                // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
480                                // and the conflict behavior is overwrite. We can rely on the state table to overwrite the conflicting rows in the storage,
481                                // while outputting inconsistent changes to downstream which no one will subscribe to.
482                                // For tables with watermark TTL (indicated by `cleaned_by_ttl_watermark`), conflict check must be enabled.
483                                // TODO(ttl): differentiate between table consistency and downstream consistency.
484                                // When the table carries TOAST-able columns (PostgreSQL CDC), the
485                                // `MaterializeCache` is the only place where the Debezium
486                                // unchanged-TOAST placeholder is detected and swapped for the row's
487                                // previous value. Demoting to `NoCheck` here would skip the cache
488                                // entirely and let the placeholder land in state. Keep the original
489                                // `Overwrite` behavior in that case so the replacement path runs.
490                                let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
491                                    self.conflict_behavior
492                                    && !self.state_table.is_consistent_op()
493                                    && !self.cleaned_by_ttl_watermark
494                                    && self.version_column_indices.is_empty()
495                                    && self.toastable_column_indices.is_none()
496                                {
497                                    ConflictBehavior::NoCheck
498                                } else {
499                                    self.conflict_behavior
500                                };
501
502                                match optimized_conflict_behavior {
503                                    checked_conflict_behaviors!() => {
504                                        if chunk.cardinality() == 0 {
505                                            // empty chunk
506                                            continue;
507                                        }
508
509                                        // For refreshable materialized views, write to staging table during refresh
510                                        // Do not use generate_output here.
511                                        if let Some(ref mut refresh_args) = self.refresh_args
512                                            && refresh_args.is_refreshing
513                                        {
514                                            let key_chunk = chunk
515                                                .clone()
516                                                .project(self.state_table.pk_indices());
517                                            tracing::trace!(
518                                                staging_chunk = %key_chunk.to_pretty(),
519                                                input_chunk = %chunk.to_pretty(),
520                                                "writing to staging table"
521                                            );
522                                            if cfg!(debug_assertions) {
523                                                // refreshable source should be append-only
524                                                assert!(
525                                                    key_chunk
526                                                        .ops()
527                                                        .iter()
528                                                        .all(|op| op == &Op::Insert)
529                                                );
530                                            }
531                                            refresh_args
532                                                .staging_table
533                                                .write_chunk(key_chunk.clone());
534                                            refresh_args.staging_table.try_flush().await?;
535                                        }
536
537                                        let cache = self.materialize_cache.as_mut().unwrap();
538                                        let change_buffer =
539                                            cache.handle_new(chunk, &self.state_table).await?;
540
541                                        let output_chunk = if self.stream_key_indices
542                                            == self.state_table.pk_indices()
543                                        {
544                                            change_buffer.into_chunk::<{ cb_kind::RETRACT }>(
545                                                data_types.clone(),
546                                            )
547                                        } else {
548                                            // We only hit this branch for TTL-ed tables for now.
549                                            // Assert stream key is a superset of pk.
550                                            debug_assert!(
551                                                self.state_table
552                                                    .pk_indices()
553                                                    .iter()
554                                                    .all(|&i| self.stream_key_indices.contains(&i))
555                                            );
556                                            change_buffer.into_chunk_with_key(
557                                                data_types.clone(),
558                                                &self.stream_key_indices,
559                                            )
560                                        };
561
562                                        match output_chunk {
563                                            Some(output_chunk) => {
564                                                self.state_table.write_chunk(output_chunk.clone());
565                                                self.state_table.try_flush().await?;
566                                                yield Message::Chunk(output_chunk);
567                                            }
568                                            None => continue,
569                                        }
570                                    }
571                                    ConflictBehavior::NoCheck => {
572                                        self.state_table.write_chunk(chunk.clone());
573                                        self.state_table.try_flush().await?;
574
575                                        // For refreshable materialized views, also write to staging table during refresh
576                                        if let Some(ref mut refresh_args) = self.refresh_args
577                                            && refresh_args.is_refreshing
578                                        {
579                                            let key_chunk = chunk
580                                                .clone()
581                                                .project(self.state_table.pk_indices());
582                                            tracing::trace!(
583                                                staging_chunk = %key_chunk.to_pretty(),
584                                                input_chunk = %chunk.to_pretty(),
585                                                "writing to staging table"
586                                            );
587                                            if cfg!(debug_assertions) {
588                                                // refreshable source should be append-only
589                                                assert!(
590                                                    key_chunk
591                                                        .ops()
592                                                        .iter()
593                                                        .all(|op| op == &Op::Insert)
594                                                );
595                                            }
596                                            refresh_args
597                                                .staging_table
598                                                .write_chunk(key_chunk.clone());
599                                            refresh_args.staging_table.try_flush().await?;
600                                        }
601
602                                        yield Message::Chunk(chunk);
603                                    }
604                                }
605                            }
606                            Message::Barrier(barrier) => {
607                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
608                                    barrier,
609                                    expect_next_state: Box::new(
610                                        MaterializeStreamState::NormalIngestion,
611                                    ),
612                                };
613                                continue 'main_loop;
614                            }
615                        }
616                    }
617
618                    return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
619                        anyhow::anyhow!(
620                            "Input stream terminated unexpectedly during normal ingestion"
621                        ),
622                    )));
623                }
624                MaterializeStreamState::MergingData => {
625                    let Some(refresh_args) = self.refresh_args.as_mut() else {
626                        panic!(
627                            "MaterializeExecutor entered CleanUp state without refresh_args configured"
628                        );
629                    };
630                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
631
632                    debug_assert_eq!(
633                        self.state_table.vnodes(),
634                        refresh_args.staging_table.vnodes()
635                    );
636                    debug_assert_eq!(
637                        refresh_args.staging_table.vnodes(),
638                        refresh_args.progress_table.vnodes()
639                    );
640
641                    let mut rows_to_delete = vec![];
642                    let mut merge_complete = false;
643                    let mut pending_barrier: Option<Barrier> = None;
644
645                    // Scope to limit immutable borrows to state tables
646                    {
647                        let left_input = input.by_ref().map(Either::Left);
648                        let right_merge_sort = pin!(
649                            Self::make_mergesort_stream(
650                                &self.state_table,
651                                &refresh_args.staging_table,
652                                &mut refresh_args.progress_table
653                            )
654                            .map(Either::Right)
655                        );
656
657                        // Prefer to select input stream to handle barriers promptly
658                        // Rebuild the merge stream each time processing a barrier
659                        let mut merge_stream =
660                            select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
661                                stream::PollNext::Left
662                            });
663
664                        #[for_await]
665                        'merge_stream: for either in &mut merge_stream {
666                            match either {
667                                Either::Left(msg) => {
668                                    let msg = msg?;
669                                    match msg {
670                                        Message::Watermark(w) => yield Message::Watermark(w),
671                                        Message::Chunk(chunk) => {
672                                            tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
673                                        }
674                                        Message::Barrier(b) => {
675                                            pending_barrier = Some(b);
676                                            break 'merge_stream;
677                                        }
678                                    }
679                                }
680                                Either::Right(result) => {
681                                    match result? {
682                                        Some((_vnode, row)) => {
683                                            rows_to_delete.push(row);
684                                        }
685                                        None => {
686                                            // Merge stream finished
687                                            merge_complete = true;
688
689                                            // If the merge stream finished, we need to wait for the next barrier to commit states
690                                        }
691                                    }
692                                }
693                            }
694                        }
695                    }
696
697                    // Process collected rows for deletion
698                    for row in &rows_to_delete {
699                        self.state_table.delete(row);
700                    }
701                    if !rows_to_delete.is_empty() {
702                        let to_delete_chunk = StreamChunk::from_rows(
703                            &rows_to_delete
704                                .iter()
705                                .map(|row| (Op::Delete, row))
706                                .collect_vec(),
707                            &self.schema.data_types(),
708                        );
709
710                        yield Message::Chunk(to_delete_chunk);
711                    }
712
713                    // should wait for at least one barrier
714                    assert!(pending_barrier.is_some(), "pending barrier is not set");
715
716                    *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
717                        barrier: pending_barrier.unwrap(),
718                        expect_next_state: if merge_complete {
719                            Box::new(MaterializeStreamState::CleanUp)
720                        } else {
721                            Box::new(MaterializeStreamState::MergingData)
722                        },
723                    };
724                    continue 'main_loop;
725                }
726                MaterializeStreamState::CleanUp => {
727                    let Some(refresh_args) = self.refresh_args.as_mut() else {
728                        panic!(
729                            "MaterializeExecutor entered MergingData state without refresh_args configured"
730                        );
731                    };
732                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
733
734                    #[for_await]
735                    for msg in input.by_ref() {
736                        let msg = msg?;
737                        match msg {
738                            Message::Watermark(w) => {
739                                if self.cleaned_by_ttl_watermark
740                                    && self.state_table.clean_watermark_index == Some(w.col_idx)
741                                {
742                                    self.state_table.update_watermark(w.val.clone());
743                                }
744                                yield Message::Watermark(w)
745                            }
746                            Message::Chunk(chunk) => {
747                                tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
748                            }
749                            Message::Barrier(barrier) if !barrier.is_checkpoint() => {
750                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
751                                    barrier,
752                                    expect_next_state: Box::new(MaterializeStreamState::CleanUp),
753                                };
754                                continue 'main_loop;
755                            }
756                            Message::Barrier(barrier) => {
757                                let staging_table_id = refresh_args.staging_table.table_id();
758                                let epoch = barrier.epoch;
759                                self.local_barrier_manager.report_refresh_finished(
760                                    epoch,
761                                    self.actor_context.id,
762                                    refresh_args.table_id,
763                                    staging_table_id,
764                                );
765                                tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
766
767                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
768                                    barrier,
769                                    expect_next_state: Box::new(
770                                        MaterializeStreamState::RefreshEnd {
771                                            on_complete_epoch: epoch,
772                                        },
773                                    ),
774                                };
775                                continue 'main_loop;
776                            }
777                        }
778                    }
779                }
780                MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
781                    let Some(refresh_args) = self.refresh_args.as_mut() else {
782                        panic!(
783                            "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
784                        );
785                    };
786                    let staging_table_id = refresh_args.staging_table.table_id();
787
788                    // Wait for staging table truncation to complete
789                    let staging_store = refresh_args.staging_table.state_store().clone();
790                    staging_store
791                        .try_wait_epoch(
792                            HummockReadEpoch::Committed(on_complete_epoch.prev),
793                            TryWaitEpochOptions {
794                                table_id: staging_table_id,
795                            },
796                        )
797                        .await?;
798
799                    tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
800
801                    if let Some(ref mut refresh_args) = self.refresh_args {
802                        refresh_args.is_refreshing = false;
803                    }
804                    *inner_state = MaterializeStreamState::NormalIngestion;
805                    continue 'main_loop;
806                }
807                MaterializeStreamState::CommitAndYieldBarrier {
808                    barrier,
809                    mut expect_next_state,
810                } => {
811                    if let Some(ref mut refresh_args) = self.refresh_args {
812                        match barrier.mutation.as_deref() {
813                            Some(Mutation::RefreshStart {
814                                table_id: refresh_table_id,
815                                associated_source_id: _,
816                            }) if *refresh_table_id == refresh_args.table_id => {
817                                debug_assert!(
818                                    !refresh_args.is_refreshing,
819                                    "cannot start refresh twice"
820                                );
821                                refresh_args.is_refreshing = true;
822                                tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
823
824                                // Initialize progress tracking for all VNodes
825                                Self::init_refresh_progress(
826                                    &self.state_table,
827                                    &mut refresh_args.progress_table,
828                                    barrier.epoch.curr,
829                                )?;
830                            }
831                            Some(Mutation::LoadFinish {
832                                associated_source_id: load_finish_source_id,
833                            }) => {
834                                // Get associated source id from table catalog
835                                let associated_source_id: SourceId = match refresh_args
836                                    .table_catalog
837                                    .optional_associated_source_id
838                                {
839                                    Some(id) => id.into(),
840                                    None => unreachable!("associated_source_id is not set"),
841                                };
842
843                                if *load_finish_source_id == associated_source_id {
844                                    tracing::info!(
845                                        %load_finish_source_id,
846                                        "LoadFinish received, starting data replacement"
847                                    );
848                                    *expect_next_state = MaterializeStreamState::<_>::MergingData;
849                                }
850                            }
851                            _ => {}
852                        }
853                    }
854
855                    // ===== normal operation =====
856
857                    // If a downstream mv depends on the current table, we need to do conflict check again.
858                    if !self.may_have_downstream
859                        && barrier.has_more_downstream_fragments(self.actor_context.id)
860                    {
861                        self.may_have_downstream = true;
862                    }
863                    Self::may_update_depended_subscriptions(
864                        &mut self.subscriber_ids,
865                        &barrier,
866                        mv_table_id,
867                    );
868                    let op_consistency_level = get_op_consistency_level(
869                        self.cleaned_by_ttl_watermark,
870                        self.conflict_behavior,
871                        self.may_have_downstream,
872                        &self.subscriber_ids,
873                    );
874                    let post_commit = self
875                        .state_table
876                        .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
877                        .await?;
878
879                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
880
881                    // Commit staging table for refreshable materialized views
882                    let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
883                    {
884                        // Commit progress table for fault tolerance
885
886                        Some((
887                            refresh_args.staging_table.commit(barrier.epoch).await?,
888                            refresh_args.progress_table.commit(barrier.epoch).await?,
889                        ))
890                    } else {
891                        None
892                    };
893
894                    let b_epoch = barrier.epoch;
895                    yield Message::Barrier(barrier);
896
897                    // Update the vnode bitmap for the state table if asked.
898                    if let Some((_, cache_may_stale)) = post_commit
899                        .post_yield_barrier(update_vnode_bitmap.clone())
900                        .await?
901                        && cache_may_stale
902                        && let Some(cache) = &mut self.materialize_cache
903                    {
904                        cache.clear();
905                    }
906
907                    // Handle staging table post commit
908                    if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
909                        staging_post_commit
910                            .post_yield_barrier(update_vnode_bitmap.clone())
911                            .await?;
912                        progress_post_commit
913                            .post_yield_barrier(update_vnode_bitmap)
914                            .await?;
915                    }
916
917                    self.metrics
918                        .materialize_current_epoch
919                        .set(b_epoch.curr as i64);
920
921                    // ====== transition to next state ======
922
923                    *inner_state = *expect_next_state;
924                }
925            }
926        }
927    }
928
929    /// Stream that yields rows to be deleted from main table.
930    /// Yields `Some((vnode, row))` for rows that exist in main but not in staging.
931    /// Yields `None` when finished processing all vnodes.
932    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
933    async fn make_mergesort_stream<'a>(
934        main_table: &'a StateTableInner<S, SD>,
935        staging_table: &'a StateTableInner<S, SD>,
936        progress_table: &'a mut RefreshProgressTable<S>,
937    ) {
938        for vnode in main_table.vnodes().clone().iter_vnodes() {
939            let mut processed_rows = 0;
940            // Check if this VNode has already been completed (for fault tolerance)
941            let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
942                if let Some(current_entry) = progress_table.get_progress(vnode) {
943                    // Skip already completed VNodes during recovery
944                    if current_entry.is_completed {
945                        tracing::debug!(
946                            vnode = vnode.to_index(),
947                            "Skipping already completed VNode during recovery"
948                        );
949                        continue;
950                    }
951                    processed_rows += current_entry.processed_rows;
952                    tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");
953
954                    if let Some(current_state) = &current_entry.current_pos {
955                        (Bound::Excluded(current_state.clone()), Bound::Unbounded)
956                    } else {
957                        (Bound::Unbounded, Bound::Unbounded)
958                    }
959                } else {
960                    (Bound::Unbounded, Bound::Unbounded)
961                };
962
963            let iter_main = main_table
964                .iter_keyed_row_with_vnode(
965                    vnode,
966                    &pk_range,
967                    PrefetchOptions::prefetch_for_large_range_scan(),
968                )
969                .await?;
970            let iter_staging = staging_table
971                .iter_keyed_row_with_vnode(
972                    vnode,
973                    &pk_range,
974                    PrefetchOptions::prefetch_for_large_range_scan(),
975                )
976                .await?;
977
978            pin_mut!(iter_main);
979            pin_mut!(iter_staging);
980
981            // Sort-merge join implementation using dual pointers
982            let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
983            let mut staging_item: Option<KeyedRow<Bytes>> =
984                iter_staging.next().await.transpose()?;
985
986            while let Some(main_kv) = main_item {
987                let main_key = main_kv.key();
988
989                // Advance staging iterator until we find a key >= main_key
990                let mut should_delete = false;
991                while let Some(staging_kv) = &staging_item {
992                    let staging_key = staging_kv.key();
993                    match main_key.cmp(staging_key) {
994                        std::cmp::Ordering::Greater => {
995                            // main_key > staging_key, advance staging
996                            staging_item = iter_staging.next().await.transpose()?;
997                        }
998                        std::cmp::Ordering::Equal => {
999                            // Keys match, this row exists in both tables, no need to delete
1000                            break;
1001                        }
1002                        std::cmp::Ordering::Less => {
1003                            // main_key < staging_key, main row doesn't exist in staging, delete it
1004                            should_delete = true;
1005                            break;
1006                        }
1007                    }
1008                }
1009
1010                // If staging_item is None, all remaining main rows should be deleted
1011                if staging_item.is_none() {
1012                    should_delete = true;
1013                }
1014
1015                if should_delete {
1016                    yield Some((vnode, main_kv.row().clone()));
1017                }
1018
1019                // Advance main iterator
1020                processed_rows += 1;
1021                tracing::debug!(
1022                    "set progress table: vnode = {:?}, processed_rows = {:?}",
1023                    vnode,
1024                    processed_rows
1025                );
1026                progress_table.set_progress(
1027                    vnode,
1028                    Some(
1029                        main_kv
1030                            .row()
1031                            .project(main_table.pk_indices())
1032                            .to_owned_row(),
1033                    ),
1034                    false,
1035                    processed_rows,
1036                )?;
1037                main_item = iter_main.next().await.transpose()?;
1038            }
1039
1040            // Mark this VNode as completed
1041            if let Some(current_entry) = progress_table.get_progress(vnode) {
1042                progress_table.set_progress(
1043                    vnode,
1044                    current_entry.current_pos.clone(),
1045                    true, // completed
1046                    current_entry.processed_rows,
1047                )?;
1048
1049                tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
1050            }
1051        }
1052
1053        // Signal completion
1054        yield None;
1055    }
1056
1057    /// return true when changed
1058    fn may_update_depended_subscriptions(
1059        depended_subscriptions: &mut HashSet<SubscriberId>,
1060        barrier: &Barrier,
1061        mv_table_id: TableId,
1062    ) {
1063        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
1064            if !depended_subscriptions.insert(subscriber_id) {
1065                warn!(
1066                    ?depended_subscriptions,
1067                    %mv_table_id,
1068                    %subscriber_id,
1069                    "subscription id already exists"
1070                );
1071            }
1072        }
1073
1074        if let Some(subscriptions_to_drop) = barrier.as_subscriptions_to_drop() {
1075            for SubscriptionUpstreamInfo {
1076                subscriber_id,
1077                upstream_mv_table_id,
1078            } in subscriptions_to_drop
1079            {
1080                if *upstream_mv_table_id == mv_table_id
1081                    && !depended_subscriptions.remove(subscriber_id)
1082                {
1083                    warn!(
1084                        ?depended_subscriptions,
1085                        %mv_table_id,
1086                        %subscriber_id,
1087                        "drop non existing subscriber_id id"
1088                    );
1089                }
1090            }
1091        }
1092    }
1093
1094    /// Initialize refresh progress tracking for all `VNodes`
1095    fn init_refresh_progress(
1096        state_table: &StateTableInner<S, SD>,
1097        progress_table: &mut RefreshProgressTable<S>,
1098        _epoch: u64,
1099    ) -> StreamExecutorResult<()> {
1100        debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1101
1102        // Initialize progress for all VNodes in the current bitmap
1103        for vnode in state_table.vnodes().iter_vnodes() {
1104            progress_table.set_progress(
1105                vnode, None,  // initial position
1106                false, // not completed yet
1107                0,     // initial processed rows
1108            )?;
1109        }
1110
1111        tracing::info!(
1112            vnodes_count = state_table.vnodes().count_ones(),
1113            "Initialized refresh progress tracking for all VNodes"
1114        );
1115
1116        Ok(())
1117    }
1118}
1119
1120impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
1121    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
1122    #[cfg(any(test, feature = "test"))]
1123    pub async fn for_test(
1124        input: Executor,
1125        store: S,
1126        table_id: TableId,
1127        keys: Vec<ColumnOrder>,
1128        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1129        watermark_epoch: AtomicU64Ref,
1130        conflict_behavior: ConflictBehavior,
1131    ) -> Self {
1132        Self::for_test_inner(
1133            input,
1134            store,
1135            table_id,
1136            keys,
1137            column_ids,
1138            watermark_epoch,
1139            conflict_behavior,
1140            None,
1141        )
1142        .await
1143    }
1144
1145    #[cfg(any(test, feature = "test"))]
1146    #[expect(clippy::too_many_arguments)]
1147    pub async fn for_test_with_stream_key(
1148        input: Executor,
1149        store: S,
1150        table_id: TableId,
1151        keys: Vec<ColumnOrder>,
1152        stream_key: Vec<usize>,
1153        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1154        watermark_epoch: AtomicU64Ref,
1155        conflict_behavior: ConflictBehavior,
1156    ) -> Self {
1157        Self::for_test_inner(
1158            input,
1159            store,
1160            table_id,
1161            keys,
1162            column_ids,
1163            watermark_epoch,
1164            conflict_behavior,
1165            Some(stream_key),
1166        )
1167        .await
1168    }
1169
1170    #[cfg(any(test, feature = "test"))]
1171    #[expect(clippy::too_many_arguments)]
1172    async fn for_test_inner(
1173        input: Executor,
1174        store: S,
1175        table_id: TableId,
1176        keys: Vec<ColumnOrder>,
1177        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1178        watermark_epoch: AtomicU64Ref,
1179        conflict_behavior: ConflictBehavior,
1180        stream_key: Option<Vec<usize>>,
1181    ) -> Self {
1182        use risingwave_common::util::iter_util::ZipEqFast;
1183
1184        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
1185        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
1186        let schema = input.schema().clone();
1187        let columns: Vec<ColumnDesc> = column_ids
1188            .into_iter()
1189            .zip_eq_fast(schema.fields.iter())
1190            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
1191            .collect_vec();
1192
1193        let row_serde = BasicSerde::new(
1194            Arc::from((0..columns.len()).collect_vec()),
1195            Arc::from(columns.clone().into_boxed_slice()),
1196        );
1197        let stream_key_indices = stream_key.unwrap_or_else(|| arrange_columns.clone());
1198        let mut table_catalog = crate::common::table::test_utils::gen_pbtable(
1199            table_id,
1200            columns,
1201            arrange_order_types,
1202            arrange_columns.clone(),
1203            0,
1204        );
1205        table_catalog.stream_key = stream_key_indices.iter().map(|i| *i as i32).collect();
1206        let state_table = StateTableInner::from_table_catalog(&table_catalog, store, None).await;
1207
1208        let unused = StreamingMetrics::unused();
1209        let metrics = unused.new_materialize_metrics(table_id, 1.into(), 2.into());
1210        let cache_metrics = unused.new_materialize_cache_metrics(table_id, 1.into(), 2.into());
1211
1212        Self {
1213            input,
1214            schema,
1215            state_table,
1216            stream_key_indices,
1217            arrange_key_indices: arrange_columns.clone(),
1218            actor_context: ActorContext::for_test(0),
1219            materialize_cache: MaterializeCache::new(
1220                watermark_epoch,
1221                MetricsInfo::for_test(),
1222                row_serde,
1223                vec![],
1224                conflict_behavior,
1225                None,
1226                cache_metrics,
1227            ),
1228            toastable_column_indices: None,
1229            conflict_behavior,
1230            cleaned_by_ttl_watermark: false,
1231            version_column_indices: vec![],
1232            is_dummy_table: false,
1233            may_have_downstream: true,
1234            subscriber_ids: HashSet::new(),
1235            metrics,
1236            refresh_args: None, // Test constructor doesn't support refresh functionality
1237            local_barrier_manager: LocalBarrierManager::for_test(),
1238        }
1239    }
1240}
1241
1242impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
1243    fn execute(self: Box<Self>) -> BoxedMessageStream {
1244        self.execute_inner().boxed()
1245    }
1246}
1247
1248impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
1249    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1250        f.debug_struct("MaterializeExecutor")
1251            .field("arrange_key_indices", &self.arrange_key_indices)
1252            .field("stream_key_indices", &self.stream_key_indices)
1253            .finish()
1254    }
1255}
1256
1257#[cfg(test)]
1258mod tests {
1259
1260    use std::iter;
1261    use std::sync::atomic::AtomicU64;
1262
1263    use rand::rngs::SmallRng;
1264    use rand::{Rng, RngCore, SeedableRng};
1265    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1266    use risingwave_common::catalog::Field;
1267    use risingwave_common::util::epoch::test_epoch;
1268    use risingwave_common::util::sort_util::OrderType;
1269    use risingwave_hummock_sdk::HummockReadEpoch;
1270    use risingwave_storage::memory::MemoryStateStore;
1271    use risingwave_storage::table::batch_table::BatchTable;
1272
1273    use super::*;
1274    use crate::executor::test_utils::*;
1275
1276    #[tokio::test]
1277    async fn test_materialize_executor() {
1278        // Prepare storage and memtable.
1279        let memory_state_store = MemoryStateStore::new();
1280        let table_id = TableId::new(1);
1281        // Two columns of int32 type, the first column is PK.
1282        let schema = Schema::new(vec![
1283            Field::unnamed(DataType::Int32),
1284            Field::unnamed(DataType::Int32),
1285        ]);
1286        let column_ids = vec![0.into(), 1.into()];
1287
1288        // Prepare source chunks.
1289        let chunk1 = StreamChunk::from_pretty(
1290            " i i
1291            + 1 4
1292            + 2 5
1293            + 3 6",
1294        );
1295        let chunk2 = StreamChunk::from_pretty(
1296            " i i
1297            + 7 8
1298            - 3 6",
1299        );
1300
1301        // Prepare stream executors.
1302        let source = MockSource::with_messages(vec![
1303            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1304            Message::Chunk(chunk1),
1305            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1306            Message::Chunk(chunk2),
1307            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1308        ])
1309        .into_executor(schema.clone(), StreamKey::new());
1310
1311        let order_types = vec![OrderType::ascending()];
1312        let column_descs = vec![
1313            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1314            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1315        ];
1316
1317        let table = BatchTable::for_test(
1318            memory_state_store.clone(),
1319            table_id,
1320            column_descs,
1321            order_types,
1322            vec![0],
1323            vec![0, 1],
1324        );
1325
1326        let mut materialize_executor = MaterializeExecutor::for_test(
1327            source,
1328            memory_state_store,
1329            table_id,
1330            vec![ColumnOrder::new(0, OrderType::ascending())],
1331            column_ids,
1332            Arc::new(AtomicU64::new(0)),
1333            ConflictBehavior::NoCheck,
1334        )
1335        .await
1336        .boxed()
1337        .execute();
1338        materialize_executor.next().await.transpose().unwrap();
1339
1340        materialize_executor.next().await.transpose().unwrap();
1341
1342        // First stream chunk. We check the existence of (3) -> (3,6)
1343        match materialize_executor.next().await.transpose().unwrap() {
1344            Some(Message::Barrier(_)) => {
1345                let row = table
1346                    .get_row(
1347                        &OwnedRow::new(vec![Some(3_i32.into())]),
1348                        HummockReadEpoch::NoWait(u64::MAX),
1349                    )
1350                    .await
1351                    .unwrap();
1352                assert_eq!(
1353                    row,
1354                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1355                );
1356            }
1357            _ => unreachable!(),
1358        }
1359        materialize_executor.next().await.transpose().unwrap();
1360        // Second stream chunk. We check the existence of (7) -> (7,8)
1361        match materialize_executor.next().await.transpose().unwrap() {
1362            Some(Message::Barrier(_)) => {
1363                let row = table
1364                    .get_row(
1365                        &OwnedRow::new(vec![Some(7_i32.into())]),
1366                        HummockReadEpoch::NoWait(u64::MAX),
1367                    )
1368                    .await
1369                    .unwrap();
1370                assert_eq!(
1371                    row,
1372                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1373                );
1374            }
1375            _ => unreachable!(),
1376        }
1377    }
1378
1379    // https://github.com/risingwavelabs/risingwave/issues/13346
1380    #[tokio::test]
1381    async fn test_upsert_stream() {
1382        // Prepare storage and memtable.
1383        let memory_state_store = MemoryStateStore::new();
1384        let table_id = TableId::new(1);
1385        // Two columns of int32 type, the first column is PK.
1386        let schema = Schema::new(vec![
1387            Field::unnamed(DataType::Int32),
1388            Field::unnamed(DataType::Int32),
1389        ]);
1390        let column_ids = vec![0.into(), 1.into()];
1391
1392        // test double insert one pk, the latter needs to override the former.
1393        let chunk1 = StreamChunk::from_pretty(
1394            " i i
1395            + 1 1",
1396        );
1397
1398        let chunk2 = StreamChunk::from_pretty(
1399            " i i
1400            + 1 2
1401            - 1 2",
1402        );
1403
1404        // Prepare stream executors.
1405        let source = MockSource::with_messages(vec![
1406            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1407            Message::Chunk(chunk1),
1408            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1409            Message::Chunk(chunk2),
1410            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1411        ])
1412        .into_executor(schema.clone(), StreamKey::new());
1413
1414        let order_types = vec![OrderType::ascending()];
1415        let column_descs = vec![
1416            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1417            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1418        ];
1419
1420        let table = BatchTable::for_test(
1421            memory_state_store.clone(),
1422            table_id,
1423            column_descs,
1424            order_types,
1425            vec![0],
1426            vec![0, 1],
1427        );
1428
1429        let mut materialize_executor = MaterializeExecutor::for_test(
1430            source,
1431            memory_state_store,
1432            table_id,
1433            vec![ColumnOrder::new(0, OrderType::ascending())],
1434            column_ids,
1435            Arc::new(AtomicU64::new(0)),
1436            ConflictBehavior::Overwrite,
1437        )
1438        .await
1439        .boxed()
1440        .execute();
1441        materialize_executor.next().await.transpose().unwrap();
1442
1443        materialize_executor.next().await.transpose().unwrap();
1444        materialize_executor.next().await.transpose().unwrap();
1445        materialize_executor.next().await.transpose().unwrap();
1446
1447        match materialize_executor.next().await.transpose().unwrap() {
1448            Some(Message::Barrier(_)) => {
1449                let row = table
1450                    .get_row(
1451                        &OwnedRow::new(vec![Some(1_i32.into())]),
1452                        HummockReadEpoch::NoWait(u64::MAX),
1453                    )
1454                    .await
1455                    .unwrap();
1456                assert!(row.is_none());
1457            }
1458            _ => unreachable!(),
1459        }
1460    }
1461
1462    #[tokio::test]
1463    async fn test_check_insert_conflict() {
1464        // Prepare storage and memtable.
1465        let memory_state_store = MemoryStateStore::new();
1466        let table_id = TableId::new(1);
1467        // Two columns of int32 type, the first column is PK.
1468        let schema = Schema::new(vec![
1469            Field::unnamed(DataType::Int32),
1470            Field::unnamed(DataType::Int32),
1471        ]);
1472        let column_ids = vec![0.into(), 1.into()];
1473
1474        // test double insert one pk, the latter needs to override the former.
1475        let chunk1 = StreamChunk::from_pretty(
1476            " i i
1477            + 1 3
1478            + 1 4
1479            + 2 5
1480            + 3 6",
1481        );
1482
1483        let chunk2 = StreamChunk::from_pretty(
1484            " i i
1485            + 1 3
1486            + 2 6",
1487        );
1488
1489        // test delete wrong value, delete inexistent pk
1490        let chunk3 = StreamChunk::from_pretty(
1491            " i i
1492            + 1 4",
1493        );
1494
1495        // Prepare stream executors.
1496        let source = MockSource::with_messages(vec![
1497            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1498            Message::Chunk(chunk1),
1499            Message::Chunk(chunk2),
1500            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1501            Message::Chunk(chunk3),
1502            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1503        ])
1504        .into_executor(schema.clone(), StreamKey::new());
1505
1506        let order_types = vec![OrderType::ascending()];
1507        let column_descs = vec![
1508            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1509            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1510        ];
1511
1512        let table = BatchTable::for_test(
1513            memory_state_store.clone(),
1514            table_id,
1515            column_descs,
1516            order_types,
1517            vec![0],
1518            vec![0, 1],
1519        );
1520
1521        let mut materialize_executor = MaterializeExecutor::for_test(
1522            source,
1523            memory_state_store,
1524            table_id,
1525            vec![ColumnOrder::new(0, OrderType::ascending())],
1526            column_ids,
1527            Arc::new(AtomicU64::new(0)),
1528            ConflictBehavior::Overwrite,
1529        )
1530        .await
1531        .boxed()
1532        .execute();
1533        materialize_executor.next().await.transpose().unwrap();
1534
1535        materialize_executor.next().await.transpose().unwrap();
1536        materialize_executor.next().await.transpose().unwrap();
1537
1538        // First stream chunk. We check the existence of (3) -> (3,6)
1539        match materialize_executor.next().await.transpose().unwrap() {
1540            Some(Message::Barrier(_)) => {
1541                let row = table
1542                    .get_row(
1543                        &OwnedRow::new(vec![Some(3_i32.into())]),
1544                        HummockReadEpoch::NoWait(u64::MAX),
1545                    )
1546                    .await
1547                    .unwrap();
1548                assert_eq!(
1549                    row,
1550                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1551                );
1552
1553                let row = table
1554                    .get_row(
1555                        &OwnedRow::new(vec![Some(1_i32.into())]),
1556                        HummockReadEpoch::NoWait(u64::MAX),
1557                    )
1558                    .await
1559                    .unwrap();
1560                assert_eq!(
1561                    row,
1562                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
1563                );
1564
1565                let row = table
1566                    .get_row(
1567                        &OwnedRow::new(vec![Some(2_i32.into())]),
1568                        HummockReadEpoch::NoWait(u64::MAX),
1569                    )
1570                    .await
1571                    .unwrap();
1572                assert_eq!(
1573                    row,
1574                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
1575                );
1576            }
1577            _ => unreachable!(),
1578        }
1579    }
1580
1581    #[tokio::test]
1582    async fn test_delete_and_update_conflict() {
1583        // Prepare storage and memtable.
1584        let memory_state_store = MemoryStateStore::new();
1585        let table_id = TableId::new(1);
1586        // Two columns of int32 type, the first column is PK.
1587        let schema = Schema::new(vec![
1588            Field::unnamed(DataType::Int32),
1589            Field::unnamed(DataType::Int32),
1590        ]);
1591        let column_ids = vec![0.into(), 1.into()];
1592
1593        // test double insert one pk, the latter needs to override the former.
1594        let chunk1 = StreamChunk::from_pretty(
1595            " i i
1596            + 1 4
1597            + 2 5
1598            + 3 6
1599            U- 8 1
1600            U+ 8 2
1601            + 8 3",
1602        );
1603
1604        // test delete wrong value, delete inexistent pk
1605        let chunk2 = StreamChunk::from_pretty(
1606            " i i
1607            + 7 8
1608            - 3 4
1609            - 5 0",
1610        );
1611
1612        // test delete wrong value, delete inexistent pk
1613        let chunk3 = StreamChunk::from_pretty(
1614            " i i
1615            + 1 5
1616            U- 2 4
1617            U+ 2 8
1618            U- 9 0
1619            U+ 9 1",
1620        );
1621
1622        // Prepare stream executors.
1623        let source = MockSource::with_messages(vec![
1624            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1625            Message::Chunk(chunk1),
1626            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1627            Message::Chunk(chunk2),
1628            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1629            Message::Chunk(chunk3),
1630            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1631        ])
1632        .into_executor(schema.clone(), StreamKey::new());
1633
1634        let order_types = vec![OrderType::ascending()];
1635        let column_descs = vec![
1636            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1637            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1638        ];
1639
1640        let table = BatchTable::for_test(
1641            memory_state_store.clone(),
1642            table_id,
1643            column_descs,
1644            order_types,
1645            vec![0],
1646            vec![0, 1],
1647        );
1648
1649        let mut materialize_executor = MaterializeExecutor::for_test(
1650            source,
1651            memory_state_store,
1652            table_id,
1653            vec![ColumnOrder::new(0, OrderType::ascending())],
1654            column_ids,
1655            Arc::new(AtomicU64::new(0)),
1656            ConflictBehavior::Overwrite,
1657        )
1658        .await
1659        .boxed()
1660        .execute();
1661        materialize_executor.next().await.transpose().unwrap();
1662
1663        materialize_executor.next().await.transpose().unwrap();
1664
1665        // First stream chunk. We check the existence of (3) -> (3,6)
1666        match materialize_executor.next().await.transpose().unwrap() {
1667            Some(Message::Barrier(_)) => {
1668                // can read (8, 3), check insert after update
1669                let row = table
1670                    .get_row(
1671                        &OwnedRow::new(vec![Some(8_i32.into())]),
1672                        HummockReadEpoch::NoWait(u64::MAX),
1673                    )
1674                    .await
1675                    .unwrap();
1676                assert_eq!(
1677                    row,
1678                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
1679                );
1680            }
1681            _ => unreachable!(),
1682        }
1683        materialize_executor.next().await.transpose().unwrap();
1684
1685        match materialize_executor.next().await.transpose().unwrap() {
1686            Some(Message::Barrier(_)) => {
1687                let row = table
1688                    .get_row(
1689                        &OwnedRow::new(vec![Some(7_i32.into())]),
1690                        HummockReadEpoch::NoWait(u64::MAX),
1691                    )
1692                    .await
1693                    .unwrap();
1694                assert_eq!(
1695                    row,
1696                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1697                );
1698
1699                // check delete wrong value
1700                let row = table
1701                    .get_row(
1702                        &OwnedRow::new(vec![Some(3_i32.into())]),
1703                        HummockReadEpoch::NoWait(u64::MAX),
1704                    )
1705                    .await
1706                    .unwrap();
1707                assert_eq!(row, None);
1708
1709                // check delete wrong pk
1710                let row = table
1711                    .get_row(
1712                        &OwnedRow::new(vec![Some(5_i32.into())]),
1713                        HummockReadEpoch::NoWait(u64::MAX),
1714                    )
1715                    .await
1716                    .unwrap();
1717                assert_eq!(row, None);
1718            }
1719            _ => unreachable!(),
1720        }
1721
1722        materialize_executor.next().await.transpose().unwrap();
1723        // Second stream chunk. We check the existence of (7) -> (7,8)
1724        match materialize_executor.next().await.transpose().unwrap() {
1725            Some(Message::Barrier(_)) => {
1726                let row = table
1727                    .get_row(
1728                        &OwnedRow::new(vec![Some(1_i32.into())]),
1729                        HummockReadEpoch::NoWait(u64::MAX),
1730                    )
1731                    .await
1732                    .unwrap();
1733                assert_eq!(
1734                    row,
1735                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
1736                );
1737
1738                // check update wrong value
1739                let row = table
1740                    .get_row(
1741                        &OwnedRow::new(vec![Some(2_i32.into())]),
1742                        HummockReadEpoch::NoWait(u64::MAX),
1743                    )
1744                    .await
1745                    .unwrap();
1746                assert_eq!(
1747                    row,
1748                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
1749                );
1750
1751                // check update wrong pk, should become insert
1752                let row = table
1753                    .get_row(
1754                        &OwnedRow::new(vec![Some(9_i32.into())]),
1755                        HummockReadEpoch::NoWait(u64::MAX),
1756                    )
1757                    .await
1758                    .unwrap();
1759                assert_eq!(
1760                    row,
1761                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1762                );
1763            }
1764            _ => unreachable!(),
1765        }
1766    }
1767
1768    #[tokio::test]
1769    async fn test_change_buffer_into_chunk_with_stream_key() {
1770        // Table PK is (col0), but stream key is (col0, col1). When the value column changes due
1771        // to overwrite conflict handling, we should output `- old` + `+ new` rather than `U-`/`U+`.
1772        let memory_state_store = MemoryStateStore::new();
1773        let table_id = TableId::new(1);
1774        let schema = Schema::new(vec![
1775            Field::unnamed(DataType::Int32),
1776            Field::unnamed(DataType::Int32),
1777        ]);
1778        let column_ids = vec![0.into(), 1.into()];
1779
1780        let chunk1 = StreamChunk::from_pretty(
1781            " i i
1782            + 1 4",
1783        );
1784        let chunk2 = StreamChunk::from_pretty(
1785            " i i
1786            + 1 5",
1787        );
1788
1789        let source = MockSource::with_messages(vec![
1790            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1791            Message::Chunk(chunk1),
1792            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1793            Message::Chunk(chunk2),
1794            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1795        ])
1796        .into_executor(schema.clone(), StreamKey::new());
1797
1798        let mut materialize_executor = MaterializeExecutor::for_test_with_stream_key(
1799            source,
1800            memory_state_store,
1801            table_id,
1802            vec![ColumnOrder::new(0, OrderType::ascending())],
1803            vec![0, 1],
1804            column_ids,
1805            Arc::new(AtomicU64::new(0)),
1806            ConflictBehavior::Overwrite,
1807        )
1808        .await
1809        .boxed()
1810        .execute();
1811
1812        // init barrier + first insert
1813        materialize_executor.next().await.transpose().unwrap();
1814        materialize_executor.next().await.transpose().unwrap();
1815
1816        // commit barrier
1817        materialize_executor.next().await.transpose().unwrap();
1818
1819        // overwrite conflict should be converted into delete+insert due to stream key mismatch
1820        match materialize_executor.next().await.transpose().unwrap() {
1821            Some(Message::Chunk(chunk)) => {
1822                assert_eq!(
1823                    chunk.compact_vis(),
1824                    StreamChunk::from_pretty(
1825                        " i i
1826                        - 1 4
1827                        + 1 5"
1828                    )
1829                );
1830            }
1831            other => panic!("expect chunk, got {other:?}"),
1832        }
1833    }
1834
1835    #[tokio::test]
1836    async fn test_ignore_insert_conflict() {
1837        // Prepare storage and memtable.
1838        let memory_state_store = MemoryStateStore::new();
1839        let table_id = TableId::new(1);
1840        // Two columns of int32 type, the first column is PK.
1841        let schema = Schema::new(vec![
1842            Field::unnamed(DataType::Int32),
1843            Field::unnamed(DataType::Int32),
1844        ]);
1845        let column_ids = vec![0.into(), 1.into()];
1846
1847        // test double insert one pk, the latter needs to be ignored.
1848        let chunk1 = StreamChunk::from_pretty(
1849            " i i
1850            + 1 3
1851            + 1 4
1852            + 2 5
1853            + 3 6",
1854        );
1855
1856        let chunk2 = StreamChunk::from_pretty(
1857            " i i
1858            + 1 5
1859            + 2 6",
1860        );
1861
1862        // test delete wrong value, delete inexistent pk
1863        let chunk3 = StreamChunk::from_pretty(
1864            " i i
1865            + 1 6",
1866        );
1867
1868        // Prepare stream executors.
1869        let source = MockSource::with_messages(vec![
1870            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1871            Message::Chunk(chunk1),
1872            Message::Chunk(chunk2),
1873            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1874            Message::Chunk(chunk3),
1875            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1876        ])
1877        .into_executor(schema.clone(), StreamKey::new());
1878
1879        let order_types = vec![OrderType::ascending()];
1880        let column_descs = vec![
1881            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1882            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1883        ];
1884
1885        let table = BatchTable::for_test(
1886            memory_state_store.clone(),
1887            table_id,
1888            column_descs,
1889            order_types,
1890            vec![0],
1891            vec![0, 1],
1892        );
1893
1894        let mut materialize_executor = MaterializeExecutor::for_test(
1895            source,
1896            memory_state_store,
1897            table_id,
1898            vec![ColumnOrder::new(0, OrderType::ascending())],
1899            column_ids,
1900            Arc::new(AtomicU64::new(0)),
1901            ConflictBehavior::IgnoreConflict,
1902        )
1903        .await
1904        .boxed()
1905        .execute();
1906        materialize_executor.next().await.transpose().unwrap();
1907
1908        materialize_executor.next().await.transpose().unwrap();
1909        materialize_executor.next().await.transpose().unwrap();
1910
1911        // First stream chunk. We check the existence of (3) -> (3,6)
1912        match materialize_executor.next().await.transpose().unwrap() {
1913            Some(Message::Barrier(_)) => {
1914                let row = table
1915                    .get_row(
1916                        &OwnedRow::new(vec![Some(3_i32.into())]),
1917                        HummockReadEpoch::NoWait(u64::MAX),
1918                    )
1919                    .await
1920                    .unwrap();
1921                assert_eq!(
1922                    row,
1923                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1924                );
1925
1926                let row = table
1927                    .get_row(
1928                        &OwnedRow::new(vec![Some(1_i32.into())]),
1929                        HummockReadEpoch::NoWait(u64::MAX),
1930                    )
1931                    .await
1932                    .unwrap();
1933                assert_eq!(
1934                    row,
1935                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
1936                );
1937
1938                let row = table
1939                    .get_row(
1940                        &OwnedRow::new(vec![Some(2_i32.into())]),
1941                        HummockReadEpoch::NoWait(u64::MAX),
1942                    )
1943                    .await
1944                    .unwrap();
1945                assert_eq!(
1946                    row,
1947                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
1948                );
1949            }
1950            _ => unreachable!(),
1951        }
1952    }
1953
1954    #[tokio::test]
1955    async fn test_ignore_delete_then_insert() {
1956        // Prepare storage and memtable.
1957        let memory_state_store = MemoryStateStore::new();
1958        let table_id = TableId::new(1);
1959        // Two columns of int32 type, the first column is PK.
1960        let schema = Schema::new(vec![
1961            Field::unnamed(DataType::Int32),
1962            Field::unnamed(DataType::Int32),
1963        ]);
1964        let column_ids = vec![0.into(), 1.into()];
1965
1966        // test insert after delete one pk, the latter insert should succeed.
1967        let chunk1 = StreamChunk::from_pretty(
1968            " i i
1969            + 1 3
1970            - 1 3
1971            + 1 6",
1972        );
1973
1974        // Prepare stream executors.
1975        let source = MockSource::with_messages(vec![
1976            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1977            Message::Chunk(chunk1),
1978            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1979        ])
1980        .into_executor(schema.clone(), StreamKey::new());
1981
1982        let order_types = vec![OrderType::ascending()];
1983        let column_descs = vec![
1984            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1985            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1986        ];
1987
1988        let table = BatchTable::for_test(
1989            memory_state_store.clone(),
1990            table_id,
1991            column_descs,
1992            order_types,
1993            vec![0],
1994            vec![0, 1],
1995        );
1996
1997        let mut materialize_executor = MaterializeExecutor::for_test(
1998            source,
1999            memory_state_store,
2000            table_id,
2001            vec![ColumnOrder::new(0, OrderType::ascending())],
2002            column_ids,
2003            Arc::new(AtomicU64::new(0)),
2004            ConflictBehavior::IgnoreConflict,
2005        )
2006        .await
2007        .boxed()
2008        .execute();
2009        let _msg1 = materialize_executor
2010            .next()
2011            .await
2012            .transpose()
2013            .unwrap()
2014            .unwrap()
2015            .as_barrier()
2016            .unwrap();
2017        let _msg2 = materialize_executor
2018            .next()
2019            .await
2020            .transpose()
2021            .unwrap()
2022            .unwrap()
2023            .as_chunk()
2024            .unwrap();
2025        let _msg3 = materialize_executor
2026            .next()
2027            .await
2028            .transpose()
2029            .unwrap()
2030            .unwrap()
2031            .as_barrier()
2032            .unwrap();
2033
2034        let row = table
2035            .get_row(
2036                &OwnedRow::new(vec![Some(1_i32.into())]),
2037                HummockReadEpoch::NoWait(u64::MAX),
2038            )
2039            .await
2040            .unwrap();
2041        assert_eq!(
2042            row,
2043            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
2044        );
2045    }
2046
2047    #[tokio::test]
2048    async fn test_ignore_delete_and_update_conflict() {
2049        // Prepare storage and memtable.
2050        let memory_state_store = MemoryStateStore::new();
2051        let table_id = TableId::new(1);
2052        // Two columns of int32 type, the first column is PK.
2053        let schema = Schema::new(vec![
2054            Field::unnamed(DataType::Int32),
2055            Field::unnamed(DataType::Int32),
2056        ]);
2057        let column_ids = vec![0.into(), 1.into()];
2058
2059        // test double insert one pk, the latter should be ignored.
2060        let chunk1 = StreamChunk::from_pretty(
2061            " i i
2062            + 1 4
2063            + 2 5
2064            + 3 6
2065            U- 8 1
2066            U+ 8 2
2067            + 8 3",
2068        );
2069
2070        // test delete wrong value, delete inexistent pk
2071        let chunk2 = StreamChunk::from_pretty(
2072            " i i
2073            + 7 8
2074            - 3 4
2075            - 5 0",
2076        );
2077
2078        // test delete wrong value, delete inexistent pk
2079        let chunk3 = StreamChunk::from_pretty(
2080            " i i
2081            + 1 5
2082            U- 2 4
2083            U+ 2 8
2084            U- 9 0
2085            U+ 9 1",
2086        );
2087
2088        // Prepare stream executors.
2089        let source = MockSource::with_messages(vec![
2090            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2091            Message::Chunk(chunk1),
2092            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2093            Message::Chunk(chunk2),
2094            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
2095            Message::Chunk(chunk3),
2096            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
2097        ])
2098        .into_executor(schema.clone(), StreamKey::new());
2099
2100        let order_types = vec![OrderType::ascending()];
2101        let column_descs = vec![
2102            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2103            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2104        ];
2105
2106        let table = BatchTable::for_test(
2107            memory_state_store.clone(),
2108            table_id,
2109            column_descs,
2110            order_types,
2111            vec![0],
2112            vec![0, 1],
2113        );
2114
2115        let mut materialize_executor = MaterializeExecutor::for_test(
2116            source,
2117            memory_state_store,
2118            table_id,
2119            vec![ColumnOrder::new(0, OrderType::ascending())],
2120            column_ids,
2121            Arc::new(AtomicU64::new(0)),
2122            ConflictBehavior::IgnoreConflict,
2123        )
2124        .await
2125        .boxed()
2126        .execute();
2127        materialize_executor.next().await.transpose().unwrap();
2128
2129        materialize_executor.next().await.transpose().unwrap();
2130
2131        // First stream chunk. We check the existence of (3) -> (3,6)
2132        match materialize_executor.next().await.transpose().unwrap() {
2133            Some(Message::Barrier(_)) => {
2134                // can read (8, 2), check insert after update
2135                let row = table
2136                    .get_row(
2137                        &OwnedRow::new(vec![Some(8_i32.into())]),
2138                        HummockReadEpoch::NoWait(u64::MAX),
2139                    )
2140                    .await
2141                    .unwrap();
2142                assert_eq!(
2143                    row,
2144                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
2145                );
2146            }
2147            _ => unreachable!(),
2148        }
2149        materialize_executor.next().await.transpose().unwrap();
2150
2151        match materialize_executor.next().await.transpose().unwrap() {
2152            Some(Message::Barrier(_)) => {
2153                let row = table
2154                    .get_row(
2155                        &OwnedRow::new(vec![Some(7_i32.into())]),
2156                        HummockReadEpoch::NoWait(u64::MAX),
2157                    )
2158                    .await
2159                    .unwrap();
2160                assert_eq!(
2161                    row,
2162                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
2163                );
2164
2165                // check delete wrong value
2166                let row = table
2167                    .get_row(
2168                        &OwnedRow::new(vec![Some(3_i32.into())]),
2169                        HummockReadEpoch::NoWait(u64::MAX),
2170                    )
2171                    .await
2172                    .unwrap();
2173                assert_eq!(row, None);
2174
2175                // check delete wrong pk
2176                let row = table
2177                    .get_row(
2178                        &OwnedRow::new(vec![Some(5_i32.into())]),
2179                        HummockReadEpoch::NoWait(u64::MAX),
2180                    )
2181                    .await
2182                    .unwrap();
2183                assert_eq!(row, None);
2184            }
2185            _ => unreachable!(),
2186        }
2187
2188        materialize_executor.next().await.transpose().unwrap();
2189        // materialize_executor.next().await.transpose().unwrap();
2190        // Second stream chunk. We check the existence of (7) -> (7,8)
2191        match materialize_executor.next().await.transpose().unwrap() {
2192            Some(Message::Barrier(_)) => {
2193                let row = table
2194                    .get_row(
2195                        &OwnedRow::new(vec![Some(1_i32.into())]),
2196                        HummockReadEpoch::NoWait(u64::MAX),
2197                    )
2198                    .await
2199                    .unwrap();
2200                assert_eq!(
2201                    row,
2202                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
2203                );
2204
2205                // check update wrong value
2206                let row = table
2207                    .get_row(
2208                        &OwnedRow::new(vec![Some(2_i32.into())]),
2209                        HummockReadEpoch::NoWait(u64::MAX),
2210                    )
2211                    .await
2212                    .unwrap();
2213                assert_eq!(
2214                    row,
2215                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
2216                );
2217
2218                // check update wrong pk, should become insert
2219                let row = table
2220                    .get_row(
2221                        &OwnedRow::new(vec![Some(9_i32.into())]),
2222                        HummockReadEpoch::NoWait(u64::MAX),
2223                    )
2224                    .await
2225                    .unwrap();
2226                assert_eq!(
2227                    row,
2228                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
2229                );
2230            }
2231            _ => unreachable!(),
2232        }
2233    }
2234
2235    #[tokio::test]
2236    async fn test_do_update_if_not_null_conflict() {
2237        // Prepare storage and memtable.
2238        let memory_state_store = MemoryStateStore::new();
2239        let table_id = TableId::new(1);
2240        // Two columns of int32 type, the first column is PK.
2241        let schema = Schema::new(vec![
2242            Field::unnamed(DataType::Int32),
2243            Field::unnamed(DataType::Int32),
2244        ]);
2245        let column_ids = vec![0.into(), 1.into()];
2246
2247        // should get (8, 2)
2248        let chunk1 = StreamChunk::from_pretty(
2249            " i i
2250            + 1 4
2251            + 2 .
2252            + 3 6
2253            U- 8 .
2254            U+ 8 2
2255            + 8 .",
2256        );
2257
2258        // should not get (3, x), should not get (5, 0)
2259        let chunk2 = StreamChunk::from_pretty(
2260            " i i
2261            + 7 8
2262            - 3 4
2263            - 5 0",
2264        );
2265
2266        // should get (2, None), (7, 8)
2267        let chunk3 = StreamChunk::from_pretty(
2268            " i i
2269            + 1 5
2270            + 7 .
2271            U- 2 4
2272            U+ 2 .
2273            U- 9 0
2274            U+ 9 1",
2275        );
2276
2277        // Prepare stream executors.
2278        let source = MockSource::with_messages(vec![
2279            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
2280            Message::Chunk(chunk1),
2281            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
2282            Message::Chunk(chunk2),
2283            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
2284            Message::Chunk(chunk3),
2285            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
2286        ])
2287        .into_executor(schema.clone(), StreamKey::new());
2288
2289        let order_types = vec![OrderType::ascending()];
2290        let column_descs = vec![
2291            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
2292            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
2293        ];
2294
2295        let table = BatchTable::for_test(
2296            memory_state_store.clone(),
2297            table_id,
2298            column_descs,
2299            order_types,
2300            vec![0],
2301            vec![0, 1],
2302        );
2303
2304        let mut materialize_executor = MaterializeExecutor::for_test(
2305            source,
2306            memory_state_store,
2307            table_id,
2308            vec![ColumnOrder::new(0, OrderType::ascending())],
2309            column_ids,
2310            Arc::new(AtomicU64::new(0)),
2311            ConflictBehavior::DoUpdateIfNotNull,
2312        )
2313        .await
2314        .boxed()
2315        .execute();
2316        materialize_executor.next().await.transpose().unwrap();
2317
2318        materialize_executor.next().await.transpose().unwrap();
2319
2320        // First stream chunk. We check the existence of (3) -> (3,6)
2321        match materialize_executor.next().await.transpose().unwrap() {
2322            Some(Message::Barrier(_)) => {
2323                let row = table
2324                    .get_row(
2325                        &OwnedRow::new(vec![Some(8_i32.into())]),
2326                        HummockReadEpoch::NoWait(u64::MAX),
2327                    )
2328                    .await
2329                    .unwrap();
2330                assert_eq!(
2331                    row,
2332                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
2333                );
2334
2335                let row = table
2336                    .get_row(
2337                        &OwnedRow::new(vec![Some(2_i32.into())]),
2338                        HummockReadEpoch::NoWait(u64::MAX),
2339                    )
2340                    .await
2341                    .unwrap();
2342                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
2343            }
2344            _ => unreachable!(),
2345        }
2346        materialize_executor.next().await.transpose().unwrap();
2347
2348        match materialize_executor.next().await.transpose().unwrap() {
2349            Some(Message::Barrier(_)) => {
2350                let row = table
2351                    .get_row(
2352                        &OwnedRow::new(vec![Some(7_i32.into())]),
2353                        HummockReadEpoch::NoWait(u64::MAX),
2354                    )
2355                    .await
2356                    .unwrap();
2357                assert_eq!(
2358                    row,
2359                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
2360                );
2361
2362                // check delete wrong value
2363                let row = table
2364                    .get_row(
2365                        &OwnedRow::new(vec![Some(3_i32.into())]),
2366                        HummockReadEpoch::NoWait(u64::MAX),
2367                    )
2368                    .await
2369                    .unwrap();
2370                assert_eq!(row, None);
2371
2372                // check delete wrong pk
2373                let row = table
2374                    .get_row(
2375                        &OwnedRow::new(vec![Some(5_i32.into())]),
2376                        HummockReadEpoch::NoWait(u64::MAX),
2377                    )
2378                    .await
2379                    .unwrap();
2380                assert_eq!(row, None);
2381            }
2382            _ => unreachable!(),
2383        }
2384
2385        materialize_executor.next().await.transpose().unwrap();
2386        // materialize_executor.next().await.transpose().unwrap();
2387        // Second stream chunk. We check the existence of (7) -> (7,8)
2388        match materialize_executor.next().await.transpose().unwrap() {
2389            Some(Message::Barrier(_)) => {
2390                let row = table
2391                    .get_row(
2392                        &OwnedRow::new(vec![Some(7_i32.into())]),
2393                        HummockReadEpoch::NoWait(u64::MAX),
2394                    )
2395                    .await
2396                    .unwrap();
2397                assert_eq!(
2398                    row,
2399                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
2400                );
2401
2402                // check update wrong value
2403                let row = table
2404                    .get_row(
2405                        &OwnedRow::new(vec![Some(2_i32.into())]),
2406                        HummockReadEpoch::NoWait(u64::MAX),
2407                    )
2408                    .await
2409                    .unwrap();
2410                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
2411
2412                // check update wrong pk, should become insert
2413                let row = table
2414                    .get_row(
2415                        &OwnedRow::new(vec![Some(9_i32.into())]),
2416                        HummockReadEpoch::NoWait(u64::MAX),
2417                    )
2418                    .await
2419                    .unwrap();
2420                assert_eq!(
2421                    row,
2422                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
2423                );
2424            }
2425            _ => unreachable!(),
2426        }
2427    }
2428
2429    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2430        const KN: u32 = 4;
2431        const SEED: u64 = 998244353;
2432        let mut ret = vec![];
2433        let mut builder =
2434            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2435        let mut rng = SmallRng::seed_from_u64(SEED);
2436
2437        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2438            let len = c.data_chunk().capacity();
2439            let mut c = StreamChunkMut::from(c);
2440            for i in 0..len {
2441                c.set_vis(i, rng.random_bool(0.5));
2442            }
2443            c.into()
2444        };
2445        for _ in 0..row_number {
2446            let k = (rng.next_u32() % KN) as i32;
2447            let v = rng.next_u32() as i32;
2448            let op = if rng.random_bool(0.5) {
2449                Op::Insert
2450            } else {
2451                Op::Delete
2452            };
2453            if let Some(c) =
2454                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2455            {
2456                ret.push(random_vis(c, &mut rng));
2457            }
2458        }
2459        if let Some(c) = builder.take() {
2460            ret.push(random_vis(c, &mut rng));
2461        }
2462        ret
2463    }
2464
2465    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
2466        const N: usize = 100000;
2467
2468        // Prepare storage and memtable.
2469        let memory_state_store = MemoryStateStore::new();
2470        let table_id = TableId::new(1);
2471        // Two columns of int32 type, the first column is PK.
2472        let schema = Schema::new(vec![
2473            Field::unnamed(DataType::Int32),
2474            Field::unnamed(DataType::Int32),
2475        ]);
2476        let column_ids = vec![0.into(), 1.into()];
2477
2478        let chunks = gen_fuzz_data(N, 128);
2479        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
2480            .chain(chunks.into_iter().map(Message::Chunk))
2481            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
2482                test_epoch(2),
2483            ))))
2484            .collect();
2485        // Prepare stream executors.
2486        let source =
2487            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());
2488
2489        let mut materialize_executor = MaterializeExecutor::for_test(
2490            source,
2491            memory_state_store.clone(),
2492            table_id,
2493            vec![ColumnOrder::new(0, OrderType::ascending())],
2494            column_ids,
2495            Arc::new(AtomicU64::new(0)),
2496            conflict_behavior,
2497        )
2498        .await
2499        .boxed()
2500        .execute();
2501        materialize_executor.expect_barrier().await;
2502
2503        let order_types = vec![OrderType::ascending()];
2504        let column_descs = vec![
2505            ColumnDesc::unnamed(0.into(), DataType::Int32),
2506            ColumnDesc::unnamed(1.into(), DataType::Int32),
2507        ];
2508        let pk_indices = vec![0];
2509
2510        let mut table = StateTable::from_table_catalog(
2511            &crate::common::table::test_utils::gen_pbtable(
2512                TableId::from(1002),
2513                column_descs.clone(),
2514                order_types,
2515                pk_indices,
2516                0,
2517            ),
2518            memory_state_store.clone(),
2519            None,
2520        )
2521        .await;
2522
2523        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
2524            // check with state table's memtable
2525            table.write_chunk(c);
2526        }
2527    }
2528
2529    #[tokio::test]
2530    async fn fuzz_test_stream_consistent_upsert() {
2531        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
2532    }
2533
2534    #[tokio::test]
2535    async fn fuzz_test_stream_consistent_ignore() {
2536        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
2537    }
2538}