risingwave_stream/executor/mview/
materialize.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::ops::Bound;
17
18use bytes::Bytes;
19use futures::future::Either;
20use futures::stream::{self, select_with_strategy};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{
26    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
27};
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, RowExt};
30use risingwave_common::types::DataType;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::BasicSerde;
33use risingwave_hummock_sdk::HummockReadEpoch;
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_pb::id::{SourceId, SubscriberId};
37use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
38use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
39use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
40use risingwave_storage::table::KeyedRow;
41
42use crate::common::change_buffer::output_kind as cb_kind;
43use crate::common::metrics::MetricsInfo;
44use crate::common::table::state_table::{
45    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
46};
47use crate::executor::error::ErrorKind;
48use crate::executor::monitor::MaterializeMetrics;
49use crate::executor::mview::RefreshProgressTable;
50use crate::executor::mview::cache::MaterializeCache;
51use crate::executor::prelude::*;
52use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
53use crate::task::LocalBarrierManager;
54
/// State machine driving [`MaterializeExecutor`]'s execution stream.
///
/// `M` is the mutation payload type carried by barriers (see [`BarrierInner`]).
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Default stage: apply upstream changes to the materialized view.
    NormalIngestion,
    /// Refresh stage: scan the main table against the PK-only staging table and
    /// collect main-table rows absent from staging for deletion.
    MergingData,
    /// Refresh stage: purge staging/progress state after the merge.
    /// NOTE(review): full semantics live in later parts of `execute_inner` — confirm there.
    CleanUp,
    /// Commit state tables, yield the held `barrier` downstream, then
    /// transition to `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        // Boxed because the enum is recursive here.
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Terminal stage of a refresh.
    RefreshEnd {
        // Epoch at which the refresh operation completed.
        on_complete_epoch: EpochPair,
    },
}
68
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor whose change stream is materialized.
    input: Executor,

    /// Schema of this executor's output chunks.
    schema: Schema,

    /// The state table backing the materialized view on storage.
    state_table: StateTableInner<S, SD>,

    /// Stream key indices of the *output* of this materialize node.
    ///
    /// This can be different from the table PK. Typically, there are 3 cases:
    ///
    /// - Normal `TABLE`: `stream_key == pk`, so no special handling is needed.
    /// - TTL-ed `TABLE` (`WITH TTL`): `stream_key` is a superset of `pk` by appending the TTL
    ///   watermark column. In this case, updates to the same pk may have different stream keys, so
    ///   we must rewrite such `Update` into `Delete + Insert` before yielding.
    /// - `MV` or `INDEX`: `pk` can be a superset of `stream_key` to also include the order key or
    ///   distribution key specified by the user. No special handling is needed either.
    stream_key_indices: Vec<usize>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    /// Context of the actor running this executor (ids, config, initial dispatch info).
    actor_context: ActorContextRef,

    /// The cache for conflict handling. `None` if conflict behavior is `NoCheck`.
    materialize_cache: Option<MaterializeCache>,

    /// How rows conflicting on the table PK are resolved.
    conflict_behavior: ConflictBehavior,

    /// Whether the table can clean itself by TTL watermark, i.e., is defined with `WATERMARK ... WITH TTL`.
    cleaned_by_ttl_watermark: bool,

    /// Indices of version columns used for conflict resolution; empty if no
    /// version column is specified.
    version_column_indices: Vec<u32>,

    /// Whether any downstream may depend on this table, derived from the
    /// actor's initial dispatcher count. Affects op consistency level.
    may_have_downstream: bool,

    /// Initial subscriber IDs; a non-empty set forces log-store-enabled
    /// op consistency (see `get_op_consistency_level`).
    subscriber_ids: HashSet<SubscriberId>,

    /// Materialize-specific metrics (e.g. input row count).
    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,

    /// Optional refresh arguments and state for refreshable materialized views
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    /// Local barrier manager for reporting barrier events
    local_barrier_manager: LocalBarrierManager,
}
120
/// Arguments and state for refreshable materialized views
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Table catalog for main table
    pub table_catalog: Table,

    /// Table catalog for staging table
    pub staging_table_catalog: Table,

    /// Flag indicating if this table is currently being refreshed.
    /// Also recovered from the progress table on startup when incomplete
    /// vnodes are found.
    pub is_refreshing: bool,

    /// During data refresh (between `RefreshStart` and `LoadFinish`),
    /// data will be written to both the main table and the staging table.
    ///
    /// The staging table is PK-only.
    ///
    /// After `LoadFinish`, we will do a `DELETE FROM main_table WHERE pk NOT IN (SELECT pk FROM staging_table)`, and then purge the staging table.
    pub staging_table: StateTableInner<S, SD>,

    /// Progress table for tracking refresh state per `VNode` for fault tolerance
    pub progress_table: RefreshProgressTable<S>,

    /// Table ID for this refreshable materialized view (same as `table_catalog.id`)
    pub table_id: TableId,
}
146
147impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
148    /// Create new `RefreshableMaterializeArgs`
149    pub async fn new(
150        store: S,
151        table_catalog: &Table,
152        staging_table_catalog: &Table,
153        progress_state_table: &Table,
154        vnodes: Option<Arc<Bitmap>>,
155    ) -> Self {
156        let table_id = table_catalog.id;
157
158        // staging table is pk-only, and we don't need to check value consistency
159        let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
160            staging_table_catalog,
161            store.clone(),
162            vnodes.clone(),
163        )
164        .await;
165
166        let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
167            progress_state_table,
168            store,
169            vnodes,
170        )
171        .await;
172
173        // Get primary key length from main table catalog
174        let pk_len = table_catalog.pk.len();
175        let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
176
177        debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
178
179        Self {
180            table_catalog: table_catalog.clone(),
181            staging_table_catalog: staging_table_catalog.clone(),
182            is_refreshing: false,
183            staging_table,
184            progress_table,
185            table_id,
186        }
187    }
188}
189
190fn get_op_consistency_level(
191    cleaned_by_ttl_watermark: bool,
192    conflict_behavior: ConflictBehavior,
193    may_have_downstream: bool,
194    subscriber_ids: &HashSet<SubscriberId>,
195) -> StateTableOpConsistencyLevel {
196    if cleaned_by_ttl_watermark {
197        // For tables with watermark TTL. Due to async state cleaning, it's uncertain whether an expired
198        // key has been cleaned up by the storage or not, thus we are not sure if to write `Update` or `Insert`
199        // if the key appears again.
200        StateTableOpConsistencyLevel::Inconsistent
201    } else if !subscriber_ids.is_empty() {
202        StateTableOpConsistencyLevel::LogStoreEnabled
203    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
204        // Table with overwrite conflict behavior could disable conflict check
205        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
206        StateTableOpConsistencyLevel::Inconsistent
207    } else {
208        StateTableOpConsistencyLevel::ConsistentOldValue
209    }
210}
211
212impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
213    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
214    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
215    /// should be `None`.
216    #[allow(clippy::too_many_arguments)]
217    pub async fn new(
218        input: Executor,
219        schema: Schema,
220        store: S,
221        arrange_key: Vec<ColumnOrder>,
222        actor_context: ActorContextRef,
223        vnodes: Option<Arc<Bitmap>>,
224        table_catalog: &Table,
225        watermark_epoch: AtomicU64Ref,
226        conflict_behavior: ConflictBehavior,
227        version_column_indices: Vec<u32>,
228        metrics: Arc<StreamingMetrics>,
229        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
230        cleaned_by_ttl_watermark: bool,
231        local_barrier_manager: LocalBarrierManager,
232    ) -> Self {
233        let table_columns: Vec<ColumnDesc> = table_catalog
234            .columns
235            .iter()
236            .map(|col| col.column_desc.as_ref().unwrap().into())
237            .collect();
238
239        // Extract TOAST-able column indices from table columns.
240        // Only for PostgreSQL CDC tables.
241        let toastable_column_indices = if table_catalog.cdc_table_type()
242            == risingwave_pb::catalog::table::CdcTableType::Postgres
243        {
244            let toastable_indices: Vec<usize> = table_columns
245                .iter()
246                .enumerate()
247                .filter_map(|(index, column)| match &column.data_type {
248                    // Currently supports TOAST updates for:
249                    // - jsonb (DataType::Jsonb)
250                    // - varchar (DataType::Varchar)
251                    // - bytea (DataType::Bytea)
252                    // - One-dimensional arrays of the above types (DataType::List)
253                    //   Note: Some array types may not be fully supported yet, see issue  https://github.com/risingwavelabs/risingwave/issues/22916 for details.
254
255                    // For details on how TOAST values are handled, see comments in `is_debezium_unavailable_value`.
256                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
257                        Some(index)
258                    }
259                    _ => None,
260                })
261                .collect();
262
263            if toastable_indices.is_empty() {
264                None
265            } else {
266                Some(toastable_indices)
267            }
268        } else {
269            None
270        };
271
272        let row_serde: BasicSerde = BasicSerde::new(
273            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
274            Arc::from(table_columns.into_boxed_slice()),
275        );
276
277        let stream_key_indices: Vec<usize> = table_catalog
278            .stream_key
279            .iter()
280            .map(|idx| *idx as usize)
281            .collect();
282        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
283        let may_have_downstream = actor_context.initial_dispatch_num != 0;
284        let subscriber_ids = actor_context.initial_subscriber_ids.clone();
285        let op_consistency_level = get_op_consistency_level(
286            cleaned_by_ttl_watermark,
287            conflict_behavior,
288            may_have_downstream,
289            &subscriber_ids,
290        );
291        let state_table_metrics = metrics.new_state_table_metrics(
292            table_catalog.id,
293            actor_context.id,
294            actor_context.fragment_id,
295        );
296        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
297        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
298            .with_op_consistency_level(op_consistency_level)
299            .enable_preload_all_rows_by_config(&actor_context.config)
300            .enable_vnode_key_pruning(true)
301            .with_metrics(state_table_metrics)
302            .build()
303            .await;
304
305        let mv_metrics = metrics.new_materialize_metrics(
306            table_catalog.id,
307            actor_context.id,
308            actor_context.fragment_id,
309        );
310        let cache_metrics = metrics.new_materialize_cache_metrics(
311            table_catalog.id,
312            actor_context.id,
313            actor_context.fragment_id,
314        );
315
316        let metrics_info =
317            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
318
319        let is_dummy_table =
320            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;
321
322        Self {
323            input,
324            schema,
325            state_table,
326            stream_key_indices,
327            arrange_key_indices,
328            actor_context,
329            materialize_cache: MaterializeCache::new(
330                watermark_epoch,
331                metrics_info,
332                row_serde,
333                version_column_indices.clone(),
334                conflict_behavior,
335                toastable_column_indices,
336                cache_metrics,
337            ),
338            conflict_behavior,
339            cleaned_by_ttl_watermark,
340            version_column_indices,
341            is_dummy_table,
342            may_have_downstream,
343            subscriber_ids,
344            metrics: mv_metrics,
345            refresh_args,
346            local_barrier_manager,
347        }
348    }
349
350    #[try_stream(ok = Message, error = StreamExecutorError)]
351    async fn execute_inner(mut self) {
352        let mv_table_id = self.state_table.table_id();
353        let data_types = self.schema.data_types();
354        let mut input = self.input.execute();
355
356        let barrier = expect_first_barrier(&mut input).await?;
357        let first_epoch = barrier.epoch;
358        let _barrier_epoch = barrier.epoch; // Save epoch for later use (unused in normal execution)
359        // The first barrier message should be propagated.
360        yield Message::Barrier(barrier);
361        self.state_table.init_epoch(first_epoch).await?;
362
363        // default to normal ingestion
364        let mut inner_state =
365            Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
366        // Initialize staging table for refreshable materialized views
367        if let Some(ref mut refresh_args) = self.refresh_args {
368            refresh_args.staging_table.init_epoch(first_epoch).await?;
369
370            // Initialize progress table and load existing progress for recovery
371            refresh_args.progress_table.recover(first_epoch).await?;
372
373            // Check if refresh is already in progress (recovery scenario)
374            let progress_stats = refresh_args.progress_table.get_progress_stats();
375            if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
376                refresh_args.is_refreshing = true;
377                tracing::info!(
378                    total_vnodes = progress_stats.total_vnodes,
379                    completed_vnodes = progress_stats.completed_vnodes,
380                    "Recovered refresh in progress, resuming refresh operation"
381                );
382
383                // Since stage info is no longer stored in progress table,
384                // we need to determine recovery state differently.
385                // For now, assume all incomplete VNodes need to continue merging
386                let incomplete_vnodes: Vec<_> = refresh_args
387                    .progress_table
388                    .get_all_progress()
389                    .iter()
390                    .filter(|(_, entry)| !entry.is_completed)
391                    .map(|(&vnode, _)| vnode)
392                    .collect();
393
394                if !incomplete_vnodes.is_empty() {
395                    // Some VNodes are incomplete, need to resume refresh operation
396                    tracing::info!(
397                        incomplete_vnodes = incomplete_vnodes.len(),
398                        "Recovery detected incomplete VNodes, resuming refresh operation"
399                    );
400                    // Since stage tracking is now in memory, we'll determine the appropriate
401                    // stage based on the executor's internal state machine
402                } else {
403                    // This should not happen if is_complete() returned false, but handle it gracefully
404                    tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
405                }
406            }
407        }
408
409        // Determine initial execution stage (for recovery scenarios)
410        if let Some(ref refresh_args) = self.refresh_args
411            && refresh_args.is_refreshing
412        {
413            // Recovery logic: Check if there are incomplete vnodes from previous run
414            let incomplete_vnodes: Vec<_> = refresh_args
415                .progress_table
416                .get_all_progress()
417                .iter()
418                .filter(|(_, entry)| !entry.is_completed)
419                .map(|(&vnode, _)| vnode)
420                .collect();
421            if !incomplete_vnodes.is_empty() {
422                // Resume from merge stage since some VNodes were left incomplete
423                inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
424                tracing::info!(
425                    incomplete_vnodes = incomplete_vnodes.len(),
426                    "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
427                );
428            }
429        }
430
431        // Main execution loop: cycles through Stage 1 -> Stage 2 -> Stage 3 -> Stage 1...
432        'main_loop: loop {
433            match *inner_state {
434                MaterializeStreamState::NormalIngestion => {
435                    #[for_await]
436                    '_normal_ingest: for msg in input.by_ref() {
437                        let msg = msg?;
438                        if let Some(cache) = &mut self.materialize_cache {
439                            cache.evict();
440                        }
441
442                        match msg {
443                            Message::Watermark(w) => {
444                                if self.cleaned_by_ttl_watermark
445                                    && self.state_table.clean_watermark_index == Some(w.col_idx)
446                                {
447                                    self.state_table.update_watermark(w.val.clone());
448                                }
449                                yield Message::Watermark(w);
450                            }
451                            Message::Chunk(chunk) if self.is_dummy_table => {
452                                self.metrics
453                                    .materialize_input_row_count
454                                    .inc_by(chunk.cardinality() as u64);
455                                yield Message::Chunk(chunk);
456                            }
457                            Message::Chunk(chunk) => {
458                                self.metrics
459                                    .materialize_input_row_count
460                                    .inc_by(chunk.cardinality() as u64);
461
462                                // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
463                                // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
464                                // and the conflict behavior is overwrite. We can rely on the state table to overwrite the conflicting rows in the storage,
465                                // while outputting inconsistent changes to downstream which no one will subscribe to.
466                                // For tables with watermark TTL (indicated by `cleaned_by_ttl_watermark`), conflict check must be enabled.
467                                // TODO(ttl): differentiate between table consistency and downstream consistency.
468                                let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
469                                    self.conflict_behavior
470                                    && !self.state_table.is_consistent_op()
471                                    && !self.cleaned_by_ttl_watermark
472                                    && self.version_column_indices.is_empty()
473                                {
474                                    ConflictBehavior::NoCheck
475                                } else {
476                                    self.conflict_behavior
477                                };
478
479                                match optimized_conflict_behavior {
480                                    checked_conflict_behaviors!() => {
481                                        if chunk.cardinality() == 0 {
482                                            // empty chunk
483                                            continue;
484                                        }
485
486                                        // For refreshable materialized views, write to staging table during refresh
487                                        // Do not use generate_output here.
488                                        if let Some(ref mut refresh_args) = self.refresh_args
489                                            && refresh_args.is_refreshing
490                                        {
491                                            let key_chunk = chunk
492                                                .clone()
493                                                .project(self.state_table.pk_indices());
494                                            tracing::trace!(
495                                                staging_chunk = %key_chunk.to_pretty(),
496                                                input_chunk = %chunk.to_pretty(),
497                                                "writing to staging table"
498                                            );
499                                            if cfg!(debug_assertions) {
500                                                // refreshable source should be append-only
501                                                assert!(
502                                                    key_chunk
503                                                        .ops()
504                                                        .iter()
505                                                        .all(|op| op == &Op::Insert)
506                                                );
507                                            }
508                                            refresh_args
509                                                .staging_table
510                                                .write_chunk(key_chunk.clone());
511                                            refresh_args.staging_table.try_flush().await?;
512                                        }
513
514                                        let cache = self.materialize_cache.as_mut().unwrap();
515                                        let change_buffer =
516                                            cache.handle_new(chunk, &self.state_table).await?;
517
518                                        let output_chunk = if self.stream_key_indices
519                                            == self.state_table.pk_indices()
520                                        {
521                                            change_buffer.into_chunk::<{ cb_kind::RETRACT }>(
522                                                data_types.clone(),
523                                            )
524                                        } else {
525                                            // We only hit this branch for TTL-ed tables for now.
526                                            // Assert stream key is a superset of pk.
527                                            debug_assert!(
528                                                self.state_table
529                                                    .pk_indices()
530                                                    .iter()
531                                                    .all(|&i| self.stream_key_indices.contains(&i))
532                                            );
533                                            change_buffer.into_chunk_with_key(
534                                                data_types.clone(),
535                                                &self.stream_key_indices,
536                                            )
537                                        };
538
539                                        match output_chunk {
540                                            Some(output_chunk) => {
541                                                self.state_table.write_chunk(output_chunk.clone());
542                                                self.state_table.try_flush().await?;
543                                                yield Message::Chunk(output_chunk);
544                                            }
545                                            None => continue,
546                                        }
547                                    }
548                                    ConflictBehavior::NoCheck => {
549                                        self.state_table.write_chunk(chunk.clone());
550                                        self.state_table.try_flush().await?;
551
552                                        // For refreshable materialized views, also write to staging table during refresh
553                                        if let Some(ref mut refresh_args) = self.refresh_args
554                                            && refresh_args.is_refreshing
555                                        {
556                                            let key_chunk = chunk
557                                                .clone()
558                                                .project(self.state_table.pk_indices());
559                                            tracing::trace!(
560                                                staging_chunk = %key_chunk.to_pretty(),
561                                                input_chunk = %chunk.to_pretty(),
562                                                "writing to staging table"
563                                            );
564                                            if cfg!(debug_assertions) {
565                                                // refreshable source should be append-only
566                                                assert!(
567                                                    key_chunk
568                                                        .ops()
569                                                        .iter()
570                                                        .all(|op| op == &Op::Insert)
571                                                );
572                                            }
573                                            refresh_args
574                                                .staging_table
575                                                .write_chunk(key_chunk.clone());
576                                            refresh_args.staging_table.try_flush().await?;
577                                        }
578
579                                        yield Message::Chunk(chunk);
580                                    }
581                                }
582                            }
583                            Message::Barrier(barrier) => {
584                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
585                                    barrier,
586                                    expect_next_state: Box::new(
587                                        MaterializeStreamState::NormalIngestion,
588                                    ),
589                                };
590                                continue 'main_loop;
591                            }
592                        }
593                    }
594
595                    return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
596                        anyhow::anyhow!(
597                            "Input stream terminated unexpectedly during normal ingestion"
598                        ),
599                    )));
600                }
601                MaterializeStreamState::MergingData => {
602                    let Some(refresh_args) = self.refresh_args.as_mut() else {
603                        panic!(
604                            "MaterializeExecutor entered CleanUp state without refresh_args configured"
605                        );
606                    };
607                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
608
609                    debug_assert_eq!(
610                        self.state_table.vnodes(),
611                        refresh_args.staging_table.vnodes()
612                    );
613                    debug_assert_eq!(
614                        refresh_args.staging_table.vnodes(),
615                        refresh_args.progress_table.vnodes()
616                    );
617
618                    let mut rows_to_delete = vec![];
619                    let mut merge_complete = false;
620                    let mut pending_barrier: Option<Barrier> = None;
621
622                    // Scope to limit immutable borrows to state tables
623                    {
624                        let left_input = input.by_ref().map(Either::Left);
625                        let right_merge_sort = pin!(
626                            Self::make_mergesort_stream(
627                                &self.state_table,
628                                &refresh_args.staging_table,
629                                &mut refresh_args.progress_table
630                            )
631                            .map(Either::Right)
632                        );
633
634                        // Prefer to select input stream to handle barriers promptly
635                        // Rebuild the merge stream each time processing a barrier
636                        let mut merge_stream =
637                            select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
638                                stream::PollNext::Left
639                            });
640
641                        #[for_await]
642                        'merge_stream: for either in &mut merge_stream {
643                            match either {
644                                Either::Left(msg) => {
645                                    let msg = msg?;
646                                    match msg {
647                                        Message::Watermark(w) => yield Message::Watermark(w),
648                                        Message::Chunk(chunk) => {
649                                            tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
650                                        }
651                                        Message::Barrier(b) => {
652                                            pending_barrier = Some(b);
653                                            break 'merge_stream;
654                                        }
655                                    }
656                                }
657                                Either::Right(result) => {
658                                    match result? {
659                                        Some((_vnode, row)) => {
660                                            rows_to_delete.push(row);
661                                        }
662                                        None => {
663                                            // Merge stream finished
664                                            merge_complete = true;
665
666                                            // If the merge stream finished, we need to wait for the next barrier to commit states
667                                        }
668                                    }
669                                }
670                            }
671                        }
672                    }
673
674                    // Process collected rows for deletion
675                    for row in &rows_to_delete {
676                        self.state_table.delete(row);
677                    }
678                    if !rows_to_delete.is_empty() {
679                        let to_delete_chunk = StreamChunk::from_rows(
680                            &rows_to_delete
681                                .iter()
682                                .map(|row| (Op::Delete, row))
683                                .collect_vec(),
684                            &self.schema.data_types(),
685                        );
686
687                        yield Message::Chunk(to_delete_chunk);
688                    }
689
690                    // should wait for at least one barrier
691                    assert!(pending_barrier.is_some(), "pending barrier is not set");
692
693                    *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
694                        barrier: pending_barrier.unwrap(),
695                        expect_next_state: if merge_complete {
696                            Box::new(MaterializeStreamState::CleanUp)
697                        } else {
698                            Box::new(MaterializeStreamState::MergingData)
699                        },
700                    };
701                    continue 'main_loop;
702                }
703                MaterializeStreamState::CleanUp => {
704                    let Some(refresh_args) = self.refresh_args.as_mut() else {
705                        panic!(
706                            "MaterializeExecutor entered MergingData state without refresh_args configured"
707                        );
708                    };
709                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
710
711                    #[for_await]
712                    for msg in input.by_ref() {
713                        let msg = msg?;
714                        match msg {
715                            Message::Watermark(w) => {
716                                if self.cleaned_by_ttl_watermark
717                                    && self.state_table.clean_watermark_index == Some(w.col_idx)
718                                {
719                                    self.state_table.update_watermark(w.val.clone());
720                                }
721                                yield Message::Watermark(w)
722                            }
723                            Message::Chunk(chunk) => {
724                                tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
725                            }
726                            Message::Barrier(barrier) if !barrier.is_checkpoint() => {
727                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
728                                    barrier,
729                                    expect_next_state: Box::new(MaterializeStreamState::CleanUp),
730                                };
731                                continue 'main_loop;
732                            }
733                            Message::Barrier(barrier) => {
734                                let staging_table_id = refresh_args.staging_table.table_id();
735                                let epoch = barrier.epoch;
736                                self.local_barrier_manager.report_refresh_finished(
737                                    epoch,
738                                    self.actor_context.id,
739                                    refresh_args.table_id,
740                                    staging_table_id,
741                                );
742                                tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
743
744                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
745                                    barrier,
746                                    expect_next_state: Box::new(
747                                        MaterializeStreamState::RefreshEnd {
748                                            on_complete_epoch: epoch,
749                                        },
750                                    ),
751                                };
752                                continue 'main_loop;
753                            }
754                        }
755                    }
756                }
757                MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
758                    let Some(refresh_args) = self.refresh_args.as_mut() else {
759                        panic!(
760                            "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
761                        );
762                    };
763                    let staging_table_id = refresh_args.staging_table.table_id();
764
765                    // Wait for staging table truncation to complete
766                    let staging_store = refresh_args.staging_table.state_store().clone();
767                    staging_store
768                        .try_wait_epoch(
769                            HummockReadEpoch::Committed(on_complete_epoch.prev),
770                            TryWaitEpochOptions {
771                                table_id: staging_table_id,
772                            },
773                        )
774                        .await?;
775
776                    tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
777
778                    if let Some(ref mut refresh_args) = self.refresh_args {
779                        refresh_args.is_refreshing = false;
780                    }
781                    *inner_state = MaterializeStreamState::NormalIngestion;
782                    continue 'main_loop;
783                }
784                MaterializeStreamState::CommitAndYieldBarrier {
785                    barrier,
786                    mut expect_next_state,
787                } => {
788                    if let Some(ref mut refresh_args) = self.refresh_args {
789                        match barrier.mutation.as_deref() {
790                            Some(Mutation::RefreshStart {
791                                table_id: refresh_table_id,
792                                associated_source_id: _,
793                            }) if *refresh_table_id == refresh_args.table_id => {
794                                debug_assert!(
795                                    !refresh_args.is_refreshing,
796                                    "cannot start refresh twice"
797                                );
798                                refresh_args.is_refreshing = true;
799                                tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
800
801                                // Initialize progress tracking for all VNodes
802                                Self::init_refresh_progress(
803                                    &self.state_table,
804                                    &mut refresh_args.progress_table,
805                                    barrier.epoch.curr,
806                                )?;
807                            }
808                            Some(Mutation::LoadFinish {
809                                associated_source_id: load_finish_source_id,
810                            }) => {
811                                // Get associated source id from table catalog
812                                let associated_source_id: SourceId = match refresh_args
813                                    .table_catalog
814                                    .optional_associated_source_id
815                                {
816                                    Some(id) => id.into(),
817                                    None => unreachable!("associated_source_id is not set"),
818                                };
819
820                                if *load_finish_source_id == associated_source_id {
821                                    tracing::info!(
822                                        %load_finish_source_id,
823                                        "LoadFinish received, starting data replacement"
824                                    );
825                                    expect_next_state =
826                                        Box::new(MaterializeStreamState::<_>::MergingData);
827                                }
828                            }
829                            _ => {}
830                        }
831                    }
832
833                    // ===== normal operation =====
834
835                    // If a downstream mv depends on the current table, we need to do conflict check again.
836                    if !self.may_have_downstream
837                        && barrier.has_more_downstream_fragments(self.actor_context.id)
838                    {
839                        self.may_have_downstream = true;
840                    }
841                    Self::may_update_depended_subscriptions(
842                        &mut self.subscriber_ids,
843                        &barrier,
844                        mv_table_id,
845                    );
846                    let op_consistency_level = get_op_consistency_level(
847                        self.cleaned_by_ttl_watermark,
848                        self.conflict_behavior,
849                        self.may_have_downstream,
850                        &self.subscriber_ids,
851                    );
852                    let post_commit = self
853                        .state_table
854                        .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
855                        .await?;
856
857                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
858
859                    // Commit staging table for refreshable materialized views
860                    let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
861                    {
862                        // Commit progress table for fault tolerance
863
864                        Some((
865                            refresh_args.staging_table.commit(barrier.epoch).await?,
866                            refresh_args.progress_table.commit(barrier.epoch).await?,
867                        ))
868                    } else {
869                        None
870                    };
871
872                    let b_epoch = barrier.epoch;
873                    yield Message::Barrier(barrier);
874
875                    // Update the vnode bitmap for the state table if asked.
876                    if let Some((_, cache_may_stale)) = post_commit
877                        .post_yield_barrier(update_vnode_bitmap.clone())
878                        .await?
879                        && cache_may_stale
880                        && let Some(cache) = &mut self.materialize_cache
881                    {
882                        cache.clear();
883                    }
884
885                    // Handle staging table post commit
886                    if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
887                        staging_post_commit
888                            .post_yield_barrier(update_vnode_bitmap.clone())
889                            .await?;
890                        progress_post_commit
891                            .post_yield_barrier(update_vnode_bitmap)
892                            .await?;
893                    }
894
895                    self.metrics
896                        .materialize_current_epoch
897                        .set(b_epoch.curr as i64);
898
899                    // ====== transition to next state ======
900
901                    *inner_state = *expect_next_state;
902                }
903            }
904        }
905    }
906
    /// Stream of rows that must be deleted from the main table after a refresh.
    ///
    /// Performs a per-vnode sort-merge between the main table and the staging
    /// table (both iterated in pk order): every main-table row whose key is
    /// absent from the staging table is yielded as `Some((vnode, row))`.
    /// A final `None` signals that all vnodes have been processed.
    ///
    /// For fault tolerance, progress (`current pk`, `processed_rows`) is written
    /// to `progress_table` after each main-table row, so a recovered executor
    /// resumes strictly after the last recorded pk; vnodes already marked
    /// completed are skipped entirely.
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn make_mergesort_stream<'a>(
        main_table: &'a StateTableInner<S, SD>,
        staging_table: &'a StateTableInner<S, SD>,
        progress_table: &'a mut RefreshProgressTable<S>,
    ) {
        for vnode in main_table.vnodes().clone().iter_vnodes() {
            let mut processed_rows = 0;
            // Determine the resume point for this vnode from the progress table
            // (fault tolerance): scan from the beginning when there is no entry.
            let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
                if let Some(current_entry) = progress_table.get_progress(vnode) {
                    // Skip already completed VNodes during recovery
                    if current_entry.is_completed {
                        tracing::debug!(
                            vnode = vnode.to_index(),
                            "Skipping already completed VNode during recovery"
                        );
                        continue;
                    }
                    // Carry over the previously persisted row count so that the
                    // reported progress stays monotonic across recoveries.
                    processed_rows += current_entry.processed_rows;
                    tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");

                    if let Some(current_state) = &current_entry.current_pos {
                        // Excluded bound: the row at `current_pos` was already handled.
                        (Bound::Excluded(current_state.clone()), Bound::Unbounded)
                    } else {
                        (Bound::Unbounded, Bound::Unbounded)
                    }
                } else {
                    (Bound::Unbounded, Bound::Unbounded)
                };

            // Both iterators use the same range and yield rows in pk order,
            // which is what makes the dual-pointer merge below correct.
            let iter_main = main_table
                .iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::prefetch_for_large_range_scan(),
                )
                .await?;
            let iter_staging = staging_table
                .iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::prefetch_for_large_range_scan(),
                )
                .await?;

            pin_mut!(iter_main);
            pin_mut!(iter_staging);

            // Sort-merge join implementation using dual pointers
            let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
            let mut staging_item: Option<KeyedRow<Bytes>> =
                iter_staging.next().await.transpose()?;

            while let Some(main_kv) = main_item {
                let main_key = main_kv.key();

                // Advance staging iterator until we find a key >= main_key
                let mut should_delete = false;
                while let Some(staging_kv) = &staging_item {
                    let staging_key = staging_kv.key();
                    match main_key.cmp(staging_key) {
                        std::cmp::Ordering::Greater => {
                            // main_key > staging_key, advance staging
                            staging_item = iter_staging.next().await.transpose()?;
                        }
                        std::cmp::Ordering::Equal => {
                            // Keys match, this row exists in both tables, no need to delete
                            break;
                        }
                        std::cmp::Ordering::Less => {
                            // main_key < staging_key, main row doesn't exist in staging, delete it
                            should_delete = true;
                            break;
                        }
                    }
                }

                // If staging_item is None, all remaining main rows should be deleted
                // (the staging side is exhausted, so no match can exist).
                if staging_item.is_none() {
                    should_delete = true;
                }

                if should_delete {
                    yield Some((vnode, main_kv.row().clone()));
                }

                // Advance main iterator
                processed_rows += 1;
                tracing::debug!(
                    "set progress table: vnode = {:?}, processed_rows = {:?}",
                    vnode,
                    processed_rows
                );
                // Persist the current pk before moving on, so recovery resumes
                // strictly after this row (see the Excluded bound above).
                progress_table.set_progress(
                    vnode,
                    Some(
                        main_kv
                            .row()
                            .project(main_table.pk_indices())
                            .to_owned_row(),
                    ),
                    false,
                    processed_rows,
                )?;
                main_item = iter_main.next().await.transpose()?;
            }

            // Mark this VNode as completed
            if let Some(current_entry) = progress_table.get_progress(vnode) {
                progress_table.set_progress(
                    vnode,
                    current_entry.current_pos.clone(),
                    true, // completed
                    current_entry.processed_rows,
                )?;

                tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
            }
        }

        // Signal completion
        yield None;
    }
