// risingwave_stream/executor/mview/materialize.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::ops::Bound;
17
18use bytes::Bytes;
19use futures::future::Either;
20use futures::stream::{self, select_with_strategy};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{
26    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
27};
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, RowExt};
30use risingwave_common::types::DataType;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::BasicSerde;
33use risingwave_hummock_sdk::HummockReadEpoch;
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_pb::id::{SourceId, SubscriberId};
37use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
38use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
39use risingwave_storage::table::KeyedRow;
40
41use crate::common::change_buffer::output_kind as cb_kind;
42use crate::common::metrics::MetricsInfo;
43use crate::common::table::state_table::{
44    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
45};
46use crate::executor::error::ErrorKind;
47use crate::executor::monitor::MaterializeMetrics;
48use crate::executor::mview::RefreshProgressTable;
49use crate::executor::mview::cache::MaterializeCache;
50use crate::executor::prelude::*;
51use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
52use crate::task::LocalBarrierManager;
53
/// State machine driving [`MaterializeExecutor`]'s main execution loop.
///
/// Non-refreshable materialized views stay in `NormalIngestion` forever.
/// Refreshable materialized views cycle through
/// `NormalIngestion -> MergingData -> CleanUp -> RefreshEnd` during a refresh,
/// with `CommitAndYieldBarrier` interposed whenever a barrier must be handled
/// before entering the next state.
///
/// `M` is the barrier mutation payload type carried by [`BarrierInner`].
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Regular streaming ingestion: apply input chunks to the state table
    /// (and, while a refresh is in progress, mirror PK rows to the staging table).
    NormalIngestion,
    /// Refresh merge phase: scan the main table against the staging table to
    /// find rows that must be deleted from the main table.
    MergingData,
    /// Refresh cleanup phase: wait for a checkpoint barrier, then report the
    /// refresh as finished so the staging table can be truncated.
    CleanUp,
    /// Commit pending state and yield `barrier` downstream, then transition
    /// to `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        // Boxed because the enum is recursive through this field.
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Final refresh stage, entered after the cleanup checkpoint barrier.
    RefreshEnd {
        /// Epoch of the checkpoint barrier that completed the cleanup stage.
        on_complete_epoch: EpochPair,
    },
}
67
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the change stream to materialize.
    input: Executor,

    /// Output schema of this executor (schema of the materialized rows).
    schema: Schema,

    /// The state table backing the materialized view.
    state_table: StateTableInner<S, SD>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// The cache for conflict handling. `None` if conflict behavior is `NoCheck`.
    materialize_cache: Option<MaterializeCache>,

    /// How rows conflicting on the primary key are resolved.
    conflict_behavior: ConflictBehavior,

    /// Indices of version columns used by conflict handling
    /// (empty when no version column is specified).
    version_column_indices: Vec<u32>,

    /// Whether any downstream may depend on this table (derived from the
    /// initial dispatch count); feeds into `get_op_consistency_level`.
    may_have_downstream: bool,

    /// Initial subscribers of this table; when non-empty, the state table is
    /// built with the log store enabled (see `get_op_consistency_level`).
    subscriber_ids: HashSet<SubscriberId>,

    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,

    /// Optional refresh arguments and state for refreshable materialized views
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    /// Local barrier manager for reporting barrier events
    local_barrier_manager: LocalBarrierManager,
}
104
/// Arguments and state for refreshable materialized views
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Table catalog for main table
    pub table_catalog: Table,

    /// Table catalog for staging table
    pub staging_table_catalog: Table,

    /// Flag indicating if this table is currently being refreshed.
    /// Also restored to `true` on recovery when incomplete per-vnode progress
    /// entries are found.
    pub is_refreshing: bool,

    /// During data refresh (between `RefreshStart` and `LoadFinish`),
    /// data will be written to both the main table and the staging table.
    ///
    /// The staging table is PK-only.
    ///
    /// After `LoadFinish`, we will do a `DELETE FROM main_table WHERE pk NOT IN (SELECT pk FROM staging_table)`, and then purge the staging table.
    pub staging_table: StateTableInner<S, SD>,

    /// Progress table for tracking refresh state per `VNode` for fault tolerance
    /// (per-vnode completion flags consulted during recovery).
    pub progress_table: RefreshProgressTable<S>,

    /// Table ID for this refreshable materialized view
    /// (same as `table_catalog.id`).
    pub table_id: TableId,
}
130
131impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
132    /// Create new `RefreshableMaterializeArgs`
133    pub async fn new(
134        store: S,
135        table_catalog: &Table,
136        staging_table_catalog: &Table,
137        progress_state_table: &Table,
138        vnodes: Option<Arc<Bitmap>>,
139    ) -> Self {
140        let table_id = table_catalog.id;
141
142        // staging table is pk-only, and we don't need to check value consistency
143        let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
144            staging_table_catalog,
145            store.clone(),
146            vnodes.clone(),
147        )
148        .await;
149
150        let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
151            progress_state_table,
152            store,
153            vnodes,
154        )
155        .await;
156
157        // Get primary key length from main table catalog
158        let pk_len = table_catalog.pk.len();
159        let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
160
161        debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
162
163        Self {
164            table_catalog: table_catalog.clone(),
165            staging_table_catalog: staging_table_catalog.clone(),
166            is_refreshing: false,
167            staging_table,
168            progress_table,
169            table_id,
170        }
171    }
172}
173
174fn get_op_consistency_level(
175    conflict_behavior: ConflictBehavior,
176    may_have_downstream: bool,
177    subscriber_ids: &HashSet<SubscriberId>,
178) -> StateTableOpConsistencyLevel {
179    if !subscriber_ids.is_empty() {
180        StateTableOpConsistencyLevel::LogStoreEnabled
181    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
182        // Table with overwrite conflict behavior could disable conflict check
183        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
184        StateTableOpConsistencyLevel::Inconsistent
185    } else {
186        StateTableOpConsistencyLevel::ConsistentOldValue
187    }
188}
189
190impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
191    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
192    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
193    /// should be `None`.
194    #[allow(clippy::too_many_arguments)]
195    pub async fn new(
196        input: Executor,
197        schema: Schema,
198        store: S,
199        arrange_key: Vec<ColumnOrder>,
200        actor_context: ActorContextRef,
201        vnodes: Option<Arc<Bitmap>>,
202        table_catalog: &Table,
203        watermark_epoch: AtomicU64Ref,
204        conflict_behavior: ConflictBehavior,
205        version_column_indices: Vec<u32>,
206        metrics: Arc<StreamingMetrics>,
207        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
208        local_barrier_manager: LocalBarrierManager,
209    ) -> Self {
210        let table_columns: Vec<ColumnDesc> = table_catalog
211            .columns
212            .iter()
213            .map(|col| col.column_desc.as_ref().unwrap().into())
214            .collect();
215
216        // Extract TOAST-able column indices from table columns.
217        // Only for PostgreSQL CDC tables.
218        let toastable_column_indices = if table_catalog.cdc_table_type()
219            == risingwave_pb::catalog::table::CdcTableType::Postgres
220        {
221            let toastable_indices: Vec<usize> = table_columns
222                .iter()
223                .enumerate()
224                .filter_map(|(index, column)| match &column.data_type {
225                    // Currently supports TOAST updates for:
226                    // - jsonb (DataType::Jsonb)
227                    // - varchar (DataType::Varchar)
228                    // - bytea (DataType::Bytea)
229                    // - One-dimensional arrays of the above types (DataType::List)
230                    //   Note: Some array types may not be fully supported yet, see issue  https://github.com/risingwavelabs/risingwave/issues/22916 for details.
231
232                    // For details on how TOAST values are handled, see comments in `is_debezium_unavailable_value`.
233                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
234                        Some(index)
235                    }
236                    _ => None,
237                })
238                .collect();
239
240            if toastable_indices.is_empty() {
241                None
242            } else {
243                Some(toastable_indices)
244            }
245        } else {
246            None
247        };
248
249        let row_serde: BasicSerde = BasicSerde::new(
250            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
251            Arc::from(table_columns.into_boxed_slice()),
252        );
253
254        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
255        let may_have_downstream = actor_context.initial_dispatch_num != 0;
256        let subscriber_ids = actor_context.initial_subscriber_ids.clone();
257        let op_consistency_level =
258            get_op_consistency_level(conflict_behavior, may_have_downstream, &subscriber_ids);
259        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
260        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
261            .with_op_consistency_level(op_consistency_level)
262            .enable_preload_all_rows_by_config(&actor_context.config)
263            .build()
264            .await;
265
266        let mv_metrics = metrics.new_materialize_metrics(
267            table_catalog.id,
268            actor_context.id,
269            actor_context.fragment_id,
270        );
271        let cache_metrics = metrics.new_materialize_cache_metrics(
272            table_catalog.id,
273            actor_context.id,
274            actor_context.fragment_id,
275        );
276
277        let metrics_info =
278            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
279
280        let is_dummy_table =
281            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;
282
283        Self {
284            input,
285            schema,
286            state_table,
287            arrange_key_indices,
288            actor_context,
289            materialize_cache: MaterializeCache::new(
290                watermark_epoch,
291                metrics_info,
292                row_serde,
293                version_column_indices.clone(),
294                conflict_behavior,
295                toastable_column_indices,
296                cache_metrics,
297            ),
298            conflict_behavior,
299            version_column_indices,
300            is_dummy_table,
301            may_have_downstream,
302            subscriber_ids,
303            metrics: mv_metrics,
304            refresh_args,
305            local_barrier_manager,
306        }
307    }
308
309    #[try_stream(ok = Message, error = StreamExecutorError)]
310    async fn execute_inner(mut self) {
311        let mv_table_id = self.state_table.table_id();
312        let data_types = self.schema.data_types();
313        let mut input = self.input.execute();
314
315        let barrier = expect_first_barrier(&mut input).await?;
316        let first_epoch = barrier.epoch;
317        let _barrier_epoch = barrier.epoch; // Save epoch for later use (unused in normal execution)
318        // The first barrier message should be propagated.
319        yield Message::Barrier(barrier);
320        self.state_table.init_epoch(first_epoch).await?;
321
322        // default to normal ingestion
323        let mut inner_state =
324            Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
325        // Initialize staging table for refreshable materialized views
326        if let Some(ref mut refresh_args) = self.refresh_args {
327            refresh_args.staging_table.init_epoch(first_epoch).await?;
328
329            // Initialize progress table and load existing progress for recovery
330            refresh_args.progress_table.recover(first_epoch).await?;
331
332            // Check if refresh is already in progress (recovery scenario)
333            let progress_stats = refresh_args.progress_table.get_progress_stats();
334            if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
335                refresh_args.is_refreshing = true;
336                tracing::info!(
337                    total_vnodes = progress_stats.total_vnodes,
338                    completed_vnodes = progress_stats.completed_vnodes,
339                    "Recovered refresh in progress, resuming refresh operation"
340                );
341
342                // Since stage info is no longer stored in progress table,
343                // we need to determine recovery state differently.
344                // For now, assume all incomplete VNodes need to continue merging
345                let incomplete_vnodes: Vec<_> = refresh_args
346                    .progress_table
347                    .get_all_progress()
348                    .iter()
349                    .filter(|(_, entry)| !entry.is_completed)
350                    .map(|(&vnode, _)| vnode)
351                    .collect();
352
353                if !incomplete_vnodes.is_empty() {
354                    // Some VNodes are incomplete, need to resume refresh operation
355                    tracing::info!(
356                        incomplete_vnodes = incomplete_vnodes.len(),
357                        "Recovery detected incomplete VNodes, resuming refresh operation"
358                    );
359                    // Since stage tracking is now in memory, we'll determine the appropriate
360                    // stage based on the executor's internal state machine
361                } else {
362                    // This should not happen if is_complete() returned false, but handle it gracefully
363                    tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
364                }
365            }
366        }
367
368        // Determine initial execution stage (for recovery scenarios)
369        if let Some(ref refresh_args) = self.refresh_args
370            && refresh_args.is_refreshing
371        {
372            // Recovery logic: Check if there are incomplete vnodes from previous run
373            let incomplete_vnodes: Vec<_> = refresh_args
374                .progress_table
375                .get_all_progress()
376                .iter()
377                .filter(|(_, entry)| !entry.is_completed)
378                .map(|(&vnode, _)| vnode)
379                .collect();
380            if !incomplete_vnodes.is_empty() {
381                // Resume from merge stage since some VNodes were left incomplete
382                inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
383                tracing::info!(
384                    incomplete_vnodes = incomplete_vnodes.len(),
385                    "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
386                );
387            }
388        }
389
390        // Main execution loop: cycles through Stage 1 -> Stage 2 -> Stage 3 -> Stage 1...
391        'main_loop: loop {
392            match *inner_state {
393                MaterializeStreamState::NormalIngestion => {
394                    #[for_await]
395                    '_normal_ingest: for msg in input.by_ref() {
396                        let msg = msg?;
397                        if let Some(cache) = &mut self.materialize_cache {
398                            cache.evict();
399                        }
400
401                        match msg {
402                            Message::Watermark(w) => {
403                                yield Message::Watermark(w);
404                            }
405                            Message::Chunk(chunk) if self.is_dummy_table => {
406                                self.metrics
407                                    .materialize_input_row_count
408                                    .inc_by(chunk.cardinality() as u64);
409                                yield Message::Chunk(chunk);
410                            }
411                            Message::Chunk(chunk) => {
412                                self.metrics
413                                    .materialize_input_row_count
414                                    .inc_by(chunk.cardinality() as u64);
415
416                                // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
417                                // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
418                                // and the conflict behavior is overwrite. We can rely on the state table to overwrite the conflicting rows in the storage,
419                                // while outputting inconsistent changes to downstream which no one will subscribe to.
420                                let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
421                                    self.conflict_behavior
422                                    && !self.state_table.is_consistent_op()
423                                    && self.version_column_indices.is_empty()
424                                {
425                                    ConflictBehavior::NoCheck
426                                } else {
427                                    self.conflict_behavior
428                                };
429
430                                match optimized_conflict_behavior {
431                                    checked_conflict_behaviors!() => {
432                                        if chunk.cardinality() == 0 {
433                                            // empty chunk
434                                            continue;
435                                        }
436
437                                        // For refreshable materialized views, write to staging table during refresh
438                                        // Do not use generate_output here.
439                                        if let Some(ref mut refresh_args) = self.refresh_args
440                                            && refresh_args.is_refreshing
441                                        {
442                                            let key_chunk = chunk
443                                                .clone()
444                                                .project(self.state_table.pk_indices());
445                                            tracing::trace!(
446                                                staging_chunk = %key_chunk.to_pretty(),
447                                                input_chunk = %chunk.to_pretty(),
448                                                "writing to staging table"
449                                            );
450                                            if cfg!(debug_assertions) {
451                                                // refreshable source should be append-only
452                                                assert!(
453                                                    key_chunk
454                                                        .ops()
455                                                        .iter()
456                                                        .all(|op| op == &Op::Insert)
457                                                );
458                                            }
459                                            refresh_args
460                                                .staging_table
461                                                .write_chunk(key_chunk.clone());
462                                            refresh_args.staging_table.try_flush().await?;
463                                        }
464
465                                        let cache = self.materialize_cache.as_mut().unwrap();
466                                        let change_buffer =
467                                            cache.handle_new(chunk, &self.state_table).await?;
468
469                                        match change_buffer
470                                            .into_chunk::<{ cb_kind::RETRACT }>(data_types.clone())
471                                        {
472                                            Some(output_chunk) => {
473                                                self.state_table.write_chunk(output_chunk.clone());
474                                                self.state_table.try_flush().await?;
475                                                yield Message::Chunk(output_chunk);
476                                            }
477                                            None => continue,
478                                        }
479                                    }
480                                    ConflictBehavior::NoCheck => {
481                                        self.state_table.write_chunk(chunk.clone());
482                                        self.state_table.try_flush().await?;
483
484                                        // For refreshable materialized views, also write to staging table during refresh
485                                        if let Some(ref mut refresh_args) = self.refresh_args
486                                            && refresh_args.is_refreshing
487                                        {
488                                            let key_chunk = chunk
489                                                .clone()
490                                                .project(self.state_table.pk_indices());
491                                            tracing::trace!(
492                                                staging_chunk = %key_chunk.to_pretty(),
493                                                input_chunk = %chunk.to_pretty(),
494                                                "writing to staging table"
495                                            );
496                                            if cfg!(debug_assertions) {
497                                                // refreshable source should be append-only
498                                                assert!(
499                                                    key_chunk
500                                                        .ops()
501                                                        .iter()
502                                                        .all(|op| op == &Op::Insert)
503                                                );
504                                            }
505                                            refresh_args
506                                                .staging_table
507                                                .write_chunk(key_chunk.clone());
508                                            refresh_args.staging_table.try_flush().await?;
509                                        }
510
511                                        yield Message::Chunk(chunk);
512                                    }
513                                }
514                            }
515                            Message::Barrier(barrier) => {
516                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
517                                    barrier,
518                                    expect_next_state: Box::new(
519                                        MaterializeStreamState::NormalIngestion,
520                                    ),
521                                };
522                                continue 'main_loop;
523                            }
524                        }
525                    }
526
527                    return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
528                        anyhow::anyhow!(
529                            "Input stream terminated unexpectedly during normal ingestion"
530                        ),
531                    )));
532                }
533                MaterializeStreamState::MergingData => {
534                    let Some(refresh_args) = self.refresh_args.as_mut() else {
535                        panic!(
536                            "MaterializeExecutor entered CleanUp state without refresh_args configured"
537                        );
538                    };
539                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
540
541                    debug_assert_eq!(
542                        self.state_table.vnodes(),
543                        refresh_args.staging_table.vnodes()
544                    );
545                    debug_assert_eq!(
546                        refresh_args.staging_table.vnodes(),
547                        refresh_args.progress_table.vnodes()
548                    );
549
550                    let mut rows_to_delete = vec![];
551                    let mut merge_complete = false;
552                    let mut pending_barrier: Option<Barrier> = None;
553
554                    // Scope to limit immutable borrows to state tables
555                    {
556                        let left_input = input.by_ref().map(Either::Left);
557                        let right_merge_sort = pin!(
558                            Self::make_mergesort_stream(
559                                &self.state_table,
560                                &refresh_args.staging_table,
561                                &mut refresh_args.progress_table
562                            )
563                            .map(Either::Right)
564                        );
565
566                        // Prefer to select input stream to handle barriers promptly
567                        // Rebuild the merge stream each time processing a barrier
568                        let mut merge_stream =
569                            select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
570                                stream::PollNext::Left
571                            });
572
573                        #[for_await]
574                        'merge_stream: for either in &mut merge_stream {
575                            match either {
576                                Either::Left(msg) => {
577                                    let msg = msg?;
578                                    match msg {
579                                        Message::Watermark(w) => yield Message::Watermark(w),
580                                        Message::Chunk(chunk) => {
581                                            tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
582                                        }
583                                        Message::Barrier(b) => {
584                                            pending_barrier = Some(b);
585                                            break 'merge_stream;
586                                        }
587                                    }
588                                }
589                                Either::Right(result) => {
590                                    match result? {
591                                        Some((_vnode, row)) => {
592                                            rows_to_delete.push(row);
593                                        }
594                                        None => {
595                                            // Merge stream finished
596                                            merge_complete = true;
597
598                                            // If the merge stream finished, we need to wait for the next barrier to commit states
599                                        }
600                                    }
601                                }
602                            }
603                        }
604                    }
605
606                    // Process collected rows for deletion
607                    for row in &rows_to_delete {
608                        self.state_table.delete(row);
609                    }
610                    if !rows_to_delete.is_empty() {
611                        let to_delete_chunk = StreamChunk::from_rows(
612                            &rows_to_delete
613                                .iter()
614                                .map(|row| (Op::Delete, row))
615                                .collect_vec(),
616                            &self.schema.data_types(),
617                        );
618
619                        yield Message::Chunk(to_delete_chunk);
620                    }
621
622                    // should wait for at least one barrier
623                    assert!(pending_barrier.is_some(), "pending barrier is not set");
624
625                    *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
626                        barrier: pending_barrier.unwrap(),
627                        expect_next_state: if merge_complete {
628                            Box::new(MaterializeStreamState::CleanUp)
629                        } else {
630                            Box::new(MaterializeStreamState::MergingData)
631                        },
632                    };
633                    continue 'main_loop;
634                }
635                MaterializeStreamState::CleanUp => {
636                    let Some(refresh_args) = self.refresh_args.as_mut() else {
637                        panic!(
638                            "MaterializeExecutor entered MergingData state without refresh_args configured"
639                        );
640                    };
641                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
642
643                    #[for_await]
644                    for msg in input.by_ref() {
645                        let msg = msg?;
646                        match msg {
647                            Message::Watermark(w) => yield Message::Watermark(w),
648                            Message::Chunk(chunk) => {
649                                tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
650                            }
651                            Message::Barrier(barrier) if !barrier.is_checkpoint() => {
652                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
653                                    barrier,
654                                    expect_next_state: Box::new(MaterializeStreamState::CleanUp),
655                                };
656                                continue 'main_loop;
657                            }
658                            Message::Barrier(barrier) => {
659                                let staging_table_id = refresh_args.staging_table.table_id();
660                                let epoch = barrier.epoch;
661                                self.local_barrier_manager.report_refresh_finished(
662                                    epoch,
663                                    self.actor_context.id,
664                                    refresh_args.table_id,
665                                    staging_table_id,
666                                );
667                                tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
668
669                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
670                                    barrier,
671                                    expect_next_state: Box::new(
672                                        MaterializeStreamState::RefreshEnd {
673                                            on_complete_epoch: epoch,
674                                        },
675                                    ),
676                                };
677                                continue 'main_loop;
678                            }
679                        }
680                    }
681                }
682                MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
683                    let Some(refresh_args) = self.refresh_args.as_mut() else {
684                        panic!(
685                            "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
686                        );
687                    };
688                    let staging_table_id = refresh_args.staging_table.table_id();
689
690                    // Wait for staging table truncation to complete
691                    let staging_store = refresh_args.staging_table.state_store().clone();
692                    staging_store
693                        .try_wait_epoch(
694                            HummockReadEpoch::Committed(on_complete_epoch.prev),
695                            TryWaitEpochOptions {
696                                table_id: staging_table_id,
697                            },
698                        )
699                        .await?;
700
701                    tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
702
703                    if let Some(ref mut refresh_args) = self.refresh_args {
704                        refresh_args.is_refreshing = false;
705                    }
706                    *inner_state = MaterializeStreamState::NormalIngestion;
707                    continue 'main_loop;
708                }
709                MaterializeStreamState::CommitAndYieldBarrier {
710                    barrier,
711                    mut expect_next_state,
712                } => {
713                    if let Some(ref mut refresh_args) = self.refresh_args {
714                        match barrier.mutation.as_deref() {
715                            Some(Mutation::RefreshStart {
716                                table_id: refresh_table_id,
717                                associated_source_id: _,
718                            }) if *refresh_table_id == refresh_args.table_id => {
719                                debug_assert!(
720                                    !refresh_args.is_refreshing,
721                                    "cannot start refresh twice"
722                                );
723                                refresh_args.is_refreshing = true;
724                                tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
725
726                                // Initialize progress tracking for all VNodes
727                                Self::init_refresh_progress(
728                                    &self.state_table,
729                                    &mut refresh_args.progress_table,
730                                    barrier.epoch.curr,
731                                )?;
732                            }
733                            Some(Mutation::LoadFinish {
734                                associated_source_id: load_finish_source_id,
735                            }) => {
736                                // Get associated source id from table catalog
737                                let associated_source_id: SourceId = match refresh_args
738                                    .table_catalog
739                                    .optional_associated_source_id
740                                {
741                                    Some(id) => id.into(),
742                                    None => unreachable!("associated_source_id is not set"),
743                                };
744
745                                if *load_finish_source_id == associated_source_id {
746                                    tracing::info!(
747                                        %load_finish_source_id,
748                                        "LoadFinish received, starting data replacement"
749                                    );
750                                    expect_next_state =
751                                        Box::new(MaterializeStreamState::<_>::MergingData);
752                                }
753                            }
754                            _ => {}
755                        }
756                    }
757
758                    // ===== normal operation =====
759
760                    // If a downstream mv depends on the current table, we need to do conflict check again.
761                    if !self.may_have_downstream
762                        && barrier.has_more_downstream_fragments(self.actor_context.id)
763                    {
764                        self.may_have_downstream = true;
765                    }
766                    Self::may_update_depended_subscriptions(
767                        &mut self.subscriber_ids,
768                        &barrier,
769                        mv_table_id,
770                    );
771                    let op_consistency_level = get_op_consistency_level(
772                        self.conflict_behavior,
773                        self.may_have_downstream,
774                        &self.subscriber_ids,
775                    );
776                    let post_commit = self
777                        .state_table
778                        .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
779                        .await?;
780                    if !post_commit.inner().is_consistent_op() {
781                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
782                    }
783
784                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
785
786                    // Commit staging table for refreshable materialized views
787                    let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
788                    {
789                        // Commit progress table for fault tolerance
790
791                        Some((
792                            refresh_args.staging_table.commit(barrier.epoch).await?,
793                            refresh_args.progress_table.commit(barrier.epoch).await?,
794                        ))
795                    } else {
796                        None
797                    };
798
799                    let b_epoch = barrier.epoch;
800                    yield Message::Barrier(barrier);
801
802                    // Update the vnode bitmap for the state table if asked.
803                    if let Some((_, cache_may_stale)) = post_commit
804                        .post_yield_barrier(update_vnode_bitmap.clone())
805                        .await?
806                        && cache_may_stale
807                        && let Some(cache) = &mut self.materialize_cache
808                    {
809                        cache.clear();
810                    }
811
812                    // Handle staging table post commit
813                    if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
814                        staging_post_commit
815                            .post_yield_barrier(update_vnode_bitmap.clone())
816                            .await?;
817                        progress_post_commit
818                            .post_yield_barrier(update_vnode_bitmap)
819                            .await?;
820                    }
821
822                    self.metrics
823                        .materialize_current_epoch
824                        .set(b_epoch.curr as i64);
825
826                    // ====== transition to next state ======
827
828                    *inner_state = *expect_next_state;
829                }
830            }
831        }
832    }
833
    /// Stream that yields rows to be deleted from main table.
    ///
    /// For each owned vnode, runs a sort-merge "anti join" between the main
    /// table and the staging table: both iterators are ordered by key, so a
    /// main-table key that sorts before the current staging key cannot exist
    /// in staging and must be deleted.
    ///
    /// Yields `Some((vnode, row))` for rows that exist in main but not in staging.
    /// Yields `None` when finished processing all vnodes.
    ///
    /// Progress (last processed pk, row count) is persisted into
    /// `progress_table` after every main-table row, so the merge can resume
    /// from the last position after recovery.
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn make_mergesort_stream<'a>(
        main_table: &'a StateTableInner<S, SD>,
        staging_table: &'a StateTableInner<S, SD>,
        progress_table: &'a mut RefreshProgressTable<S>,
    ) {
        for vnode in main_table.vnodes().clone().iter_vnodes() {
            let mut processed_rows = 0;
            // Check if this VNode has already been completed (for fault tolerance)
            let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
                if let Some(current_entry) = progress_table.get_progress(vnode) {
                    // Skip already completed VNodes during recovery
                    if current_entry.is_completed {
                        tracing::debug!(
                            vnode = vnode.to_index(),
                            "Skipping already completed VNode during recovery"
                        );
                        continue;
                    }
                    // Resume the running row counter from the persisted progress.
                    processed_rows += current_entry.processed_rows;
                    tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");

                    // Resume scanning strictly after the last processed pk;
                    // with no recorded position, scan the vnode from the start.
                    if let Some(current_state) = &current_entry.current_pos {
                        (Bound::Excluded(current_state.clone()), Bound::Unbounded)
                    } else {
                        (Bound::Unbounded, Bound::Unbounded)
                    }
                } else {
                    (Bound::Unbounded, Bound::Unbounded)
                };

            // NOTE(review): the staging iterator uses the same resume range as
            // main; staging keys below the resume point correspond to main rows
            // already processed, so skipping them is safe.
            let iter_main = main_table
                .iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::prefetch_for_large_range_scan(),
                )
                .await?;
            let iter_staging = staging_table
                .iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::prefetch_for_large_range_scan(),
                )
                .await?;

            pin_mut!(iter_main);
            pin_mut!(iter_staging);

            // Sort-merge join implementation using dual pointers
            let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
            let mut staging_item: Option<KeyedRow<Bytes>> =
                iter_staging.next().await.transpose()?;

            while let Some(main_kv) = main_item {
                let main_key = main_kv.key();

                // Advance staging iterator until we find a key >= main_key
                let mut should_delete = false;
                while let Some(staging_kv) = &staging_item {
                    let staging_key = staging_kv.key();
                    match main_key.cmp(staging_key) {
                        std::cmp::Ordering::Greater => {
                            // main_key > staging_key, advance staging
                            staging_item = iter_staging.next().await.transpose()?;
                        }
                        std::cmp::Ordering::Equal => {
                            // Keys match, this row exists in both tables, no need to delete.
                            // Staging is intentionally NOT advanced here: the next main key
                            // is strictly greater, so it will advance on the next pass.
                            break;
                        }
                        std::cmp::Ordering::Less => {
                            // main_key < staging_key, main row doesn't exist in staging, delete it
                            should_delete = true;
                            break;
                        }
                    }
                }

                // If staging_item is None, all remaining main rows should be deleted
                if staging_item.is_none() {
                    should_delete = true;
                }

                if should_delete {
                    yield Some((vnode, main_kv.row().clone()));
                }

                // Advance main iterator
                processed_rows += 1;
                tracing::debug!(
                    "set progress table: vnode = {:?}, processed_rows = {:?}",
                    vnode,
                    processed_rows
                );
                // Persist the pk of the row just handled so recovery resumes after it.
                progress_table.set_progress(
                    vnode,
                    Some(
                        main_kv
                            .row()
                            .project(main_table.pk_indices())
                            .to_owned_row(),
                    ),
                    false,
                    processed_rows,
                )?;
                main_item = iter_main.next().await.transpose()?;
            }

            // Mark this VNode as completed
            if let Some(current_entry) = progress_table.get_progress(vnode) {
                progress_table.set_progress(
                    vnode,
                    current_entry.current_pos.clone(),
                    true, // completed
                    current_entry.processed_rows,
                )?;

                tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
            }
        }

        // Signal completion
        yield None;
    }
