risingwave_stream/executor/mview/materialize.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::ops::Bound;
17
18use bytes::Bytes;
19use futures::future::Either;
20use futures::stream::{self, select_with_strategy};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{
26    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
27};
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, RowExt};
30use risingwave_common::types::DataType;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::BasicSerde;
33use risingwave_hummock_sdk::HummockReadEpoch;
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_pb::id::{SourceId, SubscriberId};
37use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
38use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
39use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
40use risingwave_storage::table::KeyedRow;
41
42use crate::common::change_buffer::output_kind as cb_kind;
43use crate::common::metrics::MetricsInfo;
44use crate::common::table::state_table::{
45    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
46};
47use crate::executor::error::ErrorKind;
48use crate::executor::monitor::MaterializeMetrics;
49use crate::executor::mview::RefreshProgressTable;
50use crate::executor::mview::cache::MaterializeCache;
51use crate::executor::prelude::*;
52use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
53use crate::task::LocalBarrierManager;
54
/// State machine driving [`MaterializeExecutor`]'s execution stream.
///
/// Non-refreshable tables stay in `NormalIngestion`. Refreshable materialized views
/// additionally cycle through the merge and cleanup stages after a refresh is triggered.
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Default stage: consume upstream messages and materialize them into the state table.
    NormalIngestion,
    /// Refresh stage: merge the staging table against the main table, collecting stale rows
    /// in the main table to delete.
    MergingData,
    /// Refresh stage entered once merging completes: wait for a checkpoint barrier, then
    /// report refresh completion to the local barrier manager.
    CleanUp,
    /// Transitional stage: yield `barrier` downstream, then continue with `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        /// Boxed so the enum can refer to itself without unsized recursion.
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Final refresh stage, carrying the epoch at which the refresh completed.
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}
68
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the change stream to materialize.
    input: Executor,

    /// Output schema; used to build output chunks from the change buffer.
    schema: Schema,

    /// The state table backing the materialized view on storage.
    state_table: StateTableInner<S, SD>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// The cache for conflict handling. `None` if conflict behavior is `NoCheck`.
    materialize_cache: Option<MaterializeCache>,

    /// How rows conflicting on the primary key are resolved.
    conflict_behavior: ConflictBehavior,

    /// Indices of version columns used for conflict resolution; when empty (and other
    /// conditions hold) conflict checking may be optimized away entirely.
    version_column_indices: Vec<u32>,

    /// Whether any downstream MV may depend on this table. Together with
    /// `conflict_behavior`, determines the state table's op consistency level
    /// (see `get_op_consistency_level`).
    may_have_downstream: bool,

    /// Current subscribers of this table; a non-empty set forces the log store to be enabled.
    subscriber_ids: HashSet<SubscriberId>,

    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,

    /// Optional refresh arguments and state for refreshable materialized views
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    /// Local barrier manager for reporting barrier events
    local_barrier_manager: LocalBarrierManager,
}
105
/// Arguments and state for refreshable materialized views
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Table catalog for main table
    pub table_catalog: Table,

    /// Table catalog for staging table
    pub staging_table_catalog: Table,

    /// Flag indicating if this table is currently being refreshed.
    /// Set during a refresh window and restored on recovery when the progress table
    /// shows incomplete vnodes.
    pub is_refreshing: bool,

    /// During data refresh (between `RefreshStart` and `LoadFinish`),
    /// data will be written to both the main table and the staging table.
    ///
    /// The staging table is PK-only.
    ///
    /// After `LoadFinish`, we will do a `DELETE FROM main_table WHERE pk NOT IN (SELECT pk FROM staging_table)`, and then purge the staging table.
    pub staging_table: StateTableInner<S, SD>,

    /// Progress table for tracking refresh state per `VNode` for fault tolerance
    pub progress_table: RefreshProgressTable<S>,

    /// Table ID for this refreshable materialized view
    pub table_id: TableId,
}
131
132impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
133    /// Create new `RefreshableMaterializeArgs`
134    pub async fn new(
135        store: S,
136        table_catalog: &Table,
137        staging_table_catalog: &Table,
138        progress_state_table: &Table,
139        vnodes: Option<Arc<Bitmap>>,
140    ) -> Self {
141        let table_id = table_catalog.id;
142
143        // staging table is pk-only, and we don't need to check value consistency
144        let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
145            staging_table_catalog,
146            store.clone(),
147            vnodes.clone(),
148        )
149        .await;
150
151        let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
152            progress_state_table,
153            store,
154            vnodes,
155        )
156        .await;
157
158        // Get primary key length from main table catalog
159        let pk_len = table_catalog.pk.len();
160        let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
161
162        debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
163
164        Self {
165            table_catalog: table_catalog.clone(),
166            staging_table_catalog: staging_table_catalog.clone(),
167            is_refreshing: false,
168            staging_table,
169            progress_table,
170            table_id,
171        }
172    }
173}
174
175fn get_op_consistency_level(
176    conflict_behavior: ConflictBehavior,
177    may_have_downstream: bool,
178    subscriber_ids: &HashSet<SubscriberId>,
179) -> StateTableOpConsistencyLevel {
180    if !subscriber_ids.is_empty() {
181        StateTableOpConsistencyLevel::LogStoreEnabled
182    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
183        // Table with overwrite conflict behavior could disable conflict check
184        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
185        StateTableOpConsistencyLevel::Inconsistent
186    } else {
187        StateTableOpConsistencyLevel::ConsistentOldValue
188    }
189}
190
191impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
192    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
193    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
194    /// should be `None`.
195    #[allow(clippy::too_many_arguments)]
196    pub async fn new(
197        input: Executor,
198        schema: Schema,
199        store: S,
200        arrange_key: Vec<ColumnOrder>,
201        actor_context: ActorContextRef,
202        vnodes: Option<Arc<Bitmap>>,
203        table_catalog: &Table,
204        watermark_epoch: AtomicU64Ref,
205        conflict_behavior: ConflictBehavior,
206        version_column_indices: Vec<u32>,
207        metrics: Arc<StreamingMetrics>,
208        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
209        local_barrier_manager: LocalBarrierManager,
210    ) -> Self {
211        let table_columns: Vec<ColumnDesc> = table_catalog
212            .columns
213            .iter()
214            .map(|col| col.column_desc.as_ref().unwrap().into())
215            .collect();
216
217        // Extract TOAST-able column indices from table columns.
218        // Only for PostgreSQL CDC tables.
219        let toastable_column_indices = if table_catalog.cdc_table_type()
220            == risingwave_pb::catalog::table::CdcTableType::Postgres
221        {
222            let toastable_indices: Vec<usize> = table_columns
223                .iter()
224                .enumerate()
225                .filter_map(|(index, column)| match &column.data_type {
226                    // Currently supports TOAST updates for:
227                    // - jsonb (DataType::Jsonb)
228                    // - varchar (DataType::Varchar)
229                    // - bytea (DataType::Bytea)
230                    // - One-dimensional arrays of the above types (DataType::List)
231                    //   Note: Some array types may not be fully supported yet, see issue  https://github.com/risingwavelabs/risingwave/issues/22916 for details.
232
233                    // For details on how TOAST values are handled, see comments in `is_debezium_unavailable_value`.
234                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
235                        Some(index)
236                    }
237                    _ => None,
238                })
239                .collect();
240
241            if toastable_indices.is_empty() {
242                None
243            } else {
244                Some(toastable_indices)
245            }
246        } else {
247            None
248        };
249
250        let row_serde: BasicSerde = BasicSerde::new(
251            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
252            Arc::from(table_columns.into_boxed_slice()),
253        );
254
255        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
256        let may_have_downstream = actor_context.initial_dispatch_num != 0;
257        let subscriber_ids = actor_context.initial_subscriber_ids.clone();
258        let op_consistency_level =
259            get_op_consistency_level(conflict_behavior, may_have_downstream, &subscriber_ids);
260        let state_table_metrics = metrics.new_state_table_metrics(
261            table_catalog.id,
262            actor_context.id,
263            actor_context.fragment_id,
264        );
265        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
266        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
267            .with_op_consistency_level(op_consistency_level)
268            .enable_preload_all_rows_by_config(&actor_context.config)
269            .enable_vnode_key_pruning(true)
270            .with_metrics(state_table_metrics)
271            .build()
272            .await;
273
274        let mv_metrics = metrics.new_materialize_metrics(
275            table_catalog.id,
276            actor_context.id,
277            actor_context.fragment_id,
278        );
279        let cache_metrics = metrics.new_materialize_cache_metrics(
280            table_catalog.id,
281            actor_context.id,
282            actor_context.fragment_id,
283        );
284
285        let metrics_info =
286            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
287
288        let is_dummy_table =
289            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;
290
291        Self {
292            input,
293            schema,
294            state_table,
295            arrange_key_indices,
296            actor_context,
297            materialize_cache: MaterializeCache::new(
298                watermark_epoch,
299                metrics_info,
300                row_serde,
301                version_column_indices.clone(),
302                conflict_behavior,
303                toastable_column_indices,
304                cache_metrics,
305            ),
306            conflict_behavior,
307            version_column_indices,
308            is_dummy_table,
309            may_have_downstream,
310            subscriber_ids,
311            metrics: mv_metrics,
312            refresh_args,
313            local_barrier_manager,
314        }
315    }
316
317    #[try_stream(ok = Message, error = StreamExecutorError)]
318    async fn execute_inner(mut self) {
319        let mv_table_id = self.state_table.table_id();
320        let data_types = self.schema.data_types();
321        let mut input = self.input.execute();
322
323        let barrier = expect_first_barrier(&mut input).await?;
324        let first_epoch = barrier.epoch;
325        let _barrier_epoch = barrier.epoch; // Save epoch for later use (unused in normal execution)
326        // The first barrier message should be propagated.
327        yield Message::Barrier(barrier);
328        self.state_table.init_epoch(first_epoch).await?;
329
330        // default to normal ingestion
331        let mut inner_state =
332            Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
333        // Initialize staging table for refreshable materialized views
334        if let Some(ref mut refresh_args) = self.refresh_args {
335            refresh_args.staging_table.init_epoch(first_epoch).await?;
336
337            // Initialize progress table and load existing progress for recovery
338            refresh_args.progress_table.recover(first_epoch).await?;
339
340            // Check if refresh is already in progress (recovery scenario)
341            let progress_stats = refresh_args.progress_table.get_progress_stats();
342            if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
343                refresh_args.is_refreshing = true;
344                tracing::info!(
345                    total_vnodes = progress_stats.total_vnodes,
346                    completed_vnodes = progress_stats.completed_vnodes,
347                    "Recovered refresh in progress, resuming refresh operation"
348                );
349
350                // Since stage info is no longer stored in progress table,
351                // we need to determine recovery state differently.
352                // For now, assume all incomplete VNodes need to continue merging
353                let incomplete_vnodes: Vec<_> = refresh_args
354                    .progress_table
355                    .get_all_progress()
356                    .iter()
357                    .filter(|(_, entry)| !entry.is_completed)
358                    .map(|(&vnode, _)| vnode)
359                    .collect();
360
361                if !incomplete_vnodes.is_empty() {
362                    // Some VNodes are incomplete, need to resume refresh operation
363                    tracing::info!(
364                        incomplete_vnodes = incomplete_vnodes.len(),
365                        "Recovery detected incomplete VNodes, resuming refresh operation"
366                    );
367                    // Since stage tracking is now in memory, we'll determine the appropriate
368                    // stage based on the executor's internal state machine
369                } else {
370                    // This should not happen if is_complete() returned false, but handle it gracefully
371                    tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
372                }
373            }
374        }
375
376        // Determine initial execution stage (for recovery scenarios)
377        if let Some(ref refresh_args) = self.refresh_args
378            && refresh_args.is_refreshing
379        {
380            // Recovery logic: Check if there are incomplete vnodes from previous run
381            let incomplete_vnodes: Vec<_> = refresh_args
382                .progress_table
383                .get_all_progress()
384                .iter()
385                .filter(|(_, entry)| !entry.is_completed)
386                .map(|(&vnode, _)| vnode)
387                .collect();
388            if !incomplete_vnodes.is_empty() {
389                // Resume from merge stage since some VNodes were left incomplete
390                inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
391                tracing::info!(
392                    incomplete_vnodes = incomplete_vnodes.len(),
393                    "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
394                );
395            }
396        }
397
398        // Main execution loop: cycles through Stage 1 -> Stage 2 -> Stage 3 -> Stage 1...
399        'main_loop: loop {
400            match *inner_state {
401                MaterializeStreamState::NormalIngestion => {
402                    #[for_await]
403                    '_normal_ingest: for msg in input.by_ref() {
404                        let msg = msg?;
405                        if let Some(cache) = &mut self.materialize_cache {
406                            cache.evict();
407                        }
408
409                        match msg {
410                            Message::Watermark(w) => {
411                                yield Message::Watermark(w);
412                            }
413                            Message::Chunk(chunk) if self.is_dummy_table => {
414                                self.metrics
415                                    .materialize_input_row_count
416                                    .inc_by(chunk.cardinality() as u64);
417                                yield Message::Chunk(chunk);
418                            }
419                            Message::Chunk(chunk) => {
420                                self.metrics
421                                    .materialize_input_row_count
422                                    .inc_by(chunk.cardinality() as u64);
423
424                                // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
425                                // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
426                                // and the conflict behavior is overwrite. We can rely on the state table to overwrite the conflicting rows in the storage,
427                                // while outputting inconsistent changes to downstream which no one will subscribe to.
428                                let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
429                                    self.conflict_behavior
430                                    && !self.state_table.is_consistent_op()
431                                    && self.version_column_indices.is_empty()
432                                {
433                                    ConflictBehavior::NoCheck
434                                } else {
435                                    self.conflict_behavior
436                                };
437
438                                match optimized_conflict_behavior {
439                                    checked_conflict_behaviors!() => {
440                                        if chunk.cardinality() == 0 {
441                                            // empty chunk
442                                            continue;
443                                        }
444
445                                        // For refreshable materialized views, write to staging table during refresh
446                                        // Do not use generate_output here.
447                                        if let Some(ref mut refresh_args) = self.refresh_args
448                                            && refresh_args.is_refreshing
449                                        {
450                                            let key_chunk = chunk
451                                                .clone()
452                                                .project(self.state_table.pk_indices());
453                                            tracing::trace!(
454                                                staging_chunk = %key_chunk.to_pretty(),
455                                                input_chunk = %chunk.to_pretty(),
456                                                "writing to staging table"
457                                            );
458                                            if cfg!(debug_assertions) {
459                                                // refreshable source should be append-only
460                                                assert!(
461                                                    key_chunk
462                                                        .ops()
463                                                        .iter()
464                                                        .all(|op| op == &Op::Insert)
465                                                );
466                                            }
467                                            refresh_args
468                                                .staging_table
469                                                .write_chunk(key_chunk.clone());
470                                            refresh_args.staging_table.try_flush().await?;
471                                        }
472
473                                        let cache = self.materialize_cache.as_mut().unwrap();
474                                        let change_buffer =
475                                            cache.handle_new(chunk, &self.state_table).await?;
476
477                                        match change_buffer
478                                            .into_chunk::<{ cb_kind::RETRACT }>(data_types.clone())
479                                        {
480                                            Some(output_chunk) => {
481                                                self.state_table.write_chunk(output_chunk.clone());
482                                                self.state_table.try_flush().await?;
483                                                yield Message::Chunk(output_chunk);
484                                            }
485                                            None => continue,
486                                        }
487                                    }
488                                    ConflictBehavior::NoCheck => {
489                                        self.state_table.write_chunk(chunk.clone());
490                                        self.state_table.try_flush().await?;
491
492                                        // For refreshable materialized views, also write to staging table during refresh
493                                        if let Some(ref mut refresh_args) = self.refresh_args
494                                            && refresh_args.is_refreshing
495                                        {
496                                            let key_chunk = chunk
497                                                .clone()
498                                                .project(self.state_table.pk_indices());
499                                            tracing::trace!(
500                                                staging_chunk = %key_chunk.to_pretty(),
501                                                input_chunk = %chunk.to_pretty(),
502                                                "writing to staging table"
503                                            );
504                                            if cfg!(debug_assertions) {
505                                                // refreshable source should be append-only
506                                                assert!(
507                                                    key_chunk
508                                                        .ops()
509                                                        .iter()
510                                                        .all(|op| op == &Op::Insert)
511                                                );
512                                            }
513                                            refresh_args
514                                                .staging_table
515                                                .write_chunk(key_chunk.clone());
516                                            refresh_args.staging_table.try_flush().await?;
517                                        }
518
519                                        yield Message::Chunk(chunk);
520                                    }
521                                }
522                            }
523                            Message::Barrier(barrier) => {
524                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
525                                    barrier,
526                                    expect_next_state: Box::new(
527                                        MaterializeStreamState::NormalIngestion,
528                                    ),
529                                };
530                                continue 'main_loop;
531                            }
532                        }
533                    }
534
535                    return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
536                        anyhow::anyhow!(
537                            "Input stream terminated unexpectedly during normal ingestion"
538                        ),
539                    )));
540                }
541                MaterializeStreamState::MergingData => {
542                    let Some(refresh_args) = self.refresh_args.as_mut() else {
543                        panic!(
544                            "MaterializeExecutor entered CleanUp state without refresh_args configured"
545                        );
546                    };
547                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
548
549                    debug_assert_eq!(
550                        self.state_table.vnodes(),
551                        refresh_args.staging_table.vnodes()
552                    );
553                    debug_assert_eq!(
554                        refresh_args.staging_table.vnodes(),
555                        refresh_args.progress_table.vnodes()
556                    );
557
558                    let mut rows_to_delete = vec![];
559                    let mut merge_complete = false;
560                    let mut pending_barrier: Option<Barrier> = None;
561
562                    // Scope to limit immutable borrows to state tables
563                    {
564                        let left_input = input.by_ref().map(Either::Left);
565                        let right_merge_sort = pin!(
566                            Self::make_mergesort_stream(
567                                &self.state_table,
568                                &refresh_args.staging_table,
569                                &mut refresh_args.progress_table
570                            )
571                            .map(Either::Right)
572                        );
573
574                        // Prefer to select input stream to handle barriers promptly
575                        // Rebuild the merge stream each time processing a barrier
576                        let mut merge_stream =
577                            select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
578                                stream::PollNext::Left
579                            });
580
581                        #[for_await]
582                        'merge_stream: for either in &mut merge_stream {
583                            match either {
584                                Either::Left(msg) => {
585                                    let msg = msg?;
586                                    match msg {
587                                        Message::Watermark(w) => yield Message::Watermark(w),
588                                        Message::Chunk(chunk) => {
589                                            tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
590                                        }
591                                        Message::Barrier(b) => {
592                                            pending_barrier = Some(b);
593                                            break 'merge_stream;
594                                        }
595                                    }
596                                }
597                                Either::Right(result) => {
598                                    match result? {
599                                        Some((_vnode, row)) => {
600                                            rows_to_delete.push(row);
601                                        }
602                                        None => {
603                                            // Merge stream finished
604                                            merge_complete = true;
605
606                                            // If the merge stream finished, we need to wait for the next barrier to commit states
607                                        }
608                                    }
609                                }
610                            }
611                        }
612                    }
613
614                    // Process collected rows for deletion
615                    for row in &rows_to_delete {
616                        self.state_table.delete(row);
617                    }
618                    if !rows_to_delete.is_empty() {
619                        let to_delete_chunk = StreamChunk::from_rows(
620                            &rows_to_delete
621                                .iter()
622                                .map(|row| (Op::Delete, row))
623                                .collect_vec(),
624                            &self.schema.data_types(),
625                        );
626
627                        yield Message::Chunk(to_delete_chunk);
628                    }
629
630                    // should wait for at least one barrier
631                    assert!(pending_barrier.is_some(), "pending barrier is not set");
632
633                    *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
634                        barrier: pending_barrier.unwrap(),
635                        expect_next_state: if merge_complete {
636                            Box::new(MaterializeStreamState::CleanUp)
637                        } else {
638                            Box::new(MaterializeStreamState::MergingData)
639                        },
640                    };
641                    continue 'main_loop;
642                }
643                MaterializeStreamState::CleanUp => {
644                    let Some(refresh_args) = self.refresh_args.as_mut() else {
645                        panic!(
646                            "MaterializeExecutor entered MergingData state without refresh_args configured"
647                        );
648                    };
649                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
650
651                    #[for_await]
652                    for msg in input.by_ref() {
653                        let msg = msg?;
654                        match msg {
655                            Message::Watermark(w) => yield Message::Watermark(w),
656                            Message::Chunk(chunk) => {
657                                tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
658                            }
659                            Message::Barrier(barrier) if !barrier.is_checkpoint() => {
660                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
661                                    barrier,
662                                    expect_next_state: Box::new(MaterializeStreamState::CleanUp),
663                                };
664                                continue 'main_loop;
665                            }
666                            Message::Barrier(barrier) => {
667                                let staging_table_id = refresh_args.staging_table.table_id();
668                                let epoch = barrier.epoch;
669                                self.local_barrier_manager.report_refresh_finished(
670                                    epoch,
671                                    self.actor_context.id,
672                                    refresh_args.table_id,
673                                    staging_table_id,
674                                );
675                                tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
676
677                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
678                                    barrier,
679                                    expect_next_state: Box::new(
680                                        MaterializeStreamState::RefreshEnd {
681                                            on_complete_epoch: epoch,
682                                        },
683                                    ),
684                                };
685                                continue 'main_loop;
686                            }
687                        }
688                    }
689                }
690                MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
691                    let Some(refresh_args) = self.refresh_args.as_mut() else {
692                        panic!(
693                            "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
694                        );
695                    };
696                    let staging_table_id = refresh_args.staging_table.table_id();
697
698                    // Wait for staging table truncation to complete
699                    let staging_store = refresh_args.staging_table.state_store().clone();
700                    staging_store
701                        .try_wait_epoch(
702                            HummockReadEpoch::Committed(on_complete_epoch.prev),
703                            TryWaitEpochOptions {
704                                table_id: staging_table_id,
705                            },
706                        )
707                        .await?;
708
709                    tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
710
711                    if let Some(ref mut refresh_args) = self.refresh_args {
712                        refresh_args.is_refreshing = false;
713                    }
714                    *inner_state = MaterializeStreamState::NormalIngestion;
715                    continue 'main_loop;
716                }
717                MaterializeStreamState::CommitAndYieldBarrier {
718                    barrier,
719                    mut expect_next_state,
720                } => {
721                    if let Some(ref mut refresh_args) = self.refresh_args {
722                        match barrier.mutation.as_deref() {
723                            Some(Mutation::RefreshStart {
724                                table_id: refresh_table_id,
725                                associated_source_id: _,
726                            }) if *refresh_table_id == refresh_args.table_id => {
727                                debug_assert!(
728                                    !refresh_args.is_refreshing,
729                                    "cannot start refresh twice"
730                                );
731                                refresh_args.is_refreshing = true;
732                                tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
733
734                                // Initialize progress tracking for all VNodes
735                                Self::init_refresh_progress(
736                                    &self.state_table,
737                                    &mut refresh_args.progress_table,
738                                    barrier.epoch.curr,
739                                )?;
740                            }
741                            Some(Mutation::LoadFinish {
742                                associated_source_id: load_finish_source_id,
743                            }) => {
744                                // Get associated source id from table catalog
745                                let associated_source_id: SourceId = match refresh_args
746                                    .table_catalog
747                                    .optional_associated_source_id
748                                {
749                                    Some(id) => id.into(),
750                                    None => unreachable!("associated_source_id is not set"),
751                                };
752
753                                if *load_finish_source_id == associated_source_id {
754                                    tracing::info!(
755                                        %load_finish_source_id,
756                                        "LoadFinish received, starting data replacement"
757                                    );
758                                    expect_next_state =
759                                        Box::new(MaterializeStreamState::<_>::MergingData);
760                                }
761                            }
762                            _ => {}
763                        }
764                    }
765
766                    // ===== normal operation =====
767
768                    // If a downstream mv depends on the current table, we need to do conflict check again.
769                    if !self.may_have_downstream
770                        && barrier.has_more_downstream_fragments(self.actor_context.id)
771                    {
772                        self.may_have_downstream = true;
773                    }
774                    Self::may_update_depended_subscriptions(
775                        &mut self.subscriber_ids,
776                        &barrier,
777                        mv_table_id,
778                    );
779                    let op_consistency_level = get_op_consistency_level(
780                        self.conflict_behavior,
781                        self.may_have_downstream,
782                        &self.subscriber_ids,
783                    );
784                    let post_commit = self
785                        .state_table
786                        .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
787                        .await?;
788                    if !post_commit.inner().is_consistent_op() {
789                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
790                    }
791
792                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
793
794                    // Commit staging table for refreshable materialized views
795                    let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
796                    {
797                        // Commit progress table for fault tolerance
798
799                        Some((
800                            refresh_args.staging_table.commit(barrier.epoch).await?,
801                            refresh_args.progress_table.commit(barrier.epoch).await?,
802                        ))
803                    } else {
804                        None
805                    };
806
807                    let b_epoch = barrier.epoch;
808                    yield Message::Barrier(barrier);
809
810                    // Update the vnode bitmap for the state table if asked.
811                    if let Some((_, cache_may_stale)) = post_commit
812                        .post_yield_barrier(update_vnode_bitmap.clone())
813                        .await?
814                        && cache_may_stale
815                        && let Some(cache) = &mut self.materialize_cache
816                    {
817                        cache.clear();
818                    }
819
820                    // Handle staging table post commit
821                    if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
822                        staging_post_commit
823                            .post_yield_barrier(update_vnode_bitmap.clone())
824                            .await?;
825                        progress_post_commit
826                            .post_yield_barrier(update_vnode_bitmap)
827                            .await?;
828                    }
829
830                    self.metrics
831                        .materialize_current_epoch
832                        .set(b_epoch.curr as i64);
833
834                    // ====== transition to next state ======
835
836                    *inner_state = *expect_next_state;
837                }
838            }
839        }
840    }
841
    /// Stream that yields rows to be deleted from main table.
    /// Yields `Some((vnode, row))` for rows that exist in main but not in staging.
    /// Yields `None` when finished processing all vnodes.
    ///
    /// Both tables are iterated per-vnode in primary-key order and merged with a
    /// dual-pointer sort-merge scan. After each main-table row is examined, the
    /// current pk position and processed-row count are written to
    /// `progress_table`, so an interrupted merge can resume from the recorded
    /// position after recovery (completed vnodes are skipped entirely).
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn make_mergesort_stream<'a>(
        main_table: &'a StateTableInner<S, SD>,
        staging_table: &'a StateTableInner<S, SD>,
        progress_table: &'a mut RefreshProgressTable<S>,
    ) {
        for vnode in main_table.vnodes().clone().iter_vnodes() {
            let mut processed_rows = 0;
            // Check if this VNode has already been completed (for fault tolerance).
            // The recorded progress (if any) also determines where to resume the
            // pk scan: strictly after the last processed key.
            let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
                if let Some(current_entry) = progress_table.get_progress(vnode) {
                    // Skip already completed VNodes during recovery
                    if current_entry.is_completed {
                        tracing::debug!(
                            vnode = vnode.to_index(),
                            "Skipping already completed VNode during recovery"
                        );
                        continue;
                    }
                    // Carry over the row count so progress reporting stays monotonic
                    // across recoveries.
                    processed_rows += current_entry.processed_rows;
                    tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");

                    if let Some(current_state) = &current_entry.current_pos {
                        // Resume strictly after the last persisted position.
                        (Bound::Excluded(current_state.clone()), Bound::Unbounded)
                    } else {
                        (Bound::Unbounded, Bound::Unbounded)
                    }
                } else {
                    (Bound::Unbounded, Bound::Unbounded)
                };

            // Open pk-ordered iterators over the same range in both tables.
            let iter_main = main_table
                .iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::prefetch_for_large_range_scan(),
                )
                .await?;
            let iter_staging = staging_table
                .iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::prefetch_for_large_range_scan(),
                )
                .await?;

            pin_mut!(iter_main);
            pin_mut!(iter_staging);

            // Sort-merge join implementation using dual pointers
            let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
            let mut staging_item: Option<KeyedRow<Bytes>> =
                iter_staging.next().await.transpose()?;

            while let Some(main_kv) = main_item {
                let main_key = main_kv.key();

                // Advance staging iterator until we find a key >= main_key
                let mut should_delete = false;
                while let Some(staging_kv) = &staging_item {
                    let staging_key = staging_kv.key();
                    match main_key.cmp(staging_key) {
                        std::cmp::Ordering::Greater => {
                            // main_key > staging_key, advance staging
                            staging_item = iter_staging.next().await.transpose()?;
                        }
                        std::cmp::Ordering::Equal => {
                            // Keys match, this row exists in both tables, no need to delete.
                            // Staging is intentionally not advanced here; the next
                            // (larger) main key will advance past it.
                            break;
                        }
                        std::cmp::Ordering::Less => {
                            // main_key < staging_key, main row doesn't exist in staging, delete it
                            should_delete = true;
                            break;
                        }
                    }
                }

                // If staging_item is None, all remaining main rows should be deleted
                if staging_item.is_none() {
                    should_delete = true;
                }

                if should_delete {
                    yield Some((vnode, main_kv.row().clone()));
                }

                // Advance main iterator, persisting the pk we just processed so a
                // restart resumes after it.
                processed_rows += 1;
                tracing::debug!(
                    "set progress table: vnode = {:?}, processed_rows = {:?}",
                    vnode,
                    processed_rows
                );
                progress_table.set_progress(
                    vnode,
                    Some(
                        main_kv
                            .row()
                            .project(main_table.pk_indices())
                            .to_owned_row(),
                    ),
                    false,
                    processed_rows,
                )?;
                main_item = iter_main.next().await.transpose()?;
            }

            // Mark this VNode as completed
            if let Some(current_entry) = progress_table.get_progress(vnode) {
                progress_table.set_progress(
                    vnode,
                    current_entry.current_pos.clone(),
                    true, // completed
                    current_entry.processed_rows,
                )?;

                tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
            }
        }

        // Signal completion
        yield None;
    }