1034
1035    /// return true when changed
1036    fn may_update_depended_subscriptions(
1037        depended_subscriptions: &mut HashSet<SubscriberId>,
1038        barrier: &Barrier,
1039        mv_table_id: TableId,
1040    ) {
1041        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
1042            if !depended_subscriptions.insert(subscriber_id) {
1043                warn!(
1044                    ?depended_subscriptions,
1045                    %mv_table_id,
1046                    %subscriber_id,
1047                    "subscription id already exists"
1048                );
1049            }
1050        }
1051
1052        if let Some(subscriptions_to_drop) = barrier.as_subscriptions_to_drop() {
1053            for SubscriptionUpstreamInfo {
1054                subscriber_id,
1055                upstream_mv_table_id,
1056            } in subscriptions_to_drop
1057            {
1058                if *upstream_mv_table_id == mv_table_id
1059                    && !depended_subscriptions.remove(subscriber_id)
1060                {
1061                    warn!(
1062                        ?depended_subscriptions,
1063                        %mv_table_id,
1064                        %subscriber_id,
1065                        "drop non existing subscriber_id id"
1066                    );
1067                }
1068            }
1069        }
1070    }
1071
1072    /// Initialize refresh progress tracking for all `VNodes`
1073    fn init_refresh_progress(
1074        state_table: &StateTableInner<S, SD>,
1075        progress_table: &mut RefreshProgressTable<S>,
1076        _epoch: u64,
1077    ) -> StreamExecutorResult<()> {
1078        debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1079
1080        // Initialize progress for all VNodes in the current bitmap
1081        for vnode in state_table.vnodes().iter_vnodes() {
1082            progress_table.set_progress(
1083                vnode, None,  // initial position
1084                false, // not completed yet
1085                0,     // initial processed rows
1086            )?;
1087        }
1088
1089        tracing::info!(
1090            vnodes_count = state_table.vnodes().count_ones(),
1091            "Initialized refresh progress tracking for all VNodes"
1092        );
1093
1094        Ok(())
1095    }
1096}
1097
1098impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
1099    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
1100    #[cfg(any(test, feature = "test"))]
1101    pub async fn for_test(
1102        input: Executor,
1103        store: S,
1104        table_id: TableId,
1105        keys: Vec<ColumnOrder>,
1106        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1107        watermark_epoch: AtomicU64Ref,
1108        conflict_behavior: ConflictBehavior,
1109    ) -> Self {
1110        Self::for_test_inner(
1111            input,
1112            store,
1113            table_id,
1114            keys,
1115            column_ids,
1116            watermark_epoch,
1117            conflict_behavior,
1118            None,
1119        )
1120        .await
1121    }
1122
1123    #[cfg(any(test, feature = "test"))]
1124    #[allow(clippy::too_many_arguments)]
1125    pub async fn for_test_with_stream_key(
1126        input: Executor,
1127        store: S,
1128        table_id: TableId,
1129        keys: Vec<ColumnOrder>,
1130        stream_key: Vec<usize>,
1131        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1132        watermark_epoch: AtomicU64Ref,
1133        conflict_behavior: ConflictBehavior,
1134    ) -> Self {
1135        Self::for_test_inner(
1136            input,
1137            store,
1138            table_id,
1139            keys,
1140            column_ids,
1141            watermark_epoch,
1142            conflict_behavior,
1143            Some(stream_key),
1144        )
1145        .await
1146    }
1147
1148    #[cfg(any(test, feature = "test"))]
1149    #[allow(clippy::too_many_arguments)]
1150    async fn for_test_inner(
1151        input: Executor,
1152        store: S,
1153        table_id: TableId,
1154        keys: Vec<ColumnOrder>,
1155        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1156        watermark_epoch: AtomicU64Ref,
1157        conflict_behavior: ConflictBehavior,
1158        stream_key: Option<Vec<usize>>,
1159    ) -> Self {
1160        use risingwave_common::util::iter_util::ZipEqFast;
1161
1162        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
1163        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
1164        let schema = input.schema().clone();
1165        let columns: Vec<ColumnDesc> = column_ids
1166            .into_iter()
1167            .zip_eq_fast(schema.fields.iter())
1168            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
1169            .collect_vec();
1170
1171        let row_serde = BasicSerde::new(
1172            Arc::from((0..columns.len()).collect_vec()),
1173            Arc::from(columns.clone().into_boxed_slice()),
1174        );
1175        let stream_key_indices = stream_key.unwrap_or_else(|| arrange_columns.clone());
1176        let mut table_catalog = crate::common::table::test_utils::gen_pbtable(
1177            table_id,
1178            columns,
1179            arrange_order_types,
1180            arrange_columns.clone(),
1181            0,
1182        );
1183        table_catalog.stream_key = stream_key_indices.iter().map(|i| *i as i32).collect();
1184        let state_table = StateTableInner::from_table_catalog(&table_catalog, store, None).await;
1185
1186        let unused = StreamingMetrics::unused();
1187        let metrics = unused.new_materialize_metrics(table_id, 1.into(), 2.into());
1188        let cache_metrics = unused.new_materialize_cache_metrics(table_id, 1.into(), 2.into());
1189
1190        Self {
1191            input,
1192            schema,
1193            state_table,
1194            stream_key_indices,
1195            arrange_key_indices: arrange_columns.clone(),
1196            actor_context: ActorContext::for_test(0),
1197            materialize_cache: MaterializeCache::new(
1198                watermark_epoch,
1199                MetricsInfo::for_test(),
1200                row_serde,
1201                vec![],
1202                conflict_behavior,
1203                None,
1204                cache_metrics,
1205            ),
1206            conflict_behavior,
1207            cleaned_by_ttl_watermark: false,
1208            version_column_indices: vec![],
1209            is_dummy_table: false,
1210            may_have_downstream: true,
1211            subscriber_ids: HashSet::new(),
1212            metrics,
1213            refresh_args: None, // Test constructor doesn't support refresh functionality
1214            local_barrier_manager: LocalBarrierManager::for_test(),
1215        }
1216    }
1217}
1218
1219impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
1220    fn execute(self: Box<Self>) -> BoxedMessageStream {
1221        self.execute_inner().boxed()
1222    }
1223}
1224
1225impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
1226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1227        f.debug_struct("MaterializeExecutor")
1228            .field("arrange_key_indices", &self.arrange_key_indices)
1229            .field("stream_key_indices", &self.stream_key_indices)
1230            .finish()
1231    }
1232}
1233
1234#[cfg(test)]
1235mod tests {
1236
1237    use std::iter;
1238    use std::sync::atomic::AtomicU64;
1239
1240    use rand::rngs::SmallRng;
1241    use rand::{Rng, RngCore, SeedableRng};
1242    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1243    use risingwave_common::catalog::Field;
1244    use risingwave_common::util::epoch::test_epoch;
1245    use risingwave_common::util::sort_util::OrderType;
1246    use risingwave_hummock_sdk::HummockReadEpoch;
1247    use risingwave_storage::memory::MemoryStateStore;
1248    use risingwave_storage::table::batch_table::BatchTable;
1249
1250    use super::*;
1251    use crate::executor::test_utils::*;
1252
1253    #[tokio::test]
1254    async fn test_materialize_executor() {
1255        // Prepare storage and memtable.
1256        let memory_state_store = MemoryStateStore::new();
1257        let table_id = TableId::new(1);
1258        // Two columns of int32 type, the first column is PK.
1259        let schema = Schema::new(vec![
1260            Field::unnamed(DataType::Int32),
1261            Field::unnamed(DataType::Int32),
1262        ]);
1263        let column_ids = vec![0.into(), 1.into()];
1264
1265        // Prepare source chunks.
1266        let chunk1 = StreamChunk::from_pretty(
1267            " i i
1268            + 1 4
1269            + 2 5
1270            + 3 6",
1271        );
1272        let chunk2 = StreamChunk::from_pretty(
1273            " i i
1274            + 7 8
1275            - 3 6",
1276        );
1277
1278        // Prepare stream executors.
1279        let source = MockSource::with_messages(vec![
1280            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1281            Message::Chunk(chunk1),
1282            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1283            Message::Chunk(chunk2),
1284            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1285        ])
1286        .into_executor(schema.clone(), StreamKey::new());
1287
1288        let order_types = vec![OrderType::ascending()];
1289        let column_descs = vec![
1290            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1291            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1292        ];
1293
1294        let table = BatchTable::for_test(
1295            memory_state_store.clone(),
1296            table_id,
1297            column_descs,
1298            order_types,
1299            vec![0],
1300            vec![0, 1],
1301        );
1302
1303        let mut materialize_executor = MaterializeExecutor::for_test(
1304            source,
1305            memory_state_store,
1306            table_id,
1307            vec![ColumnOrder::new(0, OrderType::ascending())],
1308            column_ids,
1309            Arc::new(AtomicU64::new(0)),
1310            ConflictBehavior::NoCheck,
1311        )
1312        .await
1313        .boxed()
1314        .execute();
1315        materialize_executor.next().await.transpose().unwrap();
1316
1317        materialize_executor.next().await.transpose().unwrap();
1318
1319        // First stream chunk. We check the existence of (3) -> (3,6)
1320        match materialize_executor.next().await.transpose().unwrap() {
1321            Some(Message::Barrier(_)) => {
1322                let row = table
1323                    .get_row(
1324                        &OwnedRow::new(vec![Some(3_i32.into())]),
1325                        HummockReadEpoch::NoWait(u64::MAX),
1326                    )
1327                    .await
1328                    .unwrap();
1329                assert_eq!(
1330                    row,
1331                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1332                );
1333            }
1334            _ => unreachable!(),
1335        }
1336        materialize_executor.next().await.transpose().unwrap();
1337        // Second stream chunk. We check the existence of (7) -> (7,8)
1338        match materialize_executor.next().await.transpose().unwrap() {
1339            Some(Message::Barrier(_)) => {
1340                let row = table
1341                    .get_row(
1342                        &OwnedRow::new(vec![Some(7_i32.into())]),
1343                        HummockReadEpoch::NoWait(u64::MAX),
1344                    )
1345                    .await
1346                    .unwrap();
1347                assert_eq!(
1348                    row,
1349                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1350                );
1351            }
1352            _ => unreachable!(),
1353        }
1354    }
1355
    // https://github.com/risingwavelabs/risingwave/issues/13346
    #[tokio::test]
    async fn test_upsert_stream() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Materialize (1, 1) first ...
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // ... then overwrite it with (1, 2) and delete (1, 2). With
        // `ConflictBehavior::Overwrite`, pk 1 must end up absent even though
        // the deleted value (1, 2) differs from the originally inserted (1, 1).
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same table, used to inspect materialized state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Drain the init barrier, `chunk1`, the barrier of epoch 2 and `chunk2`.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // At the final barrier, pk 1 must be gone: the `- 1 2` deletion removes
        // the row even though it was originally materialized as (1, 1).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1438
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Double insert on pk 1: with `ConflictBehavior::Overwrite` the later
        // (1, 4) overrides the earlier (1, 3) within the same chunk.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Cross-chunk conflicts: (1, 3) and (2, 6) overwrite the rows
        // materialized from `chunk1`.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // One more insert conflict on pk 1 in a later epoch; its effect is not
        // asserted below, it only has to be handled without error.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same table, used to inspect materialized state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Drain the init barrier and the two data chunks of the first epoch.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // At the barrier of epoch 2 (after `chunk1` and `chunk2` are applied),
        // the overwrite results must be (1, 3), (2, 6) and (3, 6).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1557
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Insert three rows, update pk 8, then insert on pk 8 again: with
        // `ConflictBehavior::Overwrite`, the final insert (8, 3) replaces the
        // updated (8, 2).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk: pk 3 stores (3, 6) but
        // is deleted with the stale value (3, 4); pk 5 does not exist at all.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Insert conflict on pk 1, update with a stale old value on pk 2, and
        // update of a nonexistent pk 9 (expected to materialize as an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same table, used to inspect materialized state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Drain the init barrier and `chunk1`.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // At the barrier of epoch 2: insert-after-update on pk 8 must have left
        // (8, 3).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Drain `chunk2`.
        materialize_executor.next().await.transpose().unwrap();

        // At the barrier of epoch 3: (7, 8) must exist; both deletes must have
        // removed / no-op'd their pk regardless of the stale values.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Drain `chunk3`.
        materialize_executor.next().await.transpose().unwrap();
        // At the barrier of epoch 4: verify the effects of `chunk3` —
        // insert conflict, stale-value update, and update of a missing pk.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1744
1745    #[tokio::test]
1746    async fn test_change_buffer_into_chunk_with_stream_key() {
1747        // Table PK is (col0), but stream key is (col0, col1). When the value column changes due
1748        // to overwrite conflict handling, we should output `- old` + `+ new` rather than `U-`/`U+`.
1749        let memory_state_store = MemoryStateStore::new();
1750        let table_id = TableId::new(1);
1751        let schema = Schema::new(vec![
1752            Field::unnamed(DataType::Int32),
1753            Field::unnamed(DataType::Int32),
1754        ]);
1755        let column_ids = vec![0.into(), 1.into()];
1756
1757        let chunk1 = StreamChunk::from_pretty(
1758            " i i
1759            + 1 4",
1760        );
1761        let chunk2 = StreamChunk::from_pretty(
1762            " i i
1763            + 1 5",
1764        );
1765
1766        let source = MockSource::with_messages(vec![
1767            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1768            Message::Chunk(chunk1),
1769            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1770            Message::Chunk(chunk2),
1771            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1772        ])
1773        .into_executor(schema.clone(), StreamKey::new());
1774
1775        let mut materialize_executor = MaterializeExecutor::for_test_with_stream_key(
1776            source,
1777            memory_state_store,
1778            table_id,
1779            vec![ColumnOrder::new(0, OrderType::ascending())],
1780            vec![0, 1],
1781            column_ids,
1782            Arc::new(AtomicU64::new(0)),
1783            ConflictBehavior::Overwrite,
1784        )
1785        .await
1786        .boxed()
1787        .execute();
1788
1789        // init barrier + first insert
1790        materialize_executor.next().await.transpose().unwrap();
1791        materialize_executor.next().await.transpose().unwrap();
1792
1793        // commit barrier
1794        materialize_executor.next().await.transpose().unwrap();
1795
1796        // overwrite conflict should be converted into delete+insert due to stream key mismatch
1797        match materialize_executor.next().await.transpose().unwrap() {
1798            Some(Message::Chunk(chunk)) => {
1799                assert_eq!(
1800                    chunk.compact_vis(),
1801                    StreamChunk::from_pretty(
1802                        " i i
1803                        - 1 4
1804                        + 1 5"
1805                    )
1806                );
1807            }
1808            other => panic!("expect chunk, got {other:?}"),
1809        }
1810    }
1811
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to be ignored:
        // with `IgnoreConflict`, (1, 4) is dropped and (1, 3) is kept.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Both inserts conflict with rows from `chunk1` and must be ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // One more ignored insert conflict on pk 1 in a later epoch; its effect
        // is not asserted below, it only has to be handled without error.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new())

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same table, used to inspect materialized state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain the init barrier and the two data chunks of the first epoch.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // At the barrier of epoch 2, the *first* write for each pk must have
        // been kept: (1, 3), (2, 5) and (3, 6).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1930
1931    #[tokio::test]
1932    async fn test_ignore_delete_then_insert() {
1933        // Prepare storage and memtable.
1934        let memory_state_store = MemoryStateStore::new();
1935        let table_id = TableId::new(1);
1936        // Two columns of int32 type, the first column is PK.
1937        let schema = Schema::new(vec![
1938            Field::unnamed(DataType::Int32),
1939            Field::unnamed(DataType::Int32),
1940        ]);
1941        let column_ids = vec![0.into(), 1.into()];
1942
1943        // test insert after delete one pk, the latter insert should succeed.
1944        let chunk1 = StreamChunk::from_pretty(
1945            " i i
1946            + 1 3
1947            - 1 3
1948            + 1 6",
1949        );
1950
1951        // Prepare stream executors.
1952        let source = MockSource::with_messages(vec![
1953            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1954            Message::Chunk(chunk1),
1955            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1956        ])
1957        .into_executor(schema.clone(), StreamKey::new());
1958
1959        let order_types = vec![OrderType::ascending()];
1960        let column_descs = vec![
1961            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1962            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1963        ];
1964
1965        let table = BatchTable::for_test(
1966            memory_state_store.clone(),
1967            table_id,
1968            column_descs,
1969            order_types,
1970            vec![0],
1971            vec![0, 1],
1972        );
1973
1974        let mut materialize_executor = MaterializeExecutor::for_test(
1975            source,
1976            memory_state_store,
1977            table_id,
1978            vec![ColumnOrder::new(0, OrderType::ascending())],
1979            column_ids,
1980            Arc::new(AtomicU64::new(0)),
1981            ConflictBehavior::IgnoreConflict,
1982        )
1983        .await
1984        .boxed()
1985        .execute();
1986        let _msg1 = materialize_executor
1987            .next()
1988            .await
1989            .transpose()
1990            .unwrap()
1991            .unwrap()
1992            .as_barrier()
1993            .unwrap();
1994        let _msg2 = materialize_executor
1995            .next()
1996            .await
1997            .transpose()
1998            .unwrap()
1999            .unwrap()
2000            .as_chunk()
2001            .unwrap();
2002        let _msg3 = materialize_executor
2003            .next()
2004            .await
2005            .transpose()
2006            .unwrap()
2007            .unwrap()
2008            .as_barrier()
2009            .unwrap();
2010
2011        let row = table
2012            .get_row(
2013                &OwnedRow::new(vec![Some(1_i32.into())]),
2014                HummockReadEpoch::NoWait(u64::MAX),
2015            )
2016            .await
2017            .unwrap();
2018        assert_eq!(
2019            row,
2020            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
2021        );
2022    }
2023
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Under `IgnoreConflict`, the trailing insert `+ 8 3` conflicts with
        // the row left by `U+ 8 2` and is dropped, so pk 8 keeps (8, 2).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk: pk 3 stores (3, 6) but
        // is deleted with the stale value (3, 4); pk 5 does not exist at all.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Insert conflict on pk 1 (ignored), update on pk 2 (updates still
        // overwrite), and update of a nonexistent pk 9 (becomes an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same table, used to inspect materialized state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain the init barrier and `chunk1`.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // At the barrier of epoch 2: pk 8 must hold (8, 2) — the later insert
        // (8, 3) was ignored as a conflict.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 2), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Drain `chunk2`.
        materialize_executor.next().await.transpose().unwrap();

        // At the barrier of epoch 3: (7, 8) must exist; both deletes must have
        // removed / no-op'd their pk regardless of the stale values.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Drain `chunk3`.
        materialize_executor.next().await.transpose().unwrap();
        // At the barrier of epoch 4: verify the effects of `chunk3`.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // insert conflict on pk 1 was ignored, so (1, 4) is unchanged
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2211
    /// End-to-end test for `ConflictBehavior::DoUpdateIfNotNull`: on a PK conflict,
    /// a NULL in the incoming row must NOT overwrite an existing non-NULL value,
    /// while non-NULL incoming values do overwrite. Deletes match on PK only, and
    /// an `U+` whose old row does not exist degenerates to an insert.
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // `+ 8 .` conflicts with (8, 2); its NULL must not clobber the stored 2,
        // so we should get (8, 2). The fresh `+ 2 .` stores the NULL: (2, None).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // should not get (3, x), should not get (5, 0)
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // should get (2, None), (7, 8)
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read handle over the same storage, used to inspect materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk applied. `+ 8 .` kept the stored non-NULL value, so
        // (8) -> (8, 2); `+ 2 .` was a plain insert storing the NULL -> (2, None).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Second stream chunk applied: (7, 8) inserted; (3) deleted even though the
        // delete carried a stale value; deleting the absent (5) was a no-op.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Third stream chunk applied. The NULLs in `+ 7 .` and `U+ 2 .` kept the old
        // values: (7) -> (7, 8) and (2) -> (2, None).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2405
2406    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2407        const KN: u32 = 4;
2408        const SEED: u64 = 998244353;
2409        let mut ret = vec![];
2410        let mut builder =
2411            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2412        let mut rng = SmallRng::seed_from_u64(SEED);
2413
2414        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2415            let len = c.data_chunk().capacity();
2416            let mut c = StreamChunkMut::from(c);
2417            for i in 0..len {
2418                c.set_vis(i, rng.random_bool(0.5));
2419            }
2420            c.into()
2421        };
2422        for _ in 0..row_number {
2423            let k = (rng.next_u32() % KN) as i32;
2424            let v = rng.next_u32() as i32;
2425            let op = if rng.random_bool(0.5) {
2426                Op::Insert
2427            } else {
2428                Op::Delete
2429            };
2430            if let Some(c) =
2431                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2432            {
2433                ret.push(random_vis(c, &mut rng));
2434            }
2435        }
2436        if let Some(c) = builder.take() {
2437            ret.push(random_vis(c, &mut rng));
2438        }
2439        ret
2440    }
2441
2442    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
2443        const N: usize = 100000;
2444
2445        // Prepare storage and memtable.
2446        let memory_state_store = MemoryStateStore::new();
2447        let table_id = TableId::new(1);
2448        // Two columns of int32 type, the first column is PK.
2449        let schema = Schema::new(vec![
2450            Field::unnamed(DataType::Int32),
2451            Field::unnamed(DataType::Int32),
2452        ]);
2453        let column_ids = vec![0.into(), 1.into()];
2454
2455        let chunks = gen_fuzz_data(N, 128);
2456        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
2457            .chain(chunks.into_iter().map(Message::Chunk))
2458            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
2459                test_epoch(2),
2460            ))))
2461            .collect();
2462        // Prepare stream executors.
2463        let source =
2464            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());
2465
2466        let mut materialize_executor = MaterializeExecutor::for_test(
2467            source,
2468            memory_state_store.clone(),
2469            table_id,
2470            vec![ColumnOrder::new(0, OrderType::ascending())],
2471            column_ids,
2472            Arc::new(AtomicU64::new(0)),
2473            conflict_behavior,
2474        )
2475        .await
2476        .boxed()
2477        .execute();
2478        materialize_executor.expect_barrier().await;
2479
2480        let order_types = vec![OrderType::ascending()];
2481        let column_descs = vec![
2482            ColumnDesc::unnamed(0.into(), DataType::Int32),
2483            ColumnDesc::unnamed(1.into(), DataType::Int32),
2484        ];
2485        let pk_indices = vec![0];
2486
2487        let mut table = StateTable::from_table_catalog(
2488            &crate::common::table::test_utils::gen_pbtable(
2489                TableId::from(1002),
2490                column_descs.clone(),
2491                order_types,
2492                pk_indices,
2493                0,
2494            ),
2495            memory_state_store.clone(),
2496            None,
2497        )
2498        .await;
2499
2500        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
2501            // check with state table's memtable
2502            table.write_chunk(c);
2503        }
2504    }
2505
    /// Runs the stream-consistency fuzz test with `ConflictBehavior::Overwrite`.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2510
    /// Runs the stream-consistency fuzz test with `ConflictBehavior::IgnoreConflict`.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2515}