961
962    /// return true when changed
963    fn may_update_depended_subscriptions(
964        depended_subscriptions: &mut HashSet<SubscriberId>,
965        barrier: &Barrier,
966        mv_table_id: TableId,
967    ) {
968        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
969            if !depended_subscriptions.insert(subscriber_id) {
970                warn!(
971                    ?depended_subscriptions,
972                    %mv_table_id,
973                    %subscriber_id,
974                    "subscription id already exists"
975                );
976            }
977        }
978
979        if let Some(Mutation::DropSubscriptions {
980            subscriptions_to_drop,
981        }) = barrier.mutation.as_deref()
982        {
983            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
984                if *upstream_mv_table_id == mv_table_id
985                    && !depended_subscriptions.remove(subscriber_id)
986                {
987                    warn!(
988                        ?depended_subscriptions,
989                        %mv_table_id,
990                        %subscriber_id,
991                        "drop non existing subscriber_id id"
992                    );
993                }
994            }
995        }
996    }
997
998    /// Initialize refresh progress tracking for all `VNodes`
999    fn init_refresh_progress(
1000        state_table: &StateTableInner<S, SD>,
1001        progress_table: &mut RefreshProgressTable<S>,
1002        _epoch: u64,
1003    ) -> StreamExecutorResult<()> {
1004        debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1005
1006        // Initialize progress for all VNodes in the current bitmap
1007        for vnode in state_table.vnodes().iter_vnodes() {
1008            progress_table.set_progress(
1009                vnode, None,  // initial position
1010                false, // not completed yet
1011                0,     // initial processed rows
1012            )?;
1013        }
1014
1015        tracing::info!(
1016            vnodes_count = state_table.vnodes().count_ones(),
1017            "Initialized refresh progress tracking for all VNodes"
1018        );
1019
1020        Ok(())
1021    }
1022}
1023
1024impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
1025    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
1026    #[cfg(any(test, feature = "test"))]
1027    pub async fn for_test(
1028        input: Executor,
1029        store: S,
1030        table_id: TableId,
1031        keys: Vec<ColumnOrder>,
1032        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1033        watermark_epoch: AtomicU64Ref,
1034        conflict_behavior: ConflictBehavior,
1035    ) -> Self {
1036        use risingwave_common::util::iter_util::ZipEqFast;
1037
1038        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
1039        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
1040        let schema = input.schema().clone();
1041        let columns: Vec<ColumnDesc> = column_ids
1042            .into_iter()
1043            .zip_eq_fast(schema.fields.iter())
1044            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
1045            .collect_vec();
1046
1047        let row_serde = BasicSerde::new(
1048            Arc::from((0..columns.len()).collect_vec()),
1049            Arc::from(columns.clone().into_boxed_slice()),
1050        );
1051        let state_table = StateTableInner::from_table_catalog(
1052            &crate::common::table::test_utils::gen_pbtable(
1053                table_id,
1054                columns,
1055                arrange_order_types,
1056                arrange_columns.clone(),
1057                0,
1058            ),
1059            store,
1060            None,
1061        )
1062        .await;
1063
1064        let unused = StreamingMetrics::unused();
1065        let metrics = unused.new_materialize_metrics(table_id, 1.into(), 2.into());
1066        let cache_metrics = unused.new_materialize_cache_metrics(table_id, 1.into(), 2.into());
1067
1068        Self {
1069            input,
1070            schema,
1071            state_table,
1072            arrange_key_indices: arrange_columns.clone(),
1073            actor_context: ActorContext::for_test(0),
1074            materialize_cache: MaterializeCache::new(
1075                watermark_epoch,
1076                MetricsInfo::for_test(),
1077                row_serde,
1078                vec![],
1079                conflict_behavior,
1080                None,
1081                cache_metrics,
1082            ),
1083            conflict_behavior,
1084            version_column_indices: vec![],
1085            is_dummy_table: false,
1086            may_have_downstream: true,
1087            subscriber_ids: HashSet::new(),
1088            metrics,
1089            refresh_args: None, // Test constructor doesn't support refresh functionality
1090            local_barrier_manager: LocalBarrierManager::for_test(),
1091        }
1092    }
1093}
1094
1095impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
1096    fn execute(self: Box<Self>) -> BoxedMessageStream {
1097        self.execute_inner().boxed()
1098    }
1099}
1100
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Compact debug representation: only the arrange key indices are shown,
    // since the remaining executor state (tables, caches) is too large to dump.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