969
    /// Maintain the set of subscribers that depend on this materialized view,
    /// based on the subscription changes carried by `barrier`:
    ///
    /// - inserts every subscriber newly added on `mv_table_id`;
    /// - removes every subscriber whose subscription on `mv_table_id` is dropped.
    ///
    /// Inconsistencies (adding an id that is already tracked, or dropping one
    /// that is not) are logged as warnings rather than treated as errors.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<SubscriberId>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            // `insert` returns false when the id was already present.
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    %mv_table_id,
                    %subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(subscriptions_to_drop) = barrier.as_subscriptions_to_drop() {
            for SubscriptionUpstreamInfo {
                subscriber_id,
                upstream_mv_table_id,
            } in subscriptions_to_drop
            {
                // Only drops targeting this MV's table are relevant; `remove`
                // returns false when the id was not tracked.
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        %mv_table_id,
                        %subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
1006
1007    /// Initialize refresh progress tracking for all `VNodes`
1008    fn init_refresh_progress(
1009        state_table: &StateTableInner<S, SD>,
1010        progress_table: &mut RefreshProgressTable<S>,
1011        _epoch: u64,
1012    ) -> StreamExecutorResult<()> {
1013        debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1014
1015        // Initialize progress for all VNodes in the current bitmap
1016        for vnode in state_table.vnodes().iter_vnodes() {
1017            progress_table.set_progress(
1018                vnode, None,  // initial position
1019                false, // not completed yet
1020                0,     // initial processed rows
1021            )?;
1022        }
1023
1024        tracing::info!(
1025            vnodes_count = state_table.vnodes().count_ones(),
1026            "Initialized refresh progress tracking for all VNodes"
1027        );
1028
1029        Ok(())
1030    }
1031}
1032
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
    ///
    /// Builds the state table, row serde, and metrics from the given schema and
    /// key ordering, using test-only defaults everywhere else (singleton vnode
    /// distribution, unused metrics registry, no refresh support).
    #[cfg(any(test, feature = "test"))]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<risingwave_common::catalog::ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        use risingwave_common::util::iter_util::ZipEqFast;

        // Arrange (pk) columns and their order types come from `keys`.
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        // Pair each column id with the corresponding input field's data type.
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // Serde over all columns, in schema order.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        // State table generated from a synthesized pb catalog for tests.
        let state_table = StateTableInner::from_table_catalog(
            &crate::common::table::test_utils::gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        // Metrics backed by an unused (no-op) registry.
        let unused = StreamingMetrics::unused();
        let metrics = unused.new_materialize_metrics(table_id, 1.into(), 2.into());
        let cache_metrics = unused.new_materialize_cache_metrics(table_id, 1.into(), 2.into());

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                vec![],
                conflict_behavior,
                None,
                cache_metrics,
            ),
            conflict_behavior,
            version_column_indices: vec![],
            is_dummy_table: false,
            may_have_downstream: true,
            subscriber_ids: HashSet::new(),
            metrics,
            refresh_args: None, // Test constructor doesn't support refresh functionality
            local_barrier_manager: LocalBarrierManager::for_test(),
        }
    }
}
1103
1104impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
1105    fn execute(self: Box<Self>) -> BoxedMessageStream {
1106        self.execute_inner().boxed()
1107    }
1108}
1109
1110impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
1111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1112        f.debug_struct("MaterializeExecutor")
1113            .field("arrange_key_indices", &self.arrange_key_indices)
1114            .finish()
1115    }
1116}
1117
1118#[cfg(test)]
1119mod tests {
1120
1121    use std::iter;
1122    use std::sync::atomic::AtomicU64;
1123
1124    use rand::rngs::SmallRng;
1125    use rand::{Rng, RngCore, SeedableRng};
1126    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1127    use risingwave_common::catalog::Field;
1128    use risingwave_common::util::epoch::test_epoch;
1129    use risingwave_common::util::sort_util::OrderType;
1130    use risingwave_hummock_sdk::HummockReadEpoch;
1131    use risingwave_storage::memory::MemoryStateStore;
1132    use risingwave_storage::table::batch_table::BatchTable;
1133
1134    use super::*;
1135    use crate::executor::test_utils::*;
1136
    /// End-to-end: feed barrier/chunk/barrier/chunk/barrier through the
    /// materialize executor and verify rows are visible via a batch read on the
    /// same store after each checkpoint.
    #[tokio::test]
    async fn test_materialize_executor() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Prepare source chunks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors: barrier, chunk1, barrier, chunk2, barrier.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same store, used to verify materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume the second chunk.
        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1239
    // https://github.com/risingwavelabs/risingwave/issues/13346
    /// Regression test: with `ConflictBehavior::Overwrite`, inserting the same
    /// pk again and then deleting it must leave no row behind.
    #[tokio::test]
    async fn test_upsert_stream() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Re-insert pk 1 with a new value, then delete it.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors: barrier, chunk1, barrier, chunk2, barrier.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same store, used to verify the final state.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1, second barrier, chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // After the final barrier, pk 1 must be gone.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1322
    /// With `ConflictBehavior::Overwrite`, a second insert on an existing pk
    /// overwrites the previous value; later inserts in the same epoch win.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicting inserts on pk 1 and pk 2; chunk2 comes later so it wins.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // test delete wrong value, delete inexistent pk
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors: chunk1 and chunk2 land in the same epoch.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same store, used to verify materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: chunk2's (1,3) overwrote chunk1's (1,4).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: chunk2's (2,6) overwrote chunk1's (2,5).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1441
    /// Exercises `ConflictBehavior::Overwrite` against out-of-order changes:
    /// updates whose old pk does not exist, deletes carrying a stale value,
    /// deletes of a non-existent pk, and inserts conflicting with a live row.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // chunk1: plain inserts, then an update (U-/U+) on a pk (8) that does
        // not exist yet, followed by an insert on that same pk. With Overwrite,
        // the last write wins, so pk 8 should end up as (8, 3).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // chunk2: insert a fresh pk, delete with a stale value (pk 3 is stored
        // as (3, 6), not (3, 4)), and delete a pk (5) that was never inserted.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // chunk3: insert conflicting with a live pk (1), update carrying a
        // stale old value for pk 2, and update whose old pk (9) is absent.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same state store, used to read back rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier, then the rebuilt output of chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk1: the trailing insert overwrites the update, so
        // pk 8 reads back as (8, 3).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume the output of chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk2: (7, 8) is inserted; pk 3 is deleted despite
        // the stale value; deleting the absent pk 5 leaves nothing behind.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume the output of chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Barrier after chunk3: the conflicting insert overwrote pk 1; the
        // update with a stale old value still applied its new value to pk 2;
        // the update on the absent pk 9 degenerated into an insert.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1628
    /// Exercises `ConflictBehavior::IgnoreConflict`: once a pk holds a row,
    /// later inserts on the same pk are dropped and the first row is kept.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // conflicting inserts arriving in a later chunk must be ignored too.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // a conflicting insert after a checkpoint barrier is also ignored.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same state store, used to read back rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier, then the outputs of chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk1/chunk2: only the first insert per pk survives,
        // so we read (1, 3), (2, 5) and (3, 6).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1747
1748    #[tokio::test]
1749    async fn test_ignore_delete_then_insert() {
1750        // Prepare storage and memtable.
1751        let memory_state_store = MemoryStateStore::new();
1752        let table_id = TableId::new(1);
1753        // Two columns of int32 type, the first column is PK.
1754        let schema = Schema::new(vec![
1755            Field::unnamed(DataType::Int32),
1756            Field::unnamed(DataType::Int32),
1757        ]);
1758        let column_ids = vec![0.into(), 1.into()];
1759
1760        // test insert after delete one pk, the latter insert should succeed.
1761        let chunk1 = StreamChunk::from_pretty(
1762            " i i
1763            + 1 3
1764            - 1 3
1765            + 1 6",
1766        );
1767
1768        // Prepare stream executors.
1769        let source = MockSource::with_messages(vec![
1770            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1771            Message::Chunk(chunk1),
1772            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1773        ])
1774        .into_executor(schema.clone(), StreamKey::new());
1775
1776        let order_types = vec![OrderType::ascending()];
1777        let column_descs = vec![
1778            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1779            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1780        ];
1781
1782        let table = BatchTable::for_test(
1783            memory_state_store.clone(),
1784            table_id,
1785            column_descs,
1786            order_types,
1787            vec![0],
1788            vec![0, 1],
1789        );
1790
1791        let mut materialize_executor = MaterializeExecutor::for_test(
1792            source,
1793            memory_state_store,
1794            table_id,
1795            vec![ColumnOrder::new(0, OrderType::ascending())],
1796            column_ids,
1797            Arc::new(AtomicU64::new(0)),
1798            ConflictBehavior::IgnoreConflict,
1799        )
1800        .await
1801        .boxed()
1802        .execute();
1803        let _msg1 = materialize_executor
1804            .next()
1805            .await
1806            .transpose()
1807            .unwrap()
1808            .unwrap()
1809            .as_barrier()
1810            .unwrap();
1811        let _msg2 = materialize_executor
1812            .next()
1813            .await
1814            .transpose()
1815            .unwrap()
1816            .unwrap()
1817            .as_chunk()
1818            .unwrap();
1819        let _msg3 = materialize_executor
1820            .next()
1821            .await
1822            .transpose()
1823            .unwrap()
1824            .unwrap()
1825            .as_barrier()
1826            .unwrap();
1827
1828        let row = table
1829            .get_row(
1830                &OwnedRow::new(vec![Some(1_i32.into())]),
1831                HummockReadEpoch::NoWait(u64::MAX),
1832            )
1833            .await
1834            .unwrap();
1835        assert_eq!(
1836            row,
1837            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
1838        );
1839    }
1840
    /// Exercises `ConflictBehavior::IgnoreConflict` against out-of-order
    /// changes: conflicting inserts are dropped, while deletes and updates
    /// with mismatched values/pks are still resolved against stored state.
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // chunk1: an update (U-/U+) lands on the absent pk 8 and materializes
        // (8, 2); the following insert on pk 8 conflicts and must be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // chunk2: insert a fresh pk, delete with a stale value (pk 3 is stored
        // as (3, 6)), and delete a pk (5) that was never inserted.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // chunk3: conflicting insert on pk 1 (ignored), update with a stale
        // old value on pk 2, and update whose old pk (9) is absent.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same state store, used to read back rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier, then the rebuilt output of chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk1: the conflicting insert was ignored, so pk 8
        // still holds the value written by the update.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 2), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume the output of chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk2: (7, 8) is inserted; pk 3 is deleted despite
        // the stale value; deleting the absent pk 5 leaves nothing behind.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume the output of chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Barrier after chunk3: the conflicting insert on pk 1 was ignored
        // (value stays 4); the update on pk 2 applied its new value; the
        // update on the absent pk 9 became an insert.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2028
    /// Exercises `ConflictBehavior::DoUpdateIfNotNull`: non-null cells of a
    /// conflicting write overwrite the stored row, while null cells keep the
    /// previously stored value.
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // should get (8, 2): the trailing `+ 8 .` carries a null value, which
        // must not clobber the 2 written by the preceding update.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // should not get (3, x), should not get (5, 0)
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // should get (2, None), (7, 8): the null cells in `+ 7 .` and `U+ 2 .`
        // leave the stored values untouched.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same state store, used to read back rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier, then the rebuilt output of chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk1: pk 8 kept the non-null 2 despite the trailing
        // null insert; pk 2 stored an explicit null value column.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume the output of chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after chunk2: (7, 8) is inserted; the stale-value delete
        // removes pk 3; deleting the absent pk 5 leaves nothing behind.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume the output of chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Barrier after chunk3: the null insert on pk 7 kept (7, 8); the null
        // update on pk 2 kept its null; the update on the absent pk 9 became
        // an insert.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2222
2223    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2224        const KN: u32 = 4;
2225        const SEED: u64 = 998244353;
2226        let mut ret = vec![];
2227        let mut builder =
2228            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2229        let mut rng = SmallRng::seed_from_u64(SEED);
2230
2231        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2232            let len = c.data_chunk().capacity();
2233            let mut c = StreamChunkMut::from(c);
2234            for i in 0..len {
2235                c.set_vis(i, rng.random_bool(0.5));
2236            }
2237            c.into()
2238        };
2239        for _ in 0..row_number {
2240            let k = (rng.next_u32() % KN) as i32;
2241            let v = rng.next_u32() as i32;
2242            let op = if rng.random_bool(0.5) {
2243                Op::Insert
2244            } else {
2245                Op::Delete
2246            };
2247            if let Some(c) =
2248                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2249            {
2250                ret.push(random_vis(c, &mut rng));
2251            }
2252        }
2253        if let Some(c) = builder.take() {
2254            ret.push(random_vis(c, &mut rng));
2255        }
2256        ret
2257    }
2258
2259    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
2260        const N: usize = 100000;
2261
2262        // Prepare storage and memtable.
2263        let memory_state_store = MemoryStateStore::new();
2264        let table_id = TableId::new(1);
2265        // Two columns of int32 type, the first column is PK.
2266        let schema = Schema::new(vec![
2267            Field::unnamed(DataType::Int32),
2268            Field::unnamed(DataType::Int32),
2269        ]);
2270        let column_ids = vec![0.into(), 1.into()];
2271
2272        let chunks = gen_fuzz_data(N, 128);
2273        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
2274            .chain(chunks.into_iter().map(Message::Chunk))
2275            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
2276                test_epoch(2),
2277            ))))
2278            .collect();
2279        // Prepare stream executors.
2280        let source =
2281            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());
2282
2283        let mut materialize_executor = MaterializeExecutor::for_test(
2284            source,
2285            memory_state_store.clone(),
2286            table_id,
2287            vec![ColumnOrder::new(0, OrderType::ascending())],
2288            column_ids,
2289            Arc::new(AtomicU64::new(0)),
2290            conflict_behavior,
2291        )
2292        .await
2293        .boxed()
2294        .execute();
2295        materialize_executor.expect_barrier().await;
2296
2297        let order_types = vec![OrderType::ascending()];
2298        let column_descs = vec![
2299            ColumnDesc::unnamed(0.into(), DataType::Int32),
2300            ColumnDesc::unnamed(1.into(), DataType::Int32),
2301        ];
2302        let pk_indices = vec![0];
2303
2304        let mut table = StateTable::from_table_catalog(
2305            &crate::common::table::test_utils::gen_pbtable(
2306                TableId::from(1002),
2307                column_descs.clone(),
2308                order_types,
2309                pk_indices,
2310                0,
2311            ),
2312            memory_state_store.clone(),
2313            None,
2314        )
2315        .await;
2316
2317        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
2318            // check with state table's memtable
2319            table.write_chunk(c);
2320        }
2321    }
2322
    /// Fuzzes the materialize executor with `Overwrite` (upsert) conflict
    /// handling; the inner helper panics on any inconsistent output.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2327
    /// Fuzzes the materialize executor with `IgnoreConflict` handling; the
    /// inner helper panics on any inconsistent output.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2332}