1108
1109#[cfg(test)]
1110mod tests {
1111
1112    use std::iter;
1113    use std::sync::atomic::AtomicU64;
1114
1115    use rand::rngs::SmallRng;
1116    use rand::{Rng, RngCore, SeedableRng};
1117    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1118    use risingwave_common::catalog::Field;
1119    use risingwave_common::util::epoch::test_epoch;
1120    use risingwave_common::util::sort_util::OrderType;
1121    use risingwave_hummock_sdk::HummockReadEpoch;
1122    use risingwave_storage::memory::MemoryStateStore;
1123    use risingwave_storage::table::batch_table::BatchTable;
1124
1125    use super::*;
1126    use crate::executor::test_utils::*;
1127
1128    #[tokio::test]
1129    async fn test_materialize_executor() {
1130        // Prepare storage and memtable.
1131        let memory_state_store = MemoryStateStore::new();
1132        let table_id = TableId::new(1);
1133        // Two columns of int32 type, the first column is PK.
1134        let schema = Schema::new(vec![
1135            Field::unnamed(DataType::Int32),
1136            Field::unnamed(DataType::Int32),
1137        ]);
1138        let column_ids = vec![0.into(), 1.into()];
1139
1140        // Prepare source chunks.
1141        let chunk1 = StreamChunk::from_pretty(
1142            " i i
1143            + 1 4
1144            + 2 5
1145            + 3 6",
1146        );
1147        let chunk2 = StreamChunk::from_pretty(
1148            " i i
1149            + 7 8
1150            - 3 6",
1151        );
1152
1153        // Prepare stream executors.
1154        let source = MockSource::with_messages(vec![
1155            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1156            Message::Chunk(chunk1),
1157            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1158            Message::Chunk(chunk2),
1159            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1160        ])
1161        .into_executor(schema.clone(), StreamKey::new());
1162
1163        let order_types = vec![OrderType::ascending()];
1164        let column_descs = vec![
1165            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1166            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1167        ];
1168
1169        let table = BatchTable::for_test(
1170            memory_state_store.clone(),
1171            table_id,
1172            column_descs,
1173            order_types,
1174            vec![0],
1175            vec![0, 1],
1176        );
1177
1178        let mut materialize_executor = MaterializeExecutor::for_test(
1179            source,
1180            memory_state_store,
1181            table_id,
1182            vec![ColumnOrder::new(0, OrderType::ascending())],
1183            column_ids,
1184            Arc::new(AtomicU64::new(0)),
1185            ConflictBehavior::NoCheck,
1186        )
1187        .await
1188        .boxed()
1189        .execute();
1190        materialize_executor.next().await.transpose().unwrap();
1191
1192        materialize_executor.next().await.transpose().unwrap();
1193
1194        // First stream chunk. We check the existence of (3) -> (3,6)
1195        match materialize_executor.next().await.transpose().unwrap() {
1196            Some(Message::Barrier(_)) => {
1197                let row = table
1198                    .get_row(
1199                        &OwnedRow::new(vec![Some(3_i32.into())]),
1200                        HummockReadEpoch::NoWait(u64::MAX),
1201                    )
1202                    .await
1203                    .unwrap();
1204                assert_eq!(
1205                    row,
1206                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1207                );
1208            }
1209            _ => unreachable!(),
1210        }
1211        materialize_executor.next().await.transpose().unwrap();
1212        // Second stream chunk. We check the existence of (7) -> (7,8)
1213        match materialize_executor.next().await.transpose().unwrap() {
1214            Some(Message::Barrier(_)) => {
1215                let row = table
1216                    .get_row(
1217                        &OwnedRow::new(vec![Some(7_i32.into())]),
1218                        HummockReadEpoch::NoWait(u64::MAX),
1219                    )
1220                    .await
1221                    .unwrap();
1222                assert_eq!(
1223                    row,
1224                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1225                );
1226            }
1227            _ => unreachable!(),
1228        }
1229    }
1230
1231    // https://github.com/risingwavelabs/risingwave/issues/13346
1232    #[tokio::test]
1233    async fn test_upsert_stream() {
1234        // Prepare storage and memtable.
1235        let memory_state_store = MemoryStateStore::new();
1236        let table_id = TableId::new(1);
1237        // Two columns of int32 type, the first column is PK.
1238        let schema = Schema::new(vec![
1239            Field::unnamed(DataType::Int32),
1240            Field::unnamed(DataType::Int32),
1241        ]);
1242        let column_ids = vec![0.into(), 1.into()];
1243
1244        // test double insert one pk, the latter needs to override the former.
1245        let chunk1 = StreamChunk::from_pretty(
1246            " i i
1247            + 1 1",
1248        );
1249
1250        let chunk2 = StreamChunk::from_pretty(
1251            " i i
1252            + 1 2
1253            - 1 2",
1254        );
1255
1256        // Prepare stream executors.
1257        let source = MockSource::with_messages(vec![
1258            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1259            Message::Chunk(chunk1),
1260            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1261            Message::Chunk(chunk2),
1262            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1263        ])
1264        .into_executor(schema.clone(), StreamKey::new());
1265
1266        let order_types = vec![OrderType::ascending()];
1267        let column_descs = vec![
1268            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1269            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1270        ];
1271
1272        let table = BatchTable::for_test(
1273            memory_state_store.clone(),
1274            table_id,
1275            column_descs,
1276            order_types,
1277            vec![0],
1278            vec![0, 1],
1279        );
1280
1281        let mut materialize_executor = MaterializeExecutor::for_test(
1282            source,
1283            memory_state_store,
1284            table_id,
1285            vec![ColumnOrder::new(0, OrderType::ascending())],
1286            column_ids,
1287            Arc::new(AtomicU64::new(0)),
1288            ConflictBehavior::Overwrite,
1289        )
1290        .await
1291        .boxed()
1292        .execute();
1293        materialize_executor.next().await.transpose().unwrap();
1294
1295        materialize_executor.next().await.transpose().unwrap();
1296        materialize_executor.next().await.transpose().unwrap();
1297        materialize_executor.next().await.transpose().unwrap();
1298
1299        match materialize_executor.next().await.transpose().unwrap() {
1300            Some(Message::Barrier(_)) => {
1301                let row = table
1302                    .get_row(
1303                        &OwnedRow::new(vec![Some(1_i32.into())]),
1304                        HummockReadEpoch::NoWait(u64::MAX),
1305                    )
1306                    .await
1307                    .unwrap();
1308                assert!(row.is_none());
1309            }
1310            _ => unreachable!(),
1311        }
1312    }
1313
1314    #[tokio::test]
1315    async fn test_check_insert_conflict() {
1316        // Prepare storage and memtable.
1317        let memory_state_store = MemoryStateStore::new();
1318        let table_id = TableId::new(1);
1319        // Two columns of int32 type, the first column is PK.
1320        let schema = Schema::new(vec![
1321            Field::unnamed(DataType::Int32),
1322            Field::unnamed(DataType::Int32),
1323        ]);
1324        let column_ids = vec![0.into(), 1.into()];
1325
1326        // test double insert one pk, the latter needs to override the former.
1327        let chunk1 = StreamChunk::from_pretty(
1328            " i i
1329            + 1 3
1330            + 1 4
1331            + 2 5
1332            + 3 6",
1333        );
1334
1335        let chunk2 = StreamChunk::from_pretty(
1336            " i i
1337            + 1 3
1338            + 2 6",
1339        );
1340
1341        // test delete wrong value, delete inexistent pk
1342        let chunk3 = StreamChunk::from_pretty(
1343            " i i
1344            + 1 4",
1345        );
1346
1347        // Prepare stream executors.
1348        let source = MockSource::with_messages(vec![
1349            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1350            Message::Chunk(chunk1),
1351            Message::Chunk(chunk2),
1352            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1353            Message::Chunk(chunk3),
1354            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1355        ])
1356        .into_executor(schema.clone(), StreamKey::new());
1357
1358        let order_types = vec![OrderType::ascending()];
1359        let column_descs = vec![
1360            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1361            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1362        ];
1363
1364        let table = BatchTable::for_test(
1365            memory_state_store.clone(),
1366            table_id,
1367            column_descs,
1368            order_types,
1369            vec![0],
1370            vec![0, 1],
1371        );
1372
1373        let mut materialize_executor = MaterializeExecutor::for_test(
1374            source,
1375            memory_state_store,
1376            table_id,
1377            vec![ColumnOrder::new(0, OrderType::ascending())],
1378            column_ids,
1379            Arc::new(AtomicU64::new(0)),
1380            ConflictBehavior::Overwrite,
1381        )
1382        .await
1383        .boxed()
1384        .execute();
1385        materialize_executor.next().await.transpose().unwrap();
1386
1387        materialize_executor.next().await.transpose().unwrap();
1388        materialize_executor.next().await.transpose().unwrap();
1389
1390        // First stream chunk. We check the existence of (3) -> (3,6)
1391        match materialize_executor.next().await.transpose().unwrap() {
1392            Some(Message::Barrier(_)) => {
1393                let row = table
1394                    .get_row(
1395                        &OwnedRow::new(vec![Some(3_i32.into())]),
1396                        HummockReadEpoch::NoWait(u64::MAX),
1397                    )
1398                    .await
1399                    .unwrap();
1400                assert_eq!(
1401                    row,
1402                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
1403                );
1404
1405                let row = table
1406                    .get_row(
1407                        &OwnedRow::new(vec![Some(1_i32.into())]),
1408                        HummockReadEpoch::NoWait(u64::MAX),
1409                    )
1410                    .await
1411                    .unwrap();
1412                assert_eq!(
1413                    row,
1414                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
1415                );
1416
1417                let row = table
1418                    .get_row(
1419                        &OwnedRow::new(vec![Some(2_i32.into())]),
1420                        HummockReadEpoch::NoWait(u64::MAX),
1421                    )
1422                    .await
1423                    .unwrap();
1424                assert_eq!(
1425                    row,
1426                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
1427                );
1428            }
1429            _ => unreachable!(),
1430        }
1431    }
1432
    /// Exercises `ConflictBehavior::Overwrite` with deletes/updates whose old
    /// value or pk does not match what is stored: per the assertions below,
    /// deletes take effect by pk regardless of the old value, deletes of an
    /// absent pk are no-ops, and an `U+` on an absent pk ends up as an insert.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // test update with a wrong old value, and update of an inexistent pk
        // (the latter should effectively become an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same state store, used to verify what has
        // been materialized after each barrier.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the epoch-1 barrier ...
        materialize_executor.next().await.transpose().unwrap();

        // ... and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-2 barrier: chunk1 is materialized. The trailing `+ 8 3` must
        // overwrite the value written by the earlier `U+ 8 2`.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-3 barrier: chunk2 is materialized.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (7, 8) was a plain insert.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Epoch-4 barrier: chunk3 is materialized. `+ 1 5` overwrites (1, 4).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1619
    /// Exercises `ConflictBehavior::IgnoreConflict` with repeated inserts on
    /// the same pk: per the assertions below, the FIRST insert for a pk wins
    /// and every later conflicting insert is dropped.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // more conflicting inserts across chunks; also to be ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // yet another conflicting insert on pk 1; also to be ignored.
        // (This chunk is never pulled below; only epochs 1-2 are consumed.)
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same state store, used to verify what has
        // been materialized after each barrier.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the epoch-1 barrier ...
        materialize_executor.next().await.transpose().unwrap();

        // ... then chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-2 barrier: chunk1 and chunk2 are materialized; the first insert
        // per pk must have won.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (3, 6) had no conflict.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: (1, 3) wins over the later (1, 4) and (1, 5).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: (2, 5) wins over the later (2, 6).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1738
1739    #[tokio::test]
1740    async fn test_ignore_delete_then_insert() {
1741        // Prepare storage and memtable.
1742        let memory_state_store = MemoryStateStore::new();
1743        let table_id = TableId::new(1);
1744        // Two columns of int32 type, the first column is PK.
1745        let schema = Schema::new(vec![
1746            Field::unnamed(DataType::Int32),
1747            Field::unnamed(DataType::Int32),
1748        ]);
1749        let column_ids = vec![0.into(), 1.into()];
1750
1751        // test insert after delete one pk, the latter insert should succeed.
1752        let chunk1 = StreamChunk::from_pretty(
1753            " i i
1754            + 1 3
1755            - 1 3
1756            + 1 6",
1757        );
1758
1759        // Prepare stream executors.
1760        let source = MockSource::with_messages(vec![
1761            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1762            Message::Chunk(chunk1),
1763            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1764        ])
1765        .into_executor(schema.clone(), StreamKey::new());
1766
1767        let order_types = vec![OrderType::ascending()];
1768        let column_descs = vec![
1769            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1770            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1771        ];
1772
1773        let table = BatchTable::for_test(
1774            memory_state_store.clone(),
1775            table_id,
1776            column_descs,
1777            order_types,
1778            vec![0],
1779            vec![0, 1],
1780        );
1781
1782        let mut materialize_executor = MaterializeExecutor::for_test(
1783            source,
1784            memory_state_store,
1785            table_id,
1786            vec![ColumnOrder::new(0, OrderType::ascending())],
1787            column_ids,
1788            Arc::new(AtomicU64::new(0)),
1789            ConflictBehavior::IgnoreConflict,
1790        )
1791        .await
1792        .boxed()
1793        .execute();
1794        let _msg1 = materialize_executor
1795            .next()
1796            .await
1797            .transpose()
1798            .unwrap()
1799            .unwrap()
1800            .as_barrier()
1801            .unwrap();
1802        let _msg2 = materialize_executor
1803            .next()
1804            .await
1805            .transpose()
1806            .unwrap()
1807            .unwrap()
1808            .as_chunk()
1809            .unwrap();
1810        let _msg3 = materialize_executor
1811            .next()
1812            .await
1813            .transpose()
1814            .unwrap()
1815            .unwrap()
1816            .as_barrier()
1817            .unwrap();
1818
1819        let row = table
1820            .get_row(
1821                &OwnedRow::new(vec![Some(1_i32.into())]),
1822                HummockReadEpoch::NoWait(u64::MAX),
1823            )
1824            .await
1825            .unwrap();
1826        assert_eq!(
1827            row,
1828            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
1829        );
1830    }
1831
    /// Exercises `ConflictBehavior::IgnoreConflict` with mixed inserts,
    /// deletes and updates. Per the assertions below: conflicting inserts are
    /// dropped (the first value for a pk wins), while `U-`/`U+` updates still
    /// apply, deletes take effect by pk, and an `U+` on an absent pk inserts.
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter should be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // test conflicting insert (ignored), update with a wrong old value,
        // and update of an inexistent pk (which should become an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same state store, used to verify what has
        // been materialized after each barrier.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the epoch-1 barrier ...
        materialize_executor.next().await.transpose().unwrap();

        // ... and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-2 barrier: chunk1 is materialized. The trailing `+ 8 3`
        // conflicts with the value written by `U+ 8 2` and is ignored.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 2), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-3 barrier: chunk2 is materialized.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (7, 8) was a plain insert.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Epoch-4 barrier: chunk3 is materialized. The conflicting `+ 1 5` was
        // ignored, so (1, 4) survives.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2019
    /// Exercises `ConflictBehavior::DoUpdateIfNotNull`: per the assertions
    /// below, a conflicting write only overwrites the columns it provides as
    /// non-NULL, so a NULL in the incoming row preserves the stored value.
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // should get (8, 2)
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // should not get (3, x), should not get (5, 0)
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // should get (2, None), (7, 8)
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same state store, used to verify what has
        // been materialized after each barrier.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the epoch-1 barrier ...
        materialize_executor.next().await.transpose().unwrap();

        // ... and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-2 barrier: chunk1 is materialized. The trailing `+ 8 .` has a
        // NULL value column, so (8, 2) must be preserved.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                // pk 2 was inserted with a NULL value column.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-3 barrier: chunk2 is materialized.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (7, 8) was a plain insert.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Epoch-4 barrier: chunk3 is materialized. `+ 7 .` must NOT clobber
        // the stored (7, 8) because its value column is NULL.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2213
2214    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2215        const KN: u32 = 4;
2216        const SEED: u64 = 998244353;
2217        let mut ret = vec![];
2218        let mut builder =
2219            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2220        let mut rng = SmallRng::seed_from_u64(SEED);
2221
2222        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2223            let len = c.data_chunk().capacity();
2224            let mut c = StreamChunkMut::from(c);
2225            for i in 0..len {
2226                c.set_vis(i, rng.random_bool(0.5));
2227            }
2228            c.into()
2229        };
2230        for _ in 0..row_number {
2231            let k = (rng.next_u32() % KN) as i32;
2232            let v = rng.next_u32() as i32;
2233            let op = if rng.random_bool(0.5) {
2234                Op::Insert
2235            } else {
2236                Op::Delete
2237            };
2238            if let Some(c) =
2239                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2240            {
2241                ret.push(random_vis(c, &mut rng));
2242            }
2243        }
2244        if let Some(c) = builder.take() {
2245            ret.push(random_vis(c, &mut rng));
2246        }
2247        ret
2248    }
2249
2250    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
2251        const N: usize = 100000;
2252
2253        // Prepare storage and memtable.
2254        let memory_state_store = MemoryStateStore::new();
2255        let table_id = TableId::new(1);
2256        // Two columns of int32 type, the first column is PK.
2257        let schema = Schema::new(vec![
2258            Field::unnamed(DataType::Int32),
2259            Field::unnamed(DataType::Int32),
2260        ]);
2261        let column_ids = vec![0.into(), 1.into()];
2262
2263        let chunks = gen_fuzz_data(N, 128);
2264        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
2265            .chain(chunks.into_iter().map(Message::Chunk))
2266            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
2267                test_epoch(2),
2268            ))))
2269            .collect();
2270        // Prepare stream executors.
2271        let source =
2272            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());
2273
2274        let mut materialize_executor = MaterializeExecutor::for_test(
2275            source,
2276            memory_state_store.clone(),
2277            table_id,
2278            vec![ColumnOrder::new(0, OrderType::ascending())],
2279            column_ids,
2280            Arc::new(AtomicU64::new(0)),
2281            conflict_behavior,
2282        )
2283        .await
2284        .boxed()
2285        .execute();
2286        materialize_executor.expect_barrier().await;
2287
2288        let order_types = vec![OrderType::ascending()];
2289        let column_descs = vec![
2290            ColumnDesc::unnamed(0.into(), DataType::Int32),
2291            ColumnDesc::unnamed(1.into(), DataType::Int32),
2292        ];
2293        let pk_indices = vec![0];
2294
2295        let mut table = StateTable::from_table_catalog(
2296            &crate::common::table::test_utils::gen_pbtable(
2297                TableId::from(1002),
2298                column_descs.clone(),
2299                order_types,
2300                pk_indices,
2301                0,
2302            ),
2303            memory_state_store.clone(),
2304            None,
2305        )
2306        .await;
2307
2308        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
2309            // check with state table's memtable
2310            table.write_chunk(c);
2311        }
2312    }
2313
    // Run the consistency fuzz test with `ConflictBehavior::Overwrite`
    // (upsert-style conflict handling).
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2318
    // Run the consistency fuzz test with `ConflictBehavior::IgnoreConflict`
    // (conflicting writes on an existing pk are dropped).
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2